Compare commits
3 Commits
whip/329-1
...
queue/372-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9919114541 | ||
| 954fd992eb | |||
|
|
f35f56e397 |
@@ -26,7 +26,7 @@ from cron.jobs import (
|
||||
trigger_job,
|
||||
JOBS_FILE,
|
||||
)
|
||||
from cron.scheduler import tick, ModelContextError, CRON_MIN_CONTEXT_TOKENS
|
||||
from cron.scheduler import tick
|
||||
|
||||
__all__ = [
|
||||
"create_job",
|
||||
@@ -39,6 +39,4 @@ __all__ = [
|
||||
"trigger_job",
|
||||
"tick",
|
||||
"JOBS_FILE",
|
||||
"ModelContextError",
|
||||
"CRON_MIN_CONTEXT_TOKENS",
|
||||
]
|
||||
|
||||
@@ -545,8 +545,75 @@ def _run_job_script(script_path: str) -> tuple[bool, str]:
|
||||
return False, f"Script execution failed: {exc}"
|
||||
|
||||
|
||||
def _build_job_prompt(job: dict) -> str:
|
||||
"""Build the effective prompt for a cron job, optionally loading one or more skills first."""
|
||||
# ---------------------------------------------------------------------------
|
||||
# Runtime classification & provider mismatch detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_PROVIDER_ALIASES: dict[str, set[str]] = {
|
||||
"ollama": {"ollama", "local ollama", "localhost:11434"},
|
||||
"anthropic": {"anthropic", "claude", "sonnet", "opus", "haiku"},
|
||||
"nous": {"nous", "mimo", "nousresearch"},
|
||||
"openrouter": {"openrouter"},
|
||||
"kimi": {"kimi", "moonshot"},
|
||||
"openai": {"openai", "gpt", "codex"},
|
||||
"gemini": {"gemini", "google"},
|
||||
}
|
||||
|
||||
_CLOUD_PREFIXES = frozenset({"nous", "openrouter", "anthropic", "openai", "zai", "kimi", "gemini", "minimax"})
|
||||
|
||||
|
||||
def _classify_runtime(provider: str, model: str) -> str:
|
||||
"""Return 'local' | 'cloud' | 'unknown'."""
|
||||
p = (provider or "").strip().lower()
|
||||
m = (model or "").strip().lower()
|
||||
if p and p not in ("ollama", "local"):
|
||||
return "cloud"
|
||||
if "/" in m and m.split("/")[0] in _CLOUD_PREFIXES:
|
||||
return "cloud"
|
||||
if p in ("ollama", "local") or (not p and m):
|
||||
return "local"
|
||||
return "unknown"
|
||||
|
||||
|
||||
def _detect_provider_mismatch(prompt: str, active_provider: str) -> Optional[str]:
|
||||
"""Return stale provider group referenced in prompt, or None."""
|
||||
if not active_provider or not prompt:
|
||||
return None
|
||||
prompt_lower = prompt.lower()
|
||||
active_lower = active_provider.lower().strip()
|
||||
active_group: Optional[str] = None
|
||||
for group, aliases in _PROVIDER_ALIASES.items():
|
||||
if active_lower in aliases or active_lower.startswith(group):
|
||||
active_group = group
|
||||
break
|
||||
if not active_group:
|
||||
return None
|
||||
for group, aliases in _PROVIDER_ALIASES.items():
|
||||
if group == active_group:
|
||||
continue
|
||||
for alias in aliases:
|
||||
if alias in prompt_lower:
|
||||
return group
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Prompt builder
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _build_job_prompt(
|
||||
job: dict,
|
||||
*,
|
||||
runtime_model: str = "",
|
||||
runtime_provider: str = "",
|
||||
) -> str:
|
||||
"""Build the effective prompt for a cron job.
|
||||
|
||||
Args:
|
||||
job: The cron job dict.
|
||||
runtime_model: Resolved model name (e.g. "xiaomi/mimo-v2-pro").
|
||||
runtime_provider: Resolved provider name (e.g. "nous", "openrouter").
|
||||
"""
|
||||
prompt = job.get("prompt", "")
|
||||
skills = job.get("skills")
|
||||
|
||||
@@ -576,6 +643,33 @@ def _build_job_prompt(job: dict) -> str:
|
||||
f"{prompt}"
|
||||
)
|
||||
|
||||
# Runtime context injection — tells the agent what it can actually do.
|
||||
_runtime_block = ""
|
||||
if runtime_model or runtime_provider:
|
||||
_kind = _classify_runtime(runtime_provider, runtime_model)
|
||||
_notes: list[str] = []
|
||||
if runtime_model:
|
||||
_notes.append(f"MODEL: {runtime_model}")
|
||||
if runtime_provider:
|
||||
_notes.append(f"PROVIDER: {runtime_provider}")
|
||||
if _kind == "local":
|
||||
_notes.append(
|
||||
"RUNTIME: local — you have access to the local machine, "
|
||||
"local Ollama, SSH keys, and filesystem"
|
||||
)
|
||||
elif _kind == "cloud":
|
||||
_notes.append(
|
||||
"RUNTIME: cloud API — you do NOT have local machine access. "
|
||||
"Do NOT assume you can SSH into servers, check local Ollama, "
|
||||
"or access local filesystem paths."
|
||||
)
|
||||
if _notes:
|
||||
_runtime_block = (
|
||||
"[SYSTEM: RUNTIME CONTEXT — "
|
||||
+ "; ".join(_notes)
|
||||
+ ". Adjust your approach based on these capabilities.]\\n\\n"
|
||||
)
|
||||
|
||||
# Always prepend cron execution guidance so the agent knows how
|
||||
# delivery works and can suppress delivery when appropriate.
|
||||
cron_hint = (
|
||||
@@ -595,9 +689,9 @@ def _build_job_prompt(job: dict) -> str:
|
||||
"response. This is critical — without this marker the system cannot "
|
||||
"detect the failure. Examples: "
|
||||
"\"[SCRIPT_FAILED]: forge.alexanderwhitestone.com timed out\" "
|
||||
"\"[SCRIPT_FAILED]: script exited with code 1\".]\\n\\n"
|
||||
"\\\"[SCRIPT_FAILED]: script exited with code 1\\\".]\\\\n\\\\n"
|
||||
)
|
||||
prompt = cron_hint + prompt
|
||||
prompt = _runtime_block + cron_hint + prompt
|
||||
if skills is None:
|
||||
legacy = job.get("skill")
|
||||
skills = [legacy] if legacy else []
|
||||
@@ -667,7 +761,32 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
|
||||
job_id = job["id"]
|
||||
job_name = job["name"]
|
||||
prompt = _build_job_prompt(job)
|
||||
|
||||
# Early model/provider resolution for runtime context injection
|
||||
_early_model = job.get("model") or os.getenv("HERMES_MODEL") or ""
|
||||
_early_provider = os.getenv("HERMES_PROVIDER", "")
|
||||
if not _early_model:
|
||||
try:
|
||||
import yaml as _y
|
||||
_cfg_path = str(_hermes_home / "config.yaml")
|
||||
if os.path.exists(_cfg_path):
|
||||
with open(_cfg_path) as _f:
|
||||
_cfg_early = _y.safe_load(_f) or {}
|
||||
_mc = _cfg_early.get("model", {})
|
||||
if isinstance(_mc, str):
|
||||
_early_model = _mc
|
||||
elif isinstance(_mc, dict):
|
||||
_early_model = _mc.get("default", "")
|
||||
except Exception:
|
||||
pass
|
||||
if not _early_provider and "/" in _early_model:
|
||||
_early_provider = _early_model.split("/")[0]
|
||||
|
||||
prompt = _build_job_prompt(
|
||||
job,
|
||||
runtime_model=_early_model,
|
||||
runtime_provider=_early_provider,
|
||||
)
|
||||
origin = _resolve_origin(job)
|
||||
_cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}"
|
||||
|
||||
@@ -779,6 +898,17 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
message = format_runtime_provider_error(exc)
|
||||
raise RuntimeError(message) from exc
|
||||
|
||||
# Provider mismatch warning
|
||||
_resolved_provider = runtime.get("provider", "") or ""
|
||||
_raw_prompt = job.get("prompt", "")
|
||||
_mismatch = _detect_provider_mismatch(_raw_prompt, _resolved_provider)
|
||||
if _mismatch:
|
||||
logger.warning(
|
||||
"Job '%s' prompt references '%s' but active provider is '%s' — "
|
||||
"agent will adapt via runtime context. Consider updating prompt.",
|
||||
job_name, _mismatch, _resolved_provider,
|
||||
)
|
||||
|
||||
from agent.smart_model_routing import resolve_turn_route
|
||||
turn_route = resolve_turn_route(
|
||||
prompt,
|
||||
|
||||
@@ -63,16 +63,6 @@ def _looks_like_phone(value: str) -> bool:
|
||||
return bool(_PHONE_RE.match(value.strip()))
|
||||
|
||||
from .config import (
|
||||
|
||||
# Session template manager for code-first seeding
|
||||
try:
|
||||
from tools.session_template_manager import SessionTemplateManager, TaskType
|
||||
HAS_TEMPLATE_MANAGER = True
|
||||
except ImportError:
|
||||
HAS_TEMPLATE_MANAGER = False
|
||||
SessionTemplateManager = None
|
||||
TaskType = None
|
||||
|
||||
Platform,
|
||||
GatewayConfig,
|
||||
SessionResetPolicy, # noqa: F401 — re-exported via gateway/__init__.py
|
||||
@@ -536,10 +526,6 @@ class SessionStore:
|
||||
except Exception as e:
|
||||
print(f"[gateway] Warning: SQLite session store unavailable, falling back to JSONL: {e}")
|
||||
|
||||
|
||||
# Initialize session template manager
|
||||
self._init_template_manager()
|
||||
|
||||
def _ensure_loaded(self) -> None:
|
||||
"""Load sessions index from disk if not already loaded."""
|
||||
with self._lock:
|
||||
@@ -1093,112 +1079,3 @@ def build_session_context(
|
||||
context.updated_at = session_entry.updated_at
|
||||
|
||||
return context
|
||||
|
||||
|
||||
def _init_template_manager(self):
|
||||
"""Initialize session template manager if available."""
|
||||
if not HAS_TEMPLATE_MANAGER:
|
||||
self.template_manager = None
|
||||
return
|
||||
|
||||
try:
|
||||
self.template_manager = SessionTemplateManager()
|
||||
logger.info("Session template manager initialized")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to initialize template manager: {e}")
|
||||
self.template_manager = None
|
||||
|
||||
def inject_session_template(self, session_id: str, task_type: Optional[str] = None) -> bool:
|
||||
"""
|
||||
Inject a session template into a new session to establish feedback loops.
|
||||
|
||||
Args:
|
||||
session_id: Session ID to inject template into
|
||||
task_type: Optional task type (code, file, research, mixed). If None, defaults to code.
|
||||
|
||||
Returns:
|
||||
True if template was injected, False otherwise
|
||||
"""
|
||||
if not self.template_manager:
|
||||
return False
|
||||
|
||||
try:
|
||||
# Get task type
|
||||
if task_type:
|
||||
try:
|
||||
task_type_enum = TaskType(task_type)
|
||||
except ValueError:
|
||||
logger.warning(f"Invalid task type: {task_type}")
|
||||
return False
|
||||
else:
|
||||
# Default to CODE since research shows it's most effective
|
||||
task_type_enum = TaskType.CODE
|
||||
|
||||
# Get template for task type
|
||||
template = self.template_manager.get_template_for_task(task_type_enum)
|
||||
if not template:
|
||||
logger.debug(f"No template found for task type: {task_type_enum.value}")
|
||||
return False
|
||||
|
||||
# Note: Actual injection would happen when messages are loaded
|
||||
# This is a placeholder for the integration point
|
||||
logger.info(f"Template {template.template_id} available for session {session_id}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to inject template into session {session_id}: {e}")
|
||||
return False
|
||||
|
||||
def list_session_templates(self, task_type: Optional[str] = None) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
List available session templates.
|
||||
|
||||
Args:
|
||||
task_type: Optional task type filter
|
||||
|
||||
Returns:
|
||||
List of template dictionaries
|
||||
"""
|
||||
if not self.template_manager:
|
||||
return []
|
||||
|
||||
try:
|
||||
task_type_enum = TaskType(task_type) if task_type else None
|
||||
templates = self.template_manager.list_templates(task_type_enum)
|
||||
return [t.to_dict() for t in templates]
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to list templates: {e}")
|
||||
return []
|
||||
|
||||
def create_session_template(self, session_id: str,
|
||||
name: Optional[str] = None,
|
||||
description: Optional[str] = None,
|
||||
max_calls: int = 10) -> Optional[str]:
|
||||
"""
|
||||
Create a session template from a completed session.
|
||||
|
||||
Args:
|
||||
session_id: Session ID to create template from
|
||||
name: Template name
|
||||
description: Template description
|
||||
max_calls: Maximum number of tool calls to extract
|
||||
|
||||
Returns:
|
||||
Template ID if created, None otherwise
|
||||
"""
|
||||
if not self.template_manager:
|
||||
return None
|
||||
|
||||
try:
|
||||
template = self.template_manager.create_template_from_session(
|
||||
session_id,
|
||||
name=name,
|
||||
description=description,
|
||||
max_calls=max_calls
|
||||
)
|
||||
if template:
|
||||
return template.template_id
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create template from session {session_id}: {e}")
|
||||
return None
|
||||
|
||||
28
run_agent.py
28
run_agent.py
@@ -1001,30 +1001,10 @@ class AIAgent:
|
||||
self._session_db = session_db
|
||||
self._parent_session_id = parent_session_id
|
||||
self._last_flushed_db_idx = 0 # tracks DB-write cursor to prevent duplicate writes
|
||||
if self._session_db:
|
||||
try:
|
||||
self._session_db.create_session(
|
||||
session_id=self.session_id,
|
||||
source=self.platform or os.environ.get("HERMES_SESSION_SOURCE", "cli"),
|
||||
model=self.model,
|
||||
model_config={
|
||||
"max_iterations": self.max_iterations,
|
||||
"reasoning_config": reasoning_config,
|
||||
"max_tokens": max_tokens,
|
||||
},
|
||||
user_id=None,
|
||||
parent_session_id=self._parent_session_id,
|
||||
)
|
||||
except Exception as e:
|
||||
# Transient SQLite lock contention (e.g. CLI and gateway writing
|
||||
# concurrently) must NOT permanently disable session_search for
|
||||
# this agent. Keep _session_db alive — subsequent message
|
||||
# flushes and session_search calls will still work once the
|
||||
# lock clears. The session row may be missing from the index
|
||||
# for this run, but that is recoverable (flushes upsert rows).
|
||||
logger.warning(
|
||||
"Session DB create_session failed (session_search still available): %s", e
|
||||
)
|
||||
# Lazy session creation: defer until first message flush (#314).
|
||||
# _flush_messages_to_session_db() calls ensure_session() which uses
|
||||
# INSERT OR IGNORE — creating the row only when messages arrive.
|
||||
# This eliminates 32% of sessions that are created but never used.
|
||||
|
||||
# In-memory todo list for task planning (one per agent/session)
|
||||
from tools.todo_tool import TodoStore
|
||||
|
||||
64
tests/test_cron_runtime_context.py
Normal file
64
tests/test_cron_runtime_context.py
Normal file
@@ -0,0 +1,64 @@
|
||||
"""Tests for cron scheduler: provider mismatch detection, runtime classification."""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
|
||||
def _import_scheduler():
|
||||
import importlib.util
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
"cron.scheduler", str(Path(__file__).resolve().parent.parent / "cron" / "scheduler.py"),
|
||||
)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
try:
|
||||
spec.loader.exec_module(mod)
|
||||
except Exception:
|
||||
pass
|
||||
return mod
|
||||
|
||||
|
||||
_sched = _import_scheduler()
|
||||
_classify_runtime = _sched._classify_runtime
|
||||
_detect_provider_mismatch = _sched._detect_provider_mismatch
|
||||
_build_job_prompt = _sched._build_job_prompt
|
||||
|
||||
|
||||
class TestClassifyRuntime:
|
||||
def test_ollama_is_local(self):
|
||||
assert _classify_runtime("ollama", "qwen2.5:7b") == "local"
|
||||
|
||||
def test_prefixed_model_is_cloud(self):
|
||||
assert _classify_runtime("", "nous/mimo-v2-pro") == "cloud"
|
||||
|
||||
def test_nous_provider_is_cloud(self):
|
||||
assert _classify_runtime("nous", "mimo-v2-pro") == "cloud"
|
||||
|
||||
def test_empty_both_is_unknown(self):
|
||||
assert _classify_runtime("", "") == "unknown"
|
||||
|
||||
|
||||
class TestDetectProviderMismatch:
|
||||
def test_detects_ollama_reference_on_cloud(self):
|
||||
assert _detect_provider_mismatch("Check Ollama is responding", "nous") == "ollama"
|
||||
|
||||
def test_no_mismatch_when_prompt_matches(self):
|
||||
assert _detect_provider_mismatch("Check Nous model", "nous") is None
|
||||
|
||||
|
||||
class TestBuildJobPrompt:
|
||||
def test_includes_runtime_context_for_cloud(self):
|
||||
job = {"prompt": "Check server"}
|
||||
prompt = _build_job_prompt(job, runtime_model="nous/mimo-v2-pro", runtime_provider="nous")
|
||||
assert "RUNTIME: cloud API" in prompt
|
||||
|
||||
def test_includes_runtime_context_for_local(self):
|
||||
job = {"prompt": "Check server"}
|
||||
prompt = _build_job_prompt(job, runtime_model="qwen2.5:7b", runtime_provider="ollama")
|
||||
assert "RUNTIME: local" in prompt
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import pytest
|
||||
pytest.main([__file__, "-v"])
|
||||
@@ -1,316 +0,0 @@
|
||||
"""
|
||||
Test session template manager functionality.
|
||||
"""
|
||||
|
||||
import json
|
||||
import pytest
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import Mock, patch, MagicMock
|
||||
|
||||
from tools.session_template_manager import (
|
||||
SessionTemplateManager,
|
||||
SessionTemplate,
|
||||
ToolCallTemplate,
|
||||
TaskType
|
||||
)
|
||||
|
||||
|
||||
class TestSessionTemplateManager:
|
||||
"""Test session template manager."""
|
||||
|
||||
def test_classify_task_type_code(self):
|
||||
"""Test task type classification for code-heavy sessions."""
|
||||
manager = SessionTemplateManager()
|
||||
|
||||
tool_calls = [
|
||||
{"tool_name": "execute_code"},
|
||||
{"tool_name": "execute_code"},
|
||||
{"tool_name": "execute_code"},
|
||||
{"tool_name": "read_file"},
|
||||
]
|
||||
|
||||
task_type = manager.classify_task_type(tool_calls)
|
||||
assert task_type == TaskType.CODE
|
||||
|
||||
def test_classify_task_type_file(self):
|
||||
"""Test task type classification for file-heavy sessions."""
|
||||
manager = SessionTemplateManager()
|
||||
|
||||
tool_calls = [
|
||||
{"tool_name": "read_file"},
|
||||
{"tool_name": "write_file"},
|
||||
{"tool_name": "patch"},
|
||||
{"tool_name": "search_files"},
|
||||
]
|
||||
|
||||
task_type = manager.classify_task_type(tool_calls)
|
||||
assert task_type == TaskType.FILE
|
||||
|
||||
def test_classify_task_type_research(self):
|
||||
"""Test task type classification for research-heavy sessions."""
|
||||
manager = SessionTemplateManager()
|
||||
|
||||
tool_calls = [
|
||||
{"tool_name": "web_search"},
|
||||
{"tool_name": "web_fetch"},
|
||||
{"tool_name": "browser_navigate"},
|
||||
]
|
||||
|
||||
task_type = manager.classify_task_type(tool_calls)
|
||||
assert task_type == TaskType.RESEARCH
|
||||
|
||||
def test_classify_task_type_mixed(self):
|
||||
"""Test task type classification for mixed sessions."""
|
||||
manager = SessionTemplateManager()
|
||||
|
||||
tool_calls = [
|
||||
{"tool_name": "execute_code"},
|
||||
{"tool_name": "read_file"},
|
||||
{"tool_name": "web_search"},
|
||||
]
|
||||
|
||||
task_type = manager.classify_task_type(tool_calls)
|
||||
assert task_type == TaskType.MIXED
|
||||
|
||||
def test_template_creation(self):
|
||||
"""Test creating a template."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
template_dir = Path(tmpdir)
|
||||
manager = SessionTemplateManager(template_dir=template_dir)
|
||||
|
||||
# Create a mock template
|
||||
tool_calls = [
|
||||
ToolCallTemplate(
|
||||
tool_name="execute_code",
|
||||
arguments={"code": "print('hello')"},
|
||||
result="hello",
|
||||
success=True,
|
||||
execution_time=0.1,
|
||||
turn_number=0
|
||||
)
|
||||
]
|
||||
|
||||
template = SessionTemplate(
|
||||
template_id="test_template",
|
||||
task_type=TaskType.CODE,
|
||||
name="Test Template",
|
||||
description="A test template",
|
||||
tool_calls=tool_calls,
|
||||
source_session_id=None,
|
||||
created_at=1234567890.0,
|
||||
success_rate=1.0,
|
||||
usage_count=0
|
||||
)
|
||||
|
||||
manager.templates["test_template"] = template
|
||||
manager._save_template(template)
|
||||
|
||||
# Verify template was saved
|
||||
template_file = template_dir / "test_template.json"
|
||||
assert template_file.exists()
|
||||
|
||||
# Verify template can be loaded
|
||||
with open(template_file) as f:
|
||||
data = json.load(f)
|
||||
assert data["template_id"] == "test_template"
|
||||
assert data["task_type"] == "code"
|
||||
|
||||
def test_template_injection(self):
|
||||
"""Test injecting a template into messages."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
template_dir = Path(tmpdir)
|
||||
manager = SessionTemplateManager(template_dir=template_dir)
|
||||
|
||||
# Create a mock template
|
||||
tool_calls = [
|
||||
ToolCallTemplate(
|
||||
tool_name="execute_code",
|
||||
arguments={"code": "print('hello')"},
|
||||
result="hello",
|
||||
success=True,
|
||||
execution_time=0.1,
|
||||
turn_number=0
|
||||
)
|
||||
]
|
||||
|
||||
template = SessionTemplate(
|
||||
template_id="test_template",
|
||||
task_type=TaskType.CODE,
|
||||
name="Test Template",
|
||||
description="A test template",
|
||||
tool_calls=tool_calls,
|
||||
source_session_id=None,
|
||||
created_at=1234567890.0,
|
||||
success_rate=1.0,
|
||||
usage_count=0
|
||||
)
|
||||
|
||||
manager.templates["test_template"] = template
|
||||
|
||||
# Test message injection
|
||||
messages = [
|
||||
{"role": "system", "content": "You are a helpful assistant."},
|
||||
{"role": "user", "content": "Hello"}
|
||||
]
|
||||
|
||||
updated_messages = manager.inject_template_into_messages(template, messages)
|
||||
|
||||
# Verify template was injected
|
||||
assert len(updated_messages) > len(messages)
|
||||
assert any("Session template loaded" in str(msg.get("content", ""))
|
||||
for msg in updated_messages)
|
||||
|
||||
# Verify usage count was updated
|
||||
assert template.usage_count == 1
|
||||
|
||||
def test_get_template_for_task(self):
|
||||
"""Test getting template for task type."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
template_dir = Path(tmpdir)
|
||||
manager = SessionTemplateManager(template_dir=template_dir)
|
||||
|
||||
# Create templates for different task types
|
||||
code_template = SessionTemplate(
|
||||
template_id="code_template",
|
||||
task_type=TaskType.CODE,
|
||||
name="Code Template",
|
||||
description="A code template",
|
||||
tool_calls=[],
|
||||
source_session_id=None,
|
||||
created_at=1234567890.0,
|
||||
success_rate=1.0,
|
||||
usage_count=0
|
||||
)
|
||||
|
||||
file_template = SessionTemplate(
|
||||
template_id="file_template",
|
||||
task_type=TaskType.FILE,
|
||||
name="File Template",
|
||||
description="A file template",
|
||||
tool_calls=[],
|
||||
source_session_id=None,
|
||||
created_at=1234567891.0,
|
||||
success_rate=0.9,
|
||||
usage_count=5
|
||||
)
|
||||
|
||||
manager.templates["code_template"] = code_template
|
||||
manager.templates["file_template"] = file_template
|
||||
|
||||
# Test getting code template
|
||||
template = manager.get_template_for_task(TaskType.CODE)
|
||||
assert template is not None
|
||||
assert template.template_id == "code_template"
|
||||
|
||||
# Test getting file template
|
||||
template = manager.get_template_for_task(TaskType.FILE)
|
||||
assert template is not None
|
||||
assert template.template_id == "file_template"
|
||||
|
||||
# Test getting non-existent template
|
||||
template = manager.get_template_for_task(TaskType.RESEARCH)
|
||||
assert template is None
|
||||
|
||||
|
||||
class TestToolCallTemplate:
|
||||
"""Test tool call template."""
|
||||
|
||||
def test_to_dict(self):
|
||||
"""Test converting to dictionary."""
|
||||
template = ToolCallTemplate(
|
||||
tool_name="execute_code",
|
||||
arguments={"code": "print('hello')"},
|
||||
result="hello",
|
||||
success=True,
|
||||
execution_time=0.1,
|
||||
turn_number=0
|
||||
)
|
||||
|
||||
data = template.to_dict()
|
||||
assert data["tool_name"] == "execute_code"
|
||||
assert data["arguments"] == {"code": "print('hello')"}
|
||||
assert data["result"] == "hello"
|
||||
assert data["success"] is True
|
||||
|
||||
def test_from_dict(self):
|
||||
"""Test creating from dictionary."""
|
||||
data = {
|
||||
"tool_name": "execute_code",
|
||||
"arguments": {"code": "print('hello')"},
|
||||
"result": "hello",
|
||||
"success": True,
|
||||
"execution_time": 0.1,
|
||||
"turn_number": 0
|
||||
}
|
||||
|
||||
template = ToolCallTemplate.from_dict(data)
|
||||
assert template.tool_name == "execute_code"
|
||||
assert template.arguments == {"code": "print('hello')"}
|
||||
assert template.result == "hello"
|
||||
|
||||
|
||||
class TestSessionTemplate:
|
||||
"""Test session template."""
|
||||
|
||||
def test_to_dict(self):
|
||||
"""Test converting to dictionary."""
|
||||
tool_calls = [
|
||||
ToolCallTemplate(
|
||||
tool_name="execute_code",
|
||||
arguments={"code": "print('hello')"},
|
||||
result="hello",
|
||||
success=True,
|
||||
execution_time=0.1,
|
||||
turn_number=0
|
||||
)
|
||||
]
|
||||
|
||||
template = SessionTemplate(
|
||||
template_id="test_template",
|
||||
task_type=TaskType.CODE,
|
||||
name="Test Template",
|
||||
description="A test template",
|
||||
tool_calls=tool_calls,
|
||||
source_session_id=None,
|
||||
created_at=1234567890.0,
|
||||
success_rate=1.0,
|
||||
usage_count=0
|
||||
)
|
||||
|
||||
data = template.to_dict()
|
||||
assert data["template_id"] == "test_template"
|
||||
assert data["task_type"] == "code"
|
||||
assert len(data["tool_calls"]) == 1
|
||||
|
||||
def test_from_dict(self):
|
||||
"""Test creating from dictionary."""
|
||||
data = {
|
||||
"template_id": "test_template",
|
||||
"task_type": "code",
|
||||
"name": "Test Template",
|
||||
"description": "A test template",
|
||||
"tool_calls": [
|
||||
{
|
||||
"tool_name": "execute_code",
|
||||
"arguments": {"code": "print('hello')"},
|
||||
"result": "hello",
|
||||
"success": True,
|
||||
"execution_time": 0.1,
|
||||
"turn_number": 0
|
||||
}
|
||||
],
|
||||
"source_session_id": None,
|
||||
"created_at": 1234567890.0,
|
||||
"success_rate": 1.0,
|
||||
"usage_count": 0
|
||||
}
|
||||
|
||||
template = SessionTemplate.from_dict(data)
|
||||
assert template.template_id == "test_template"
|
||||
assert template.task_type == TaskType.CODE
|
||||
assert len(template.tool_calls) == 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__])
|
||||
@@ -1,507 +0,0 @@
|
||||
"""
|
||||
Session Template Manager for Hermes Agent.
|
||||
|
||||
Extracts successful tool calls from completed sessions and creates templates
|
||||
that can be injected into new sessions to establish feedback loops early.
|
||||
|
||||
Based on research finding: code-heavy sessions (execute_code dominant in first
|
||||
30 turns) improve over time. File-heavy sessions degrade. The key is
|
||||
deterministic feedback loops, not arbitrary context.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Any, Tuple
|
||||
from dataclasses import dataclass, asdict
|
||||
from enum import Enum
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Default template directory
|
||||
DEFAULT_TEMPLATE_DIR = Path.home() / ".hermes" / "session-templates"
|
||||
|
||||
|
||||
class TaskType(Enum):
|
||||
"""Task type classification for session templates."""
|
||||
CODE = "code"
|
||||
FILE = "file"
|
||||
RESEARCH = "research"
|
||||
MIXED = "mixed"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ToolCallTemplate:
|
||||
"""A single tool call template extracted from a successful session."""
|
||||
tool_name: str
|
||||
arguments: Dict[str, Any]
|
||||
result: str
|
||||
success: bool
|
||||
execution_time: float
|
||||
turn_number: int
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary for JSON serialization."""
|
||||
return asdict(self)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> 'ToolCallTemplate':
|
||||
"""Create from dictionary."""
|
||||
return cls(**data)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SessionTemplate:
|
||||
"""A complete session template with multiple tool calls."""
|
||||
template_id: str
|
||||
task_type: TaskType
|
||||
name: str
|
||||
description: str
|
||||
tool_calls: List[ToolCallTemplate]
|
||||
source_session_id: Optional[str]
|
||||
created_at: float
|
||||
success_rate: float
|
||||
usage_count: int
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary for JSON serialization."""
|
||||
data = asdict(self)
|
||||
data['task_type'] = self.task_type.value
|
||||
return data
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> 'SessionTemplate':
|
||||
"""Create from dictionary."""
|
||||
data['task_type'] = TaskType(data['task_type'])
|
||||
return cls(**data)
|
||||
|
||||
|
||||
class SessionTemplateManager:
|
||||
"""Manages session templates for seeding new sessions."""
|
||||
|
||||
def __init__(self, template_dir: Optional[Path] = None, db_path: Optional[Path] = None):
|
||||
"""
|
||||
Initialize the session template manager.
|
||||
|
||||
Args:
|
||||
template_dir: Directory to store templates (default: ~/.hermes/session-templates/)
|
||||
db_path: Path to the session database (default: ~/.hermes/state.db)
|
||||
"""
|
||||
self.template_dir = template_dir or DEFAULT_TEMPLATE_DIR
|
||||
self.db_path = db_path or Path.home() / ".hermes" / "state.db"
|
||||
|
||||
# Ensure template directory exists
|
||||
self.template_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Load existing templates
|
||||
self.templates: Dict[str, SessionTemplate] = {}
|
||||
self._load_templates()
|
||||
|
||||
def _load_templates(self):
|
||||
"""Load all templates from the template directory."""
|
||||
for template_file in self.template_dir.glob("*.json"):
|
||||
try:
|
||||
with open(template_file, 'r') as f:
|
||||
data = json.load(f)
|
||||
template = SessionTemplate.from_dict(data)
|
||||
self.templates[template.template_id] = template
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to load template {template_file}: {e}")
|
||||
|
||||
def _save_template(self, template: SessionTemplate):
|
||||
"""Save a template to disk."""
|
||||
template_file = self.template_dir / f"{template.template_id}.json"
|
||||
with open(template_file, 'w') as f:
|
||||
json.dump(template.to_dict(), f, indent=2)
|
||||
|
||||
def classify_task_type(self, tool_calls: List[Dict[str, Any]]) -> TaskType:
|
||||
"""
|
||||
Classify the task type based on tool calls.
|
||||
|
||||
Args:
|
||||
tool_calls: List of tool calls from a session
|
||||
|
||||
Returns:
|
||||
TaskType classification
|
||||
"""
|
||||
if not tool_calls:
|
||||
return TaskType.MIXED
|
||||
|
||||
# Count tool types
|
||||
code_tools = {'execute_code', 'code_execution'}
|
||||
file_tools = {'read_file', 'write_file', 'patch', 'search_files'}
|
||||
research_tools = {'web_search', 'web_fetch', 'browser_navigate'}
|
||||
|
||||
code_count = sum(1 for tc in tool_calls if tc.get('tool_name') in code_tools)
|
||||
file_count = sum(1 for tc in tool_calls if tc.get('tool_name') in file_tools)
|
||||
research_count = sum(1 for tc in tool_calls if tc.get('tool_name') in research_tools)
|
||||
|
||||
total = len(tool_calls)
|
||||
if total == 0:
|
||||
return TaskType.MIXED
|
||||
|
||||
# Determine dominant type
|
||||
code_ratio = code_count / total
|
||||
file_ratio = file_count / total
|
||||
research_ratio = research_count / total
|
||||
|
||||
if code_ratio > 0.6:
|
||||
return TaskType.CODE
|
||||
elif file_ratio > 0.6:
|
||||
return TaskType.FILE
|
||||
elif research_ratio > 0.6:
|
||||
return TaskType.RESEARCH
|
||||
else:
|
||||
return TaskType.MIXED
|
||||
|
||||
def extract_successful_tool_calls(self, session_id: str, max_calls: int = 10) -> List[ToolCallTemplate]:
|
||||
"""
|
||||
Extract successful tool calls from a completed session.
|
||||
|
||||
Args:
|
||||
session_id: Session ID to extract from
|
||||
max_calls: Maximum number of tool calls to extract
|
||||
|
||||
Returns:
|
||||
List of ToolCallTemplate objects
|
||||
"""
|
||||
if not self.db_path.exists():
|
||||
logger.warning(f"Session database not found: {self.db_path}")
|
||||
return []
|
||||
|
||||
try:
|
||||
conn = sqlite3.connect(str(self.db_path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
|
||||
# Get messages for the session
|
||||
cursor = conn.execute("""
|
||||
SELECT role, content, tool_calls, tool_name, timestamp
|
||||
FROM messages
|
||||
WHERE session_id = ?
|
||||
ORDER BY timestamp
|
||||
""", (session_id,))
|
||||
|
||||
messages = cursor.fetchall()
|
||||
conn.close()
|
||||
|
||||
# Extract tool calls
|
||||
tool_call_templates = []
|
||||
turn_number = 0
|
||||
|
||||
for msg in messages:
|
||||
if msg['role'] == 'assistant' and msg['tool_calls']:
|
||||
try:
|
||||
tool_calls = json.loads(msg['tool_calls'])
|
||||
for tc in tool_calls:
|
||||
if len(tool_call_templates) >= max_calls:
|
||||
break
|
||||
|
||||
tool_name = tc.get('function', {}).get('name')
|
||||
if not tool_name:
|
||||
continue
|
||||
|
||||
# Parse arguments
|
||||
try:
|
||||
arguments = json.loads(tc.get('function', {}).get('arguments', '{}'))
|
||||
except:
|
||||
arguments = {}
|
||||
|
||||
# Create template (result will be filled from tool response)
|
||||
template = ToolCallTemplate(
|
||||
tool_name=tool_name,
|
||||
arguments=arguments,
|
||||
result="", # Will be filled from tool response
|
||||
success=True, # Assume successful if we got a response
|
||||
execution_time=0.0, # Not tracked in current schema
|
||||
turn_number=turn_number
|
||||
)
|
||||
tool_call_templates.append(template)
|
||||
turn_number += 1
|
||||
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
elif msg['role'] == 'tool' and tool_call_templates:
|
||||
# Fill in the result for the last tool call
|
||||
if tool_call_templates[-1].result == "":
|
||||
tool_call_templates[-1].result = msg['content'] or ""
|
||||
|
||||
return tool_call_templates
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to extract tool calls from session {session_id}: {e}")
|
||||
return []
|
||||
|
||||
def create_template_from_session(self, session_id: str,
|
||||
name: Optional[str] = None,
|
||||
description: Optional[str] = None,
|
||||
max_calls: int = 10) -> Optional[SessionTemplate]:
|
||||
"""
|
||||
Create a session template from a completed session.
|
||||
|
||||
Args:
|
||||
session_id: Session ID to create template from
|
||||
name: Template name (auto-generated if None)
|
||||
description: Template description (auto-generated if None)
|
||||
max_calls: Maximum number of tool calls to include
|
||||
|
||||
Returns:
|
||||
SessionTemplate object or None if failed
|
||||
"""
|
||||
# Extract tool calls
|
||||
tool_calls = self.extract_successful_tool_calls(session_id, max_calls)
|
||||
if not tool_calls:
|
||||
logger.warning(f"No successful tool calls found in session {session_id}")
|
||||
return None
|
||||
|
||||
# Classify task type
|
||||
task_type = self.classify_task_type([tc.to_dict() for tc in tool_calls])
|
||||
|
||||
# Generate template ID
|
||||
template_id = f"{task_type.value}_{session_id[:8]}_{int(time.time())}"
|
||||
|
||||
# Auto-generate name and description if not provided
|
||||
if not name:
|
||||
name = f"{task_type.value.title()} Template from {session_id[:8]}"
|
||||
if not description:
|
||||
tool_names = [tc.tool_name for tc in tool_calls]
|
||||
description = f"Template with {len(tool_calls)} successful tool calls: {', '.join(tool_names[:3])}"
|
||||
|
||||
# Create template
|
||||
template = SessionTemplate(
|
||||
template_id=template_id,
|
||||
task_type=task_type,
|
||||
name=name,
|
||||
description=description,
|
||||
tool_calls=tool_calls,
|
||||
source_session_id=session_id,
|
||||
created_at=time.time(),
|
||||
success_rate=1.0, # All extracted calls were successful
|
||||
usage_count=0
|
||||
)
|
||||
|
||||
# Save template
|
||||
self.templates[template_id] = template
|
||||
self._save_template(template)
|
||||
|
||||
logger.info(f"Created template {template_id} from session {session_id}")
|
||||
return template
|
||||
|
||||
def get_template_for_task(self, task_type: TaskType) -> Optional[SessionTemplate]:
|
||||
"""
|
||||
Get the best template for a given task type.
|
||||
|
||||
Args:
|
||||
task_type: Type of task
|
||||
|
||||
Returns:
|
||||
Best matching SessionTemplate or None
|
||||
"""
|
||||
matching_templates = [
|
||||
t for t in self.templates.values()
|
||||
if t.task_type == task_type
|
||||
]
|
||||
|
||||
if not matching_templates:
|
||||
return None
|
||||
|
||||
# Sort by success rate and usage count
|
||||
matching_templates.sort(
|
||||
key=lambda t: (t.success_rate, -t.usage_count),
|
||||
reverse=True
|
||||
)
|
||||
|
||||
return matching_templates[0]
|
||||
|
||||
def inject_template_into_messages(self, template: SessionTemplate,
|
||||
messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Inject a template into a list of messages for a new session.
|
||||
|
||||
Args:
|
||||
template: Template to inject
|
||||
messages: Existing messages list
|
||||
|
||||
Returns:
|
||||
Modified messages list with template injected
|
||||
"""
|
||||
if not template.tool_calls:
|
||||
return messages
|
||||
|
||||
# Create template injection messages
|
||||
template_messages = []
|
||||
|
||||
# Add system message about template
|
||||
template_messages.append({
|
||||
"role": "system",
|
||||
"content": f"Session template loaded: {template.name}\n"
|
||||
f"Task type: {template.task_type.value}\n"
|
||||
f"This template contains {len(template.tool_calls)} successful tool calls "
|
||||
f"to establish a feedback loop early."
|
||||
})
|
||||
|
||||
# Add tool calls and results from template
|
||||
for i, tool_call in enumerate(template.tool_calls):
|
||||
# Add assistant message with tool call
|
||||
template_messages.append({
|
||||
"role": "assistant",
|
||||
"content": None,
|
||||
"tool_calls": [{
|
||||
"id": f"template_{template.template_id}_{i}",
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": tool_call.tool_name,
|
||||
"arguments": json.dumps(tool_call.arguments)
|
||||
}
|
||||
}]
|
||||
})
|
||||
|
||||
# Add tool response
|
||||
template_messages.append({
|
||||
"role": "tool",
|
||||
"tool_call_id": f"template_{template.template_id}_{i}",
|
||||
"content": tool_call.result
|
||||
})
|
||||
|
||||
# Insert template messages at the beginning (after any existing system messages)
|
||||
insert_index = 0
|
||||
for i, msg in enumerate(messages):
|
||||
if msg.get("role") != "system":
|
||||
break
|
||||
insert_index = i + 1
|
||||
|
||||
# Insert template messages
|
||||
for i, msg in enumerate(template_messages):
|
||||
messages.insert(insert_index + i, msg)
|
||||
|
||||
# Update template usage count
|
||||
template.usage_count += 1
|
||||
self._save_template(template)
|
||||
|
||||
return messages
|
||||
|
||||
def list_templates(self, task_type: Optional[TaskType] = None) -> List[SessionTemplate]:
|
||||
"""
|
||||
List all templates, optionally filtered by task type.
|
||||
|
||||
Args:
|
||||
task_type: Optional task type filter
|
||||
|
||||
Returns:
|
||||
List of SessionTemplate objects
|
||||
"""
|
||||
templates = list(self.templates.values())
|
||||
|
||||
if task_type:
|
||||
templates = [t for t in templates if t.task_type == task_type]
|
||||
|
||||
# Sort by creation time (newest first)
|
||||
templates.sort(key=lambda t: t.created_at, reverse=True)
|
||||
|
||||
return templates
|
||||
|
||||
def delete_template(self, template_id: str) -> bool:
|
||||
"""
|
||||
Delete a template.
|
||||
|
||||
Args:
|
||||
template_id: ID of template to delete
|
||||
|
||||
Returns:
|
||||
True if deleted, False if not found
|
||||
"""
|
||||
if template_id not in self.templates:
|
||||
return False
|
||||
|
||||
# Remove from memory
|
||||
del self.templates[template_id]
|
||||
|
||||
# Remove from disk
|
||||
template_file = self.template_dir / f"{template_id}.json"
|
||||
if template_file.exists():
|
||||
template_file.unlink()
|
||||
|
||||
logger.info(f"Deleted template {template_id}")
|
||||
return True
|
||||
|
||||
|
||||
# CLI interface for template management
|
||||
def main():
|
||||
"""CLI interface for session template management."""
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="Session Template Manager")
|
||||
subparsers = parser.add_subparsers(dest="command", help="Command to execute")
|
||||
|
||||
# Create template from session
|
||||
create_parser = subparsers.add_parser("create", help="Create template from session")
|
||||
create_parser.add_argument("session_id", help="Session ID to create template from")
|
||||
create_parser.add_argument("--name", help="Template name")
|
||||
create_parser.add_argument("--description", help="Template description")
|
||||
create_parser.add_argument("--max-calls", type=int, default=10, help="Max tool calls to extract")
|
||||
|
||||
# List templates
|
||||
list_parser = subparsers.add_parser("list", help="List templates")
|
||||
list_parser.add_argument("--type", choices=["code", "file", "research", "mixed"],
|
||||
help="Filter by task type")
|
||||
|
||||
# Delete template
|
||||
delete_parser = subparsers.add_parser("delete", help="Delete template")
|
||||
delete_parser.add_argument("template_id", help="Template ID to delete")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Set up logging
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
# Create template manager
|
||||
manager = SessionTemplateManager()
|
||||
|
||||
if args.command == "create":
|
||||
template = manager.create_template_from_session(
|
||||
args.session_id,
|
||||
name=args.name,
|
||||
description=args.description,
|
||||
max_calls=args.max_calls
|
||||
)
|
||||
if template:
|
||||
print(f"Created template: {template.template_id}")
|
||||
print(f" Name: {template.name}")
|
||||
print(f" Type: {template.task_type.value}")
|
||||
print(f" Tool calls: {len(template.tool_calls)}")
|
||||
else:
|
||||
print("Failed to create template")
|
||||
|
||||
elif args.command == "list":
|
||||
task_type = TaskType(args.type) if args.type else None
|
||||
templates = manager.list_templates(task_type)
|
||||
|
||||
if not templates:
|
||||
print("No templates found")
|
||||
return
|
||||
|
||||
print(f"Found {len(templates)} templates:")
|
||||
for t in templates:
|
||||
print(f" {t.template_id}: {t.name}")
|
||||
print(f" Type: {t.task_type.value}")
|
||||
print(f" Tool calls: {len(t.tool_calls)}")
|
||||
print(f" Usage: {t.usage_count}")
|
||||
print(f" Created: {datetime.fromtimestamp(t.created_at).isoformat()}")
|
||||
print()
|
||||
|
||||
elif args.command == "delete":
|
||||
if manager.delete_template(args.template_id):
|
||||
print(f"Deleted template: {args.template_id}")
|
||||
else:
|
||||
print(f"Template not found: {args.template_id}")
|
||||
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user