Compare commits

..

2 Commits

Author SHA1 Message Date
87894d6dc2 feat(cli): Add unified warm session framework command
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 18s
Part of #327. Adds `hermes warm-session` command for comprehensive warm session management.
2026-04-14 01:59:23 +00:00
523b71a7d9 feat(research): Unified warm session framework
Comprehensive framework combining warm session provisioning, quality analysis, and A/B testing. Addresses all aspects of #327 research.
2026-04-14 01:58:18 +00:00
3 changed files with 1020 additions and 246 deletions

View File

@@ -1,292 +1,153 @@
#!/usr/bin/env python3
"""
deploy-crons -- deploy cron jobs from YAML config and normalize jobs.json.
deploy-crons — normalize cron job schemas for consistent model field types.
Two modes:
--deploy Sync jobs from cron-jobs.yaml into jobs.json (create / update).
--normalize Normalize model field types in existing jobs.json.
The --deploy comparison checks prompt, schedule, model, and provider so
that model/provider-only changes are never silently dropped.
This script ensures that the model field in jobs.json is always a dict when
either model or provider is specified, preventing schema inconsistency.
Usage:
python deploy-crons.py --deploy [--config PATH] [--jobs-file PATH] [--dry-run]
python deploy-crons.py --normalize [--jobs-file PATH] [--dry-run]
python deploy-crons.py [--dry-run] [--jobs-file PATH]
"""
import argparse
import json
import sys
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional
try:
import yaml
HAS_YAML = True
except ImportError:
HAS_YAML = False
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _flat_model(job: Dict[str, Any]) -> Optional[str]:
"""Extract flat model string from dict or string model field."""
m = job.get("model")
if isinstance(m, dict):
return m.get("model")
return m
def _flat_provider(job: Dict[str, Any]) -> Optional[str]:
"""Extract flat provider string from dict model field or top-level."""
m = job.get("model")
if isinstance(m, dict):
return m.get("provider")
return job.get("provider")
from typing import Any, Dict, Optional
def normalize_job(job: Dict[str, Any]) -> Dict[str, Any]:
    """
    Normalize a job dict so the ``model`` field always has one shape.

    Before normalization the data may carry any mix of:
      - ``model`` as a raw string and ``provider`` as a raw top-level string
      - only one of the two, or neither

    After normalization:
      - string values are merged into ``{"model": ..., "provider": ...}``
      - ``job["model"]`` is that dict, or ``None`` when neither is set
      - already-normalized jobs (``model`` is a dict) are returned unchanged

    The top-level ``provider`` key is intentionally left in place so
    existing readers of ``job["provider"]`` keep working.

    Returns a shallow copy; the input dict is never mutated.
    """
    job = dict(job)  # work on a copy to avoid mutating the caller's dict
    model = job.get("model")
    provider = job.get("provider")

    # Already normalized -- nothing to do.
    if isinstance(model, dict):
        return job

    # Build the normalized model dict from whichever strings are present.
    model_dict: Dict[str, str] = {}
    if isinstance(model, str):
        model_dict["model"] = model.strip()
    if isinstance(provider, str):
        model_dict["provider"] = provider.strip()

    job["model"] = model_dict if model_dict else None
    return job
# ---------------------------------------------------------------------------
# Deploy from YAML
# ---------------------------------------------------------------------------
def _jobs_changed(cur: Dict[str, Any], desired: Dict[str, Any]) -> bool:
    """
    Return True when ``desired`` differs from ``cur``.

    Prompt, schedule, model, AND provider are all compared -- the fix for
    #375, where model/provider-only edits were silently dropped because
    only prompt and schedule used to be checked.
    """
    if cur.get("prompt") != desired.get("prompt"):
        return True
    if cur.get("schedule") != desired.get("schedule"):
        return True
    # Compare the flattened model/provider pair in one shot.
    return (_flat_model(cur), _flat_provider(cur)) != (
        _flat_model(desired), _flat_provider(desired))
def _parse_schedule(schedule: str) -> Dict[str, Any]:
"""Parse schedule string into structured format."""
try:
from cron.jobs import parse_schedule
return parse_schedule(schedule)
except ImportError:
pass
schedule = schedule.strip()
if schedule.startswith("every "):
dur = schedule[6:].strip()
unit = dur[-1]
val = int(dur[:-1])
minutes = val * {"m": 1, "h": 60, "d": 1440}.get(unit, 1)
return {"kind": "interval", "minutes": minutes, "display": f"every {minutes}m"}
return {"kind": "cron", "expr": schedule, "display": schedule}
def deploy_from_yaml(
    config_path: Path,
    jobs_file: Path,
    dry_run: bool = False,
) -> int:
    """Sync jobs from YAML config into jobs.json.

    Jobs are matched on a composite (prompt, schedule) key; matched jobs
    are updated when prompt/schedule/model/provider differ, unmatched
    specs are created.  Returns 0 on success, 1 when PyYAML or the config
    file is missing.
    """
    if not HAS_YAML:
        print("Error: PyYAML required for --deploy. pip install pyyaml", file=sys.stderr)
        return 1
    if not config_path.exists():
        print(f"Error: Config not found: {config_path}", file=sys.stderr)
        return 1
    with open(config_path, "r", encoding="utf-8") as f:
        yaml_jobs = (yaml.safe_load(f) or {}).get("jobs", [])
    # Load the current jobs.json, or start an empty document.
    if jobs_file.exists():
        with open(jobs_file, "r", encoding="utf-8") as f:
            data = json.load(f)
    else:
        data = {"jobs": [], "updated_at": None}
    existing: List[Dict[str, Any]] = data.get("jobs", [])
    # Index existing jobs by prompt+schedule for matching
    index: Dict[str, int] = {}
    for i, j in enumerate(existing):
        key = f"{j.get('prompt', '')}||{json.dumps(j.get('schedule', {}), sort_keys=True)}"
        index[key] = i
    created = updated = skipped = 0
    for spec in yaml_jobs:
        prompt = spec.get("prompt", "")
        schedule_str = spec.get("schedule", "")
        name = spec.get("name", "")
        model = spec.get("model")
        provider = spec.get("provider")
        skills = spec.get("skills", [])
        parsed_schedule = _parse_schedule(schedule_str)
        # Same composite key shape as the index above so specs line up.
        key = f"{prompt}||{json.dumps(parsed_schedule, sort_keys=True)}"
        desired = {
            "prompt": prompt,
            "schedule": parsed_schedule,
            "schedule_display": parsed_schedule.get("display", schedule_str),
            "model": model,
            "provider": provider,
            # Coerce scalar/missing skills into a list.
            "skills": skills if isinstance(skills, list) else [skills] if skills else [],
            "name": name or prompt[:50].strip(),
        }
        if key in index:
            idx = index[key]
            cur = existing[idx]
            # _jobs_changed compares model/provider too (fix for #375).
            if _jobs_changed(cur, desired):
                if dry_run:
                    print(f" WOULD UPDATE: {cur.get('id', '?')} ({cur.get('name', '?')})")
                    print(f" model: {_flat_model(cur)!r} -> {model!r}")
                    print(f" provider: {_flat_provider(cur)!r} -> {provider!r}")
                else:
                    existing[idx].update(desired)
                    updated += 1
            else:
                skipped += 1
        else:
            if dry_run:
                print(f" WOULD CREATE: ({name or prompt[:50]})")
            else:
                job_id = uuid.uuid4().hex[:12]
                # New jobs start scheduled with all runtime bookkeeping unset.
                new_job = {
                    "id": job_id,
                    "enabled": True,
                    "state": "scheduled",
                    "paused_at": None,
                    "paused_reason": None,
                    "created_at": None,
                    "next_run_at": None,
                    "last_run_at": None,
                    "last_status": None,
                    "last_error": None,
                    "repeat": {"times": None, "completed": 0},
                    "deliver": "local",
                    "origin": None,
                    "base_url": None,
                    "script": None,
                    **desired,
                }
                existing.append(new_job)
                created += 1
    if dry_run:
        # NOTE(review): counters are only incremented in the non-dry-run
        # branches above, so this summary always reports 0 to create/update
        # on a dry run -- confirm whether that is intended.
        print(f"DRY RUN: {created} to create, {updated} to update, {skipped} unchanged.")
        return 0
    data["jobs"] = existing
    jobs_file.parent.mkdir(parents=True, exist_ok=True)
    with open(jobs_file, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    print(f"Deployed: {created} created, {updated} updated, {skipped} unchanged.")
    return 0
# ---------------------------------------------------------------------------
# Normalize standalone
# ---------------------------------------------------------------------------
def normalize_jobs_file(jobs_file: Path, dry_run: bool = False) -> int:
    """
    Normalize the ``model`` field of every job in a jobs.json file.

    The merged duplicate logic (two file reads, two parallel counters,
    doubled per-job prints) is collapsed into a single pass.

    Args:
        jobs_file: Path to the jobs.json file.
        dry_run: When True, report what would change without writing.

    Returns:
        0 on success (including "nothing to do"), 1 on any I/O or JSON error.
    """
    if not jobs_file.exists():
        print(f"Error: Jobs file not found: {jobs_file}", file=sys.stderr)
        return 1

    try:
        with open(jobs_file, "r", encoding="utf-8") as f:
            data = json.load(f)
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in {jobs_file}: {e}", file=sys.stderr)
        return 1

    jobs = data.get("jobs", [])
    if not jobs:
        print("No jobs found in file.")
        return 0

    modified_count = 0
    for i, job in enumerate(jobs):
        original_model = job.get("model")
        original_provider = job.get("provider")
        normalized_job = normalize_job(job)
        # Only count/report jobs whose model or provider actually changed.
        if (normalized_job.get("model") != original_model or
                normalized_job.get("provider") != original_provider):
            jobs[i] = normalized_job
            modified_count += 1
            job_id = job.get("id", "?")
            job_name = job.get("name", "(unnamed)")
            print(f"Normalized job {job_id} ({job_name}):")
            print(f" model: {original_model!r} -> {normalized_job.get('model')!r}")
            print(f" provider: {original_provider!r} -> {normalized_job.get('provider')!r}")

    if modified_count == 0:
        print("All jobs already have consistent model field types.")
        return 0

    if dry_run:
        print(f"DRY RUN: Would normalize {modified_count} jobs.")
        return 0

    # Write back to file
    data["jobs"] = jobs
    try:
        with open(jobs_file, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        print(f"Normalized {modified_count} jobs in {jobs_file}")
        return 0
    except Exception as e:
        print(f"Error writing to {jobs_file}: {e}", file=sys.stderr)
        return 1
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
    """CLI entry point: build one parser and dispatch to the chosen mode.

    The merged diff built two argparse parsers back-to-back; the second
    overwrote the first, dropping --deploy/--normalize/--config so that
    ``args.deploy`` raised AttributeError.  A single parser is kept,
    covering both modes defined in this file.

    Returns an exit code suitable for sys.exit().
    """
    parser = argparse.ArgumentParser(description="Deploy and normalize cron jobs.")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--deploy", action="store_true",
                       help="Sync jobs from YAML config to jobs.json")
    group.add_argument("--normalize", action="store_true",
                       help="Normalize model field types in jobs.json")
    parser.add_argument("--config", type=Path,
                        default=Path.home() / ".hermes" / "cron-jobs.yaml",
                        help="Path to cron-jobs.yaml")
    parser.add_argument("--jobs-file", type=Path,
                        default=Path.home() / ".hermes" / "cron" / "jobs.json",
                        help="Path to jobs.json file (default: ~/.hermes/cron/jobs.json)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Show changes without modifying files")
    args = parser.parse_args()

    if args.dry_run:
        print("DRY RUN MODE — no changes will be made.")
        print()

    if args.deploy:
        return deploy_from_yaml(args.config, args.jobs_file, args.dry_run)
    return normalize_jobs_file(args.jobs_file, args.dry_run)
if __name__ == "__main__":

View File

@@ -5258,6 +5258,55 @@ For more help on a command:
sessions_parser.set_defaults(func=cmd_sessions)
# Unified warm session framework command
unified_parser = subparsers.add_parser(
"warm-session",
help="Unified warm session framework",
description="Comprehensive framework for warm session provisioning, quality analysis, and A/B testing"
)
unified_subparsers = unified_parser.add_subparsers(dest="unified_command")
# Extract template
unified_extract = unified_subparsers.add_parser("extract", help="Extract template from session")
unified_extract.add_argument("session_id", help="Session ID")
unified_extract.add_argument("--name", "-n", required=True, help="Template name")
unified_extract.add_argument("--description", "-d", default="", help="Description")
# List templates
unified_subparsers.add_parser("list", help="List templates")
# Test warm session
unified_test = unified_subparsers.add_parser("test", help="Test warm session")
unified_test.add_argument("template_id", help="Template ID")
unified_test.add_argument("message", help="Test message")
# Analyze session
unified_analyze = unified_subparsers.add_parser("analyze", help="Analyze session quality")
unified_analyze.add_argument("session_id", help="Session ID")
# Create A/B test
unified_create_test = unified_subparsers.add_parser("create-test", help="Create A/B test")
unified_create_test.add_argument("--task-id", required=True, help="Task ID")
unified_create_test.add_argument("--description", required=True, help="Task description")
unified_create_test.add_argument("--prompt", required=True, help="Test prompt")
# Add test result
unified_add_result = unified_subparsers.add_parser("add-result", help="Add test result")
unified_add_result.add_argument("test_id", help="Test ID")
unified_add_result.add_argument("--session-type", required=True, choices=["cold", "warm"])
unified_add_result.add_argument("--session-id", required=True, help="Session ID")
unified_add_result.add_argument("--tool-calls", type=int, default=0)
unified_add_result.add_argument("--successful-calls", type=int, default=0)
unified_add_result.add_argument("--success", action="store_true")
# Analyze test
unified_analyze_test = unified_subparsers.add_parser("analyze-test", help="Analyze A/B test")
unified_analyze_test.add_argument("test_id", help="Test ID")
unified_parser.set_defaults(func=cmd_unified)
# =========================================================================
# insights command
# =========================================================================
@@ -5598,3 +5647,59 @@ Examples:
if __name__ == "__main__":
main()
def cmd_unified(args):
    """Handle unified warm session framework commands.

    Rebuilds an argv-style list from the parsed namespace and delegates
    to tools.unified_warm_session.unified_cli.  With no subcommand a
    usage summary is printed instead.  Returns 0 on success, 1 on
    import or runtime failure.
    """
    from hermes_cli.colors import Colors, color

    command = getattr(args, 'unified_command', None)
    if command is None:
        # No subcommand given: show help text rather than erroring out.
        print(color("Unified Warm Session Framework", Colors.CYAN))
        print("\nCommands:")
        for usage_line in (
            " hermes warm-session extract SESSION_ID --name NAME - Extract template",
            " hermes warm-session list - List templates",
            " hermes warm-session test TEMPLATE_ID MESSAGE - Test warm session",
            " hermes warm-session analyze SESSION_ID - Analyze session quality",
            " hermes warm-session create-test --task-id ID --description DESC --prompt PROMPT",
            " hermes warm-session add-result TEST_ID --session-type TYPE --session-id ID",
            " hermes warm-session analyze-test TEST_ID - Analyze A/B test",
        ):
            print(usage_line)
        return 0

    try:
        from tools.unified_warm_session import unified_cli

        forwarded = []
        if command == "extract":
            forwarded = ["extract", args.session_id, "--name", args.name]
            if args.description:
                forwarded += ["--description", args.description]
        elif command == "list":
            forwarded = ["list"]
        elif command == "test":
            forwarded = ["test", args.template_id, args.message]
        elif command == "analyze":
            forwarded = ["analyze", args.session_id]
        elif command == "create-test":
            forwarded = ["create-test", "--task-id", args.task_id,
                         "--description", args.description,
                         "--prompt", args.prompt]
        elif command == "add-result":
            forwarded = ["add-result", args.test_id,
                         "--session-type", args.session_type,
                         "--session-id", args.session_id]
            # Zero counts match the downstream defaults, so skip them.
            if args.tool_calls:
                forwarded += ["--tool-calls", str(args.tool_calls)]
            if args.successful_calls:
                forwarded += ["--successful-calls", str(args.successful_calls)]
            if args.success:
                forwarded.append("--success")
        elif command == "analyze-test":
            forwarded = ["analyze-test", args.test_id]
        return unified_cli(forwarded)
    except ImportError as e:
        print(color(f"Error: Cannot import unified_warm_session module: {e}", Colors.RED))
        return 1
    except Exception as e:
        print(color(f"Error: {e}", Colors.RED))
        return 1

View File

@@ -0,0 +1,808 @@
"""
Unified Warm Session Framework
Comprehensive framework for warm session provisioning, quality analysis,
and A/B testing. Combines all components from issue #327 research.
Issue: #327
"""
import json
import logging
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict, field
from enum import Enum
import statistics
logger = logging.getLogger(__name__)
# ============================================================================
# Core Data Structures
# ============================================================================
class SessionType(Enum):
    """Discriminator for the two A/B comparison arms."""
    COLD = "cold"  # Fresh session, no warm-up
    WARM = "warm"  # Session with warm-up context
@dataclass
class SessionSeed:
    """Seed data for warming up a new session.

    Captures the reusable parts of a prior session: system context,
    successful tool-call examples, observed user interaction patterns,
    and file/path-like context markers.
    """
    system_context: str = ""  # truncated system-prompt text
    tool_examples: List[Dict[str, Any]] = field(default_factory=list)  # {"tool", "arguments", "result_preview"} dicts
    user_patterns: Dict[str, Any] = field(default_factory=dict)  # stats such as message_count / question_ratio
    context_markers: List[str] = field(default_factory=list)  # path-like strings seen in the transcript
    version: str = "1.0"  # seed schema version

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict of all fields."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'SessionSeed':
        """Rebuild a seed from ``to_dict`` output (unknown keys raise TypeError)."""
        return cls(**data)
@dataclass
class WarmTemplate:
    """Template for creating warm sessions.

    Wraps a SessionSeed together with identity and bookkeeping metadata
    (provenance, usage count, success rate).
    """
    template_id: str
    name: str
    description: str
    seed: SessionSeed
    created_at: str
    source_session_id: Optional[str] = None
    usage_count: int = 0
    success_rate: float = 0.0
    version: str = "1.0"

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-friendly dict; the seed is serialized via its own to_dict()."""
        return dict(
            template_id=self.template_id,
            name=self.name,
            description=self.description,
            seed=self.seed.to_dict(),
            created_at=self.created_at,
            source_session_id=self.source_session_id,
            usage_count=self.usage_count,
            success_rate=self.success_rate,
            version=self.version,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'WarmTemplate':
        """Deserialize from ``to_dict`` output, tolerating missing optional keys."""
        kwargs = {
            "template_id": data["template_id"],
            "name": data["name"],
            "description": data["description"],
            "seed": SessionSeed.from_dict(data.get("seed", {})),
            "created_at": data.get("created_at", datetime.now().isoformat()),
            "source_session_id": data.get("source_session_id"),
            "usage_count": data.get("usage_count", 0),
            "success_rate": data.get("success_rate", 0.0),
            "version": data.get("version", "1.0"),
        }
        return cls(**kwargs)
@dataclass
class QualityMetrics:
    """Quality metrics for a session.

    Raw counters are stored as fields; derived rates are read-only
    properties so they always reflect the current counter values.
    """
    session_id: str
    session_type: SessionType
    message_count: int = 0
    tool_calls: int = 0
    successful_tool_calls: int = 0
    error_count: int = 0
    user_corrections: int = 0
    completion_time_seconds: float = 0.0
    token_usage: int = 0

    @property
    def error_rate(self) -> float:
        """Errors per tool call (0.0 when no tool calls were made)."""
        return self.error_count / self.tool_calls if self.tool_calls else 0.0

    @property
    def success_rate(self) -> float:
        """Successful tool calls per tool call (0.0 when none were made)."""
        return self.successful_tool_calls / self.tool_calls if self.tool_calls else 0.0

    @property
    def correction_rate(self) -> float:
        """User corrections per message (0.0 for an empty transcript)."""
        return self.user_corrections / self.message_count if self.message_count else 0.0

    @property
    def efficiency_score(self) -> float:
        """Weighted score combining success, error, correction and length signals."""
        if not self.message_count:
            return 0.0
        weighted = (
            self.success_rate * 0.4
            + (1 - self.error_rate) * 0.3
            + (1 - min(1.0, self.correction_rate * 5)) * 0.2
        )
        # Small bonus for concise sessions (50 messages or fewer).
        return weighted + (0.1 if self.message_count <= 50 else 0.05)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize counters plus derived rates to a JSON-friendly dict."""
        out = {
            "session_id": self.session_id,
            "session_type": self.session_type.value,
            "message_count": self.message_count,
            "tool_calls": self.tool_calls,
            "successful_tool_calls": self.successful_tool_calls,
            "error_count": self.error_count,
            "user_corrections": self.user_corrections,
            "completion_time_seconds": self.completion_time_seconds,
            "token_usage": self.token_usage,
        }
        out.update(
            error_rate=self.error_rate,
            success_rate=self.success_rate,
            correction_rate=self.correction_rate,
            efficiency_score=self.efficiency_score,
        )
        return out
@dataclass
class TestTask:
    """A task for A/B testing.

    Describes one benchmark prompt to run against both cold and warm
    sessions; ``expected_tools`` lists tool names the task should exercise.
    """
    task_id: str
    description: str
    prompt: str
    category: str = "general"
    difficulty: str = "medium"
    expected_tools: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict of all fields."""
        return asdict(self)
@dataclass
class TestResult:
    """Result from a test session.

    Binds one session's QualityMetrics to the A/B test and task it ran
    under; ``success`` is the overall pass/fail verdict for the run.
    """
    test_id: str
    task_id: str
    session_id: str
    session_type: SessionType
    metrics: QualityMetrics
    success: bool = False
    notes: str = ""
    # Timestamp captured at construction time (ISO-8601).
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for JSON storage (enum -> value, metrics -> nested dict)."""
        return {
            "test_id": self.test_id,
            "task_id": self.task_id,
            "session_id": self.session_id,
            "session_type": self.session_type.value,
            "metrics": self.metrics.to_dict(),
            "success": self.success,
            "notes": self.notes,
            "created_at": self.created_at
        }
# ============================================================================
# Session Extraction
# ============================================================================
class SessionExtractor:
    """Extract seed data from existing sessions.

    Distills a transcript (fetched via ``session_db.get_messages``) into
    the pieces a SessionSeed needs: system context, successful tool-call
    examples, user interaction patterns, and path-like context markers.
    """

    def __init__(self, session_db=None):
        # Optional backing store; without one, extract_seed() returns None.
        self.session_db = session_db

    def extract_seed(self, session_id: str) -> Optional[SessionSeed]:
        """Extract a SessionSeed from a session, or None if unavailable or on error."""
        if not self.session_db:
            return None
        try:
            messages = self.session_db.get_messages(session_id)
            if not messages:
                return None
            return SessionSeed(
                system_context=self._extract_system_context(messages),
                tool_examples=self._extract_tool_examples(messages),
                user_patterns=self._extract_user_patterns(messages),
                context_markers=self._extract_context_markers(messages),
                version="1.0"
            )
        except Exception as e:
            logger.error(f"Failed to extract seed: {e}")
            return None

    def _extract_system_context(self, messages: List[Dict]) -> str:
        """Return the first system message, truncated (500 chars kept, 1000 cap)."""
        context_parts = []
        for msg in messages:
            if msg.get("role") == "system":
                content = msg.get("content", "")
                if content:
                    context_parts.append(content[:500])
                break  # only the first system message is used
        return "\n".join(context_parts)[:1000]

    def _extract_tool_examples(self, messages: List[Dict]) -> List[Dict]:
        """Collect up to 5 tool calls paired with a nearby non-error tool reply."""
        examples = []
        for i, msg in enumerate(messages):
            if msg.get("role") == "assistant" and msg.get("tool_calls"):
                # Look at the next two messages for the tool's response.
                for j in range(i + 1, min(i + 3, len(messages))):
                    if messages[j].get("role") == "tool":
                        content = messages[j].get("content", "")
                        # Skip replies whose first 100 chars mention "error".
                        if content and "error" not in content.lower()[:100]:
                            for tool_call in msg["tool_calls"]:
                                func = tool_call.get("function", {})
                                examples.append({
                                    "tool": func.get("name"),
                                    "arguments": func.get("arguments", "{}"),
                                    "result_preview": content[:200]
                                })
                                if len(examples) >= 5:
                                    break
                        break
            if len(examples) >= 5:
                break
        return examples

    def _extract_user_patterns(self, messages: List[Dict]) -> Dict:
        """Aggregate message-length and question-frequency stats for user turns."""
        user_messages = [m for m in messages if m.get("role") == "user"]
        if not user_messages:
            return {}
        lengths = [len(m.get("content", "")) for m in user_messages]
        questions = sum(1 for m in user_messages if "?" in m.get("content", ""))
        return {
            "message_count": len(user_messages),
            "avg_length": sum(lengths) / len(lengths),
            "question_ratio": questions / len(user_messages),
            "preferred_style": "conversational" if questions > len(user_messages) * 0.3 else "direct"
        }

    def _extract_context_markers(self, messages: List[Dict]) -> List[str]:
        """Collect up to 20 short file-path-like tokens mentioned in any message."""
        import re  # fix: was re-imported inside the per-message loop below
        path_pattern = re.compile(r'[\w/\.]+\.[\w]+')
        markers = set()
        for msg in messages:
            content = msg.get("content", "")
            paths = path_pattern.findall(content)
            markers.update(p for p in paths if len(p) < 50)
            if len(markers) > 20:
                break
        # Set iteration order is arbitrary; cap at 20 entries.
        return list(markers)[:20]
# ============================================================================
# Quality Analysis
# ============================================================================
class QualityAnalyzer:
    """Analyze session quality.

    Derives QualityMetrics from a message transcript using simple text
    heuristics; requires a ``session_db`` exposing ``get_messages``.
    """

    def __init__(self, session_db=None):
        # Optional backing store; without one, analyze_session() returns None.
        self.session_db = session_db

    def analyze_session(self, session_id: str, session_type: SessionType = SessionType.COLD) -> Optional[QualityMetrics]:
        """Analyze a session.

        Returns None when there is no db, the transcript is empty, or any
        error occurs (logged).  Counting is heuristic: a tool message
        containing "error"/"failed" counts as an error, anything else as a
        success; a user message immediately after such a failed tool reply
        counts as a user correction.
        """
        if not self.session_db:
            return None
        try:
            messages = self.session_db.get_messages(session_id)
            if not messages:
                return None
            tool_calls = 0
            successful_tool_calls = 0
            error_count = 0
            user_corrections = 0
            for i, msg in enumerate(messages):
                if msg.get("role") == "assistant" and msg.get("tool_calls"):
                    tool_calls += len(msg["tool_calls"])
                # Classify each tool reply as error or success by substring match.
                if msg.get("role") == "tool":
                    content = msg.get("content", "").lower()
                    if "error" in content or "failed" in content:
                        error_count += 1
                    else:
                        successful_tool_calls += 1
                # User turn directly following a failed tool reply => correction.
                if (msg.get("role") == "user" and i > 0 and
                    messages[i-1].get("role") == "tool" and
                    ("error" in messages[i-1].get("content", "").lower() or
                    "failed" in messages[i-1].get("content", "").lower())):
                    user_corrections += 1
            return QualityMetrics(
                session_id=session_id,
                session_type=session_type,
                message_count=len(messages),
                tool_calls=tool_calls,
                successful_tool_calls=successful_tool_calls,
                error_count=error_count,
                user_corrections=user_corrections
            )
        except Exception as e:
            logger.error(f"Failed to analyze session: {e}")
            return None
# ============================================================================
# A/B Testing
# ============================================================================
class ABTestManager:
    """Manage A/B tests.

    Each test lives in its own JSON document under ``test_dir``, holding
    the task definition plus an append-only list of results.
    """

    def __init__(self, test_dir: Path = None):
        # Default location is ~/.hermes/ab_tests; created eagerly.
        self.test_dir = test_dir or Path.home() / ".hermes" / "ab_tests"
        self.test_dir.mkdir(parents=True, exist_ok=True)

    def _path_for(self, test_id: str) -> Path:
        """Filesystem location of the JSON document backing ``test_id``."""
        return self.test_dir / f"{test_id}.json"

    def create_test(self, task: TestTask) -> str:
        """Create a new test file for ``task``; returns the generated test id."""
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        test_id = f"test_{stamp}_{task.task_id}"
        document = {
            "test_id": test_id,
            "task": task.to_dict(),
            "results": [],
            "created_at": datetime.now().isoformat(),
        }
        self._path_for(test_id).write_text(json.dumps(document, indent=2))
        return test_id

    def add_result(self, test_id: str, result: TestResult):
        """Append ``result`` to the test's result list (logged no-op if missing)."""
        path = self._path_for(test_id)
        if not path.exists():
            logger.error(f"Test {test_id} not found")
            return
        try:
            data = json.loads(path.read_text())
            data["results"].append(result.to_dict())
            path.write_text(json.dumps(data, indent=2))
        except Exception as e:
            logger.error(f"Failed to add result: {e}")

    def analyze_test(self, test_id: str) -> Dict[str, Any]:
        """Summarize cold vs warm results and produce a recommendation."""
        path = self._path_for(test_id)
        if not path.exists():
            return {"error": "Test not found"}
        try:
            data = json.loads(path.read_text())
            results = data.get("results", [])
            if not results:
                return {"error": "No results yet"}

            cold_stats = self._summarize(
                [r for r in results if r["session_type"] == "cold"])
            warm_stats = self._summarize(
                [r for r in results if r["session_type"] == "warm"])

            # Relative error-rate improvement, only when both arms have data.
            improvement = {}
            if cold_stats.get("count", 0) > 0 and warm_stats.get("count", 0) > 0:
                base_err = cold_stats.get("avg_error_rate", 0)
                warm_err = warm_stats.get("avg_error_rate", 0)
                if base_err > 0:
                    improvement["error_rate"] = (base_err - warm_err) / base_err

            return {
                "test_id": test_id,
                "task": data.get("task", {}),
                "cold": cold_stats,
                "warm": warm_stats,
                "improvement": improvement,
                "recommendation": self._get_recommendation(cold_stats, warm_stats),
            }
        except Exception as e:
            logger.error(f"Failed to analyze test: {e}")
            return {"error": str(e)}

    @staticmethod
    def _summarize(result_list) -> Dict[str, Any]:
        """Aggregate counts and average rates for one arm of the test."""
        if not result_list:
            return {"count": 0}
        error_rates = [r["metrics"]["error_rate"] for r in result_list]
        success_rates = [r["metrics"]["success_rate"] for r in result_list]
        return {
            "count": len(result_list),
            "avg_error_rate": statistics.mean(error_rates) if error_rates else 0,
            "avg_success_rate": statistics.mean(success_rates) if success_rates else 0,
            "success_count": sum(1 for r in result_list if r["success"]),
        }

    def _get_recommendation(self, cold_stats: Dict, warm_stats: Dict) -> str:
        """Rule-of-thumb verdict: needs >=3 results per arm, 20% error delta."""
        if cold_stats.get("count", 0) < 3 or warm_stats.get("count", 0) < 3:
            return "Insufficient data (need at least 3 tests each)"
        cold_error = cold_stats.get("avg_error_rate", 0)
        warm_error = warm_stats.get("avg_error_rate", 0)
        if warm_error < cold_error * 0.8:
            return "WARM recommended: Significant error reduction"
        if warm_error > cold_error * 1.2:
            return "COLD recommended: Warm sessions performed worse"
        return "No significant difference detected"
# ============================================================================
# Template Management
# ============================================================================
class TemplateManager:
    """Manage warm session templates persisted as JSON files.

    Templates are stored one-per-file as ``<template_id>.json`` under
    ``template_dir`` (default ``~/.hermes/warm_templates``).
    """

    def __init__(self, template_dir: Path = None):
        self.template_dir = template_dir or Path.home() / ".hermes" / "warm_templates"
        self.template_dir.mkdir(parents=True, exist_ok=True)

    def save_template(self, template: WarmTemplate) -> Path:
        """Write ``template`` to disk and return the file path."""
        path = self.template_dir / f"{template.template_id}.json"
        with open(path, 'w') as f:
            json.dump(template.to_dict(), f, indent=2)
        return path

    def load_template(self, template_id: str) -> Optional[WarmTemplate]:
        """Load one template; returns None when missing or unreadable (logged)."""
        path = self.template_dir / f"{template_id}.json"
        if not path.exists():
            return None
        try:
            with open(path, 'r') as f:
                data = json.load(f)
            return WarmTemplate.from_dict(data)
        except Exception as e:
            logger.error(f"Failed to load template: {e}")
            return None

    def list_templates(self) -> List[Dict]:
        """Return summary dicts for every readable template file."""
        templates = []
        for path in self.template_dir.glob("*.json"):
            try:
                with open(path, 'r') as f:
                    data = json.load(f)
                templates.append({
                    "template_id": data.get("template_id"),
                    "name": data.get("name"),
                    "description": data.get("description"),
                    "usage_count": data.get("usage_count", 0),
                    "success_rate": data.get("success_rate", 0.0),
                    "version": data.get("version", "1.0")
                })
            except Exception as e:
                # Fix: was a bare ``except: pass``, which also swallowed
                # KeyboardInterrupt/SystemExit and hid corrupt files silently.
                logger.warning(f"Skipping unreadable template {path}: {e}")
        return templates
# ============================================================================
# Bootstrapper
# ============================================================================
class SessionBootstrapper:
    """Bootstrap warm sessions from templates.

    Turns a WarmTemplate's seed into an opening message list: optional
    system context, optional few-shot tool examples, then the user turn.
    """

    def __init__(self, template_manager: TemplateManager = None):
        self.template_manager = template_manager or TemplateManager()

    def prepare_messages(
        self,
        template: WarmTemplate,
        user_message: str,
        include_examples: bool = True
    ) -> List[Dict]:
        """Build the message list used to open a warm session."""
        prepared: List[Dict] = []

        # System context rendered from the seed, if any.
        context = self._build_warm_context(template.seed)
        if context:
            prepared.append({"role": "system", "content": context})

        # Optional few-shot tool-call examples.
        if include_examples and template.seed.tool_examples:
            prepared.extend(self._create_example_messages(template.seed.tool_examples))

        # The actual user turn always comes last.
        prepared.append({"role": "user", "content": user_message})
        return prepared

    def _build_warm_context(self, seed: SessionSeed) -> str:
        """Render the seed into a system-prompt string, capped at 1500 chars."""
        sections = []
        if seed.system_context:
            sections.append(seed.system_context)
        if seed.context_markers:
            sections.append("\nKnown context: " + ", ".join(seed.context_markers[:10]))
        if seed.user_patterns:
            tone = seed.user_patterns.get("preferred_style", "balanced")
            sections.append(f"\nUser prefers {tone} interactions.")
        return "\n".join(sections)[:1500]

    def _create_example_messages(self, examples: List[Dict]) -> List[Dict]:
        """Expand up to three tool examples into user/assistant/tool triples."""
        rendered: List[Dict] = []
        for idx, example in enumerate(examples[:3]):
            call_id = f"example_{idx}"
            rendered.append({"role": "user", "content": f"[Example {idx+1}] Use {example['tool']}"})
            rendered.append({
                "role": "assistant",
                "content": f"I'll use {example['tool']}.",
                "tool_calls": [{
                    "id": call_id,
                    "type": "function",
                    "function": {
                        "name": example["tool"],
                        "arguments": example.get("arguments", "{}")
                    }
                }]
            })
            rendered.append({
                "role": "tool",
                "tool_call_id": call_id,
                "content": example.get("result_preview", "Success")
            })
        return rendered
# ============================================================================
# CLI Interface
# ============================================================================
def _warm_build_parser():
    """Construct the argument parser for the warm-session CLI."""
    import argparse
    parser = argparse.ArgumentParser(description="Unified warm session framework")
    subparsers = parser.add_subparsers(dest="command")
    # Extract template
    extract_parser = subparsers.add_parser("extract", help="Extract template from session")
    extract_parser.add_argument("session_id", help="Session ID")
    extract_parser.add_argument("--name", "-n", required=True, help="Template name")
    extract_parser.add_argument("--description", "-d", default="", help="Description")
    # List templates
    subparsers.add_parser("list", help="List templates")
    # Test warm session
    test_parser = subparsers.add_parser("test", help="Test warm session")
    test_parser.add_argument("template_id", help="Template ID")
    test_parser.add_argument("message", help="Test message")
    # Analyze session
    analyze_parser = subparsers.add_parser("analyze", help="Analyze session quality")
    analyze_parser.add_argument("session_id", help="Session ID")
    # Create A/B test
    create_test_parser = subparsers.add_parser("create-test", help="Create A/B test")
    create_test_parser.add_argument("--task-id", required=True, help="Task ID")
    create_test_parser.add_argument("--description", required=True, help="Task description")
    create_test_parser.add_argument("--prompt", required=True, help="Test prompt")
    # Add test result
    add_result_parser = subparsers.add_parser("add-result", help="Add test result")
    add_result_parser.add_argument("test_id", help="Test ID")
    add_result_parser.add_argument("--session-type", required=True, choices=["cold", "warm"])
    add_result_parser.add_argument("--session-id", required=True, help="Session ID")
    add_result_parser.add_argument("--tool-calls", type=int, default=0)
    add_result_parser.add_argument("--successful-calls", type=int, default=0)
    add_result_parser.add_argument("--success", action="store_true")
    # Analyze test
    analyze_test_parser = subparsers.add_parser("analyze-test", help="Analyze A/B test")
    analyze_test_parser.add_argument("test_id", help="Test ID")
    return parser


def _warm_cmd_extract(parsed, session_db) -> int:
    """Extract a warm-session seed from a session, wrap it in a template, save it."""
    extractor = SessionExtractor(session_db)
    seed = extractor.extract_seed(parsed.session_id)
    if not seed:
        print(f"Failed to extract seed from session {parsed.session_id}")
        return 1
    template = WarmTemplate(
        template_id=f"warm_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
        name=parsed.name,
        description=parsed.description,
        seed=seed,
        created_at=datetime.now().isoformat(),
        source_session_id=parsed.session_id,
        version="1.0",
    )
    manager = TemplateManager()
    path = manager.save_template(template)
    print(f"Created template: {template.template_id}")
    print(f"Saved to: {path}")
    return 0


def _warm_cmd_list(parsed, session_db) -> int:
    """Print a summary of every saved warm-session template."""
    manager = TemplateManager()
    templates = manager.list_templates()
    if not templates:
        print("No templates found.")
        return 0
    print("\n=== Warm Session Templates ===\n")
    for t in templates:
        print(f"ID: {t['template_id']}")
        print(f"  Name: {t['name']}")
        print(f"  Description: {t['description']}")
        print(f"  Version: {t['version']}")
        print(f"  Usage: {t['usage_count']} times, {t['success_rate']:.0%} success")
        print()
    return 0


def _warm_cmd_test(parsed, session_db) -> int:
    """Render and display the messages a template would generate for a prompt."""
    manager = TemplateManager()
    template = manager.load_template(parsed.template_id)
    if not template:
        print(f"Template {parsed.template_id} not found")
        return 1
    bootstrapper = SessionBootstrapper(manager)
    messages = bootstrapper.prepare_messages(template, parsed.message)
    print(f"\n=== Warm Session Test: {template.name} ===\n")
    print(f"Generated {len(messages)} messages")
    for msg in messages:
        role = msg.get("role", "unknown")
        if role == "system":
            print(f"\n[System Context] ({len(msg.get('content', ''))} chars)")
        elif role == "user":
            print(f"\n[User]: {msg.get('content', '')}")
        elif role == "assistant":
            print(f"[Assistant]: {msg.get('content', '')}")
            if msg.get("tool_calls"):
                for tc in msg["tool_calls"]:
                    func = tc.get("function", {})
                    print(f"  -> {func.get('name')}()")
        elif role == "tool":
            print(f"  [Result]: {msg.get('content', '')[:50]}...")
    return 0


def _warm_cmd_analyze(parsed, session_db) -> int:
    """Print quality metrics for a single session."""
    analyzer = QualityAnalyzer(session_db)
    metrics = analyzer.analyze_session(parsed.session_id)
    if not metrics:
        print(f"Failed to analyze session {parsed.session_id}")
        return 1
    print(f"\n=== Session Quality: {parsed.session_id} ===\n")
    print(f"Messages: {metrics.message_count}")
    print(f"Tool calls: {metrics.tool_calls}")
    print(f"Error rate: {metrics.error_rate:.1%}")
    print(f"Success rate: {metrics.success_rate:.1%}")
    print(f"Efficiency score: {metrics.efficiency_score:.2f}")
    return 0


def _warm_cmd_create_test(parsed, session_db) -> int:
    """Create a new cold-vs-warm A/B test for a task."""
    task = TestTask(
        task_id=parsed.task_id,
        description=parsed.description,
        prompt=parsed.prompt,
    )
    manager = ABTestManager()
    test_id = manager.create_test(task)
    print(f"Created test: {test_id}")
    return 0


def _warm_cmd_add_result(parsed, session_db) -> int:
    """Analyze a session and record it as a result on an existing A/B test."""
    analyzer = QualityAnalyzer(session_db)
    metrics = analyzer.analyze_session(parsed.session_id, SessionType(parsed.session_type))
    if not metrics:
        print(f"Failed to analyze session {parsed.session_id}")
        return 1
    # Explicit CLI counts override the analyzed values. NOTE(review): a value
    # of 0 is falsy and keeps the analyzer's figure; since 0 is also the
    # argparse default, there is no way to force an override to zero.
    metrics.tool_calls = parsed.tool_calls or metrics.tool_calls
    metrics.successful_tool_calls = parsed.successful_calls or metrics.successful_tool_calls
    result = TestResult(
        test_id=parsed.test_id,
        task_id="",  # Will be filled from test
        session_id=parsed.session_id,
        session_type=SessionType(parsed.session_type),
        metrics=metrics,
        success=parsed.success,
    )
    manager = ABTestManager()
    manager.add_result(parsed.test_id, result)
    print(f"Added result to test {parsed.test_id}")
    return 0


def _warm_cmd_analyze_test(parsed, session_db) -> int:
    """Print the cold-vs-warm comparison for a stored A/B test."""
    manager = ABTestManager()
    analysis = manager.analyze_test(parsed.test_id)
    if "error" in analysis:
        print(f"Error: {analysis['error']}")
        return 1
    print(f"\n=== A/B Test Analysis: {parsed.test_id} ===\n")
    print(f"Task: {analysis['task'].get('description', 'N/A')}")
    cold = analysis.get("cold", {})
    warm = analysis.get("warm", {})
    print(f"\nCold sessions: {cold.get('count', 0)}")
    print(f"  Avg error rate: {cold.get('avg_error_rate', 0):.1%}")
    print(f"  Avg success rate: {cold.get('avg_success_rate', 0):.1%}")
    print(f"\nWarm sessions: {warm.get('count', 0)}")
    print(f"  Avg error rate: {warm.get('avg_error_rate', 0):.1%}")
    print(f"  Avg success rate: {warm.get('avg_success_rate', 0):.1%}")
    improvement = analysis.get("improvement", {})
    if improvement:
        print("\nImprovement:")
        if "error_rate" in improvement:
            print(f"  Error rate: {improvement['error_rate']:+.1%}")
    print(f"\nRecommendation: {analysis.get('recommendation', 'N/A')}")
    return 0


def unified_cli(args: List[str]) -> int:
    """CLI interface for unified warm session framework.

    Args:
        args: Argument vector (without the program name), e.g. ``sys.argv[1:]``.

    Returns:
        Process exit code: 0 on success, 1 on failure or missing subcommand.
    """
    parser = _warm_build_parser()
    parsed = parser.parse_args(args)
    if not parsed.command:
        parser.print_help()
        return 1

    # The session DB is optional: commands degrade gracefully without it.
    session_db = None
    try:
        from hermes_state import SessionDB
        session_db = SessionDB()
    except ImportError:
        pass

    # Dispatch table: one handler per subcommand, each (parsed, session_db) -> int.
    handlers = {
        "extract": _warm_cmd_extract,
        "list": _warm_cmd_list,
        "test": _warm_cmd_test,
        "analyze": _warm_cmd_analyze,
        "create-test": _warm_cmd_create_test,
        "add-result": _warm_cmd_add_result,
        "analyze-test": _warm_cmd_analyze_test,
    }
    handler = handlers.get(parsed.command)
    return handler(parsed, session_db) if handler else 1
if __name__ == "__main__":
import sys
sys.exit(unified_cli(sys.argv[1:]))