Compare commits


2 Commits

a16fcac72c  feat(cli): Add session quality analysis commands  (2026-04-14 01:52:26 +00:00)
    Part of #327. Adds `hermes quality` command for session quality analysis.
    Checks: Forge CI / smoke-and-build (pull_request) failing after 1m7s
b87b4c6d96  feat(research): Add session quality analyzer  (2026-04-14 01:51:47 +00:00)
    Part of #327. Analyzes session quality metrics to identify patterns and issues for improvement.
4 changed files with 569 additions and 667 deletions


@@ -1,139 +0,0 @@
# Warm Session Provisioning
**Issue:** #327
## Overview
Warm session provisioning allows creating pre-contextualized agent sessions that start with established patterns and context, reducing initial errors and improving session quality.
## Key Concepts
### Session Seed
A `SessionSeed` contains:
- **System context**: Key instructions and context from previous sessions
- **Tool examples**: Successful tool call patterns to establish conventions
- **User patterns**: User interaction style preferences
- **Context markers**: Important files, URLs, and references
### Warm Template
A `WarmTemplate` wraps a seed with metadata:
- Name and description
- Source session ID
- Usage statistics
- Success rate tracking
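Concretely, a stored template is a small JSON document. A minimal sketch of the shape these fields imply (values are illustrative, not taken from a real template):

```python
import json

# Illustrative template record; field names follow the WarmTemplate/SessionSeed
# descriptions above, values are made up for this sketch.
template = {
    "template_id": "warm_20260413_123456",
    "name": "Code Review Template",
    "description": "For code review tasks",
    "source_session_id": "abc123",
    "usage_count": 5,
    "success_rate": 0.8,
    "seed": {
        "system_context": "Key instructions carried over from the source session.",
        "tool_examples": [],     # successful tool-call patterns
        "user_patterns": {},     # interaction-style preferences
        "context_markers": [],   # important files, URLs, references
    },
}

# Templates are persisted as JSON files, so records must round-trip losslessly.
restored = json.loads(json.dumps(template))
```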
## Usage
### Extract Template from Session
```bash
# Create a template from a successful session
hermes warm extract SESSION_ID --name "Code Review Template" --description "For code review tasks"
# The template captures:
# - System context and key instructions
# - Successful tool call examples
# - User interaction patterns
# - Important context markers
```
### List Templates
```bash
hermes warm list
```
Output:
```
=== Warm Session Templates ===
ID: warm_20260413_123456
Name: Code Review Template
Description: For code review tasks
Usage: 5 times, 80% success
```
### Test Warm Session
```bash
# Test what messages would be generated
hermes warm test warm_20260413_123456 "Review this pull request"
```
Output shows the messages that would be sent to the agent, including:
- System context with warm-up information
- Tool call examples
- The actual user message
### Delete Template
```bash
hermes warm delete warm_20260413_123456
```
## How It Works
### 1. Extraction Phase
When you extract a template:
1. System messages provide base context
2. First 10 user messages establish patterns
3. Successful tool calls become examples
4. File paths and URLs become context markers
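Step 4 can be sketched standalone; the regexes below are illustrative and need not match the ones the shipped extractor uses:

```python
import re

def extract_context_markers(messages, limit=20):
    """Heuristic sketch: collect file-path and URL mentions from a session."""
    markers = set()
    for msg in messages:
        content = msg.get("content", "")
        # File-path-like tokens, e.g. tools/warm_session.py
        markers.update(p for p in re.findall(r"[\w/.]+\.\w+", content) if len(p) < 50)
        # URLs, truncated so markers stay compact
        markers.update(u[:80] for u in re.findall(r"https?://\S+", content))
    return sorted(markers)[:limit]

msgs = [{"role": "user",
         "content": "Please review tools/warm_session.py against https://example.com/spec"}]
markers = extract_context_markers(msgs)
```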
### 2. Bootstrap Phase
When creating a warm session:
1. System context is injected as initial message
2. Tool examples establish successful patterns
3. User message follows the warm-up context
4. Agent starts with established conventions
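The four steps above amount to assembling a message list in order. A self-contained sketch (the helper name and message shapes are simplified for illustration):

```python
def prepare_warm_messages(system_context, tool_examples, user_message):
    """Assemble a warm session: context, replayed tool examples, then the user turn."""
    messages = [{"role": "system", "content": system_context}]
    for i, ex in enumerate(tool_examples[:3]):  # cap the number of replayed examples
        messages.append({
            "role": "assistant",
            "content": f"I'll use {ex['tool']}.",
            "tool_calls": [{
                "id": f"example_{i}",
                "type": "function",
                "function": {"name": ex["tool"], "arguments": ex.get("arguments", "{}")},
            }],
        })
        messages.append({
            "role": "tool",
            "tool_call_id": f"example_{i}",
            "content": ex.get("result_preview", "Success"),
        })
    messages.append({"role": "user", "content": user_message})
    return messages

msgs = prepare_warm_messages(
    "Conventions carried over from a prior session.",
    [{"tool": "read_file", "arguments": '{"path": "README.md"}'}],
    "Review this pull request",
)
```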
## Example Workflow
```bash
# 1. Have a successful session
# ... work with the agent on a complex task ...
# 2. Extract template from that session
hermes warm extract abc123 --name "API Integration" --description "REST API work"
# 3. Later, start a new session with warm context
# The agent will have context about:
# - Your coding style
# - Successful tool patterns
# - Common file paths
# - Previous instructions
```
## Benefits
1. **Reduced Initial Errors**: Agent starts with proven patterns
2. **Consistent Behavior**: Established conventions carry over
3. **Faster Context**: No need to re-explain preferences
4. **Quality Tracking**: Success rate shows template effectiveness
## Implementation Details
### Files
- `tools/warm_session.py`: Core implementation
- `~/.hermes/warm_templates/`: Template storage
### Data Flow
```
Session -> SessionExtractor -> SessionSeed -> WarmTemplate
WarmSessionBootstrapper -> Messages -> Agent
```
## Research Context
This implementation addresses Finding #4 from the empirical audit:
- Marathon sessions show different error patterns
- Context establishment affects session quality
- Pre-seeding can improve initial session reliability
## Future Enhancements
1. **Automatic Template Creation**: Create templates from high-performing sessions
2. **Template Sharing**: Export/import templates between installations
3. **A/B Testing**: Compare warm vs cold session performance
4. **Smart Selection**: Automatically choose best template for task type


@@ -5259,33 +5259,26 @@ For more help on a command:
sessions_parser.set_defaults(func=cmd_sessions)
# Warm session command
warm_parser = subparsers.add_parser(
"warm",
help="Warm session provisioning",
description="Create pre-contextualized sessions from templates"
# Session quality analyzer command
quality_parser = subparsers.add_parser(
"quality",
help="Session quality analysis",
description="Analyze session quality metrics and identify issues"
)
warm_subparsers = warm_parser.add_subparsers(dest="warm_command")
quality_subparsers = quality_parser.add_subparsers(dest="quality_command")
# Extract command
warm_extract = warm_subparsers.add_parser("extract", help="Extract template from session")
warm_extract.add_argument("session_id", help="Session ID to extract from")
warm_extract.add_argument("--name", "-n", required=True, help="Template name")
warm_extract.add_argument("--description", "-d", default="", help="Template description")
# Analyze single session
quality_analyze = quality_subparsers.add_parser("analyze", help="Analyze a single session")
quality_analyze.add_argument("session_id", help="Session ID to analyze")
# List command
warm_subparsers.add_parser("list", help="List available templates")
# Analyze multiple sessions
quality_batch = quality_subparsers.add_parser("batch", help="Analyze multiple sessions")
quality_batch.add_argument("session_ids", nargs="+", help="Session IDs to analyze")
# Test command
warm_test = warm_subparsers.add_parser("test", help="Test warm session creation")
warm_test.add_argument("template_id", help="Template ID")
warm_test.add_argument("message", help="Test message")
# List sessions with quality issues
quality_subparsers.add_parser("list-issues", help="List sessions with quality issues")
# Delete command
warm_delete = warm_subparsers.add_parser("delete", help="Delete a template")
warm_delete.add_argument("template_id", help="Template ID to delete")
warm_parser.set_defaults(func=cmd_warm)
quality_parser.set_defaults(func=cmd_quality)
# =========================================================================
@@ -5630,40 +5623,35 @@ if __name__ == "__main__":
main()
def cmd_warm(args):
"""Handle warm session commands."""
def cmd_quality(args):
"""Handle session quality analysis commands."""
from hermes_cli.colors import Colors, color
subcmd = getattr(args, 'warm_command', None)
subcmd = getattr(args, 'quality_command', None)
if subcmd is None:
print(color("Warm Session Provisioning", Colors.CYAN))
print(color("Session Quality Analysis", Colors.CYAN))
print("\nCommands:")
print(" hermes warm extract SESSION_ID --name NAME - Extract template from session")
print(" hermes warm list - List available templates")
print(" hermes warm test TEMPLATE_ID MESSAGE - Test warm session")
print(" hermes warm delete TEMPLATE_ID - Delete a template")
print(" hermes quality analyze SESSION_ID - Analyze a single session")
print(" hermes quality batch SESSION_IDS... - Analyze multiple sessions")
print(" hermes quality list-issues - List sessions with quality issues")
return 0
try:
from tools.warm_session import warm_session_cli
from tools.session_quality_analyzer import quality_analyzer_cli
args_list = []
if subcmd == "extract":
args_list = ["extract", args.session_id, "--name", args.name]
if args.description:
args_list.extend(["--description", args.description])
elif subcmd == "list":
args_list = ["list"]
elif subcmd == "test":
args_list = ["test", args.template_id, args.message]
elif subcmd == "delete":
args_list = ["delete", args.template_id]
if subcmd == "analyze":
args_list = ["analyze", args.session_id]
elif subcmd == "batch":
args_list = ["batch"] + args.session_ids
elif subcmd == "list-issues":
args_list = ["list-issues"]
return warm_session_cli(args_list)
return quality_analyzer_cli(args_list)
except ImportError as e:
print(color(f"Error: Cannot import warm_session module: {e}", Colors.RED))
print(color(f"Error: Cannot import session_quality_analyzer module: {e}", Colors.RED))
return 1
except Exception as e:
print(color(f"Error: {e}", Colors.RED))


@@ -0,0 +1,538 @@
"""
Session Quality Analyzer
Analyzes session quality metrics to identify patterns for improvement.
Addresses research questions from issue #327.
"""
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict, field
import statistics
logger = logging.getLogger(__name__)
@dataclass
class QualityMetrics:
"""Quality metrics for a session."""
session_id: str
message_count: int = 0
tool_calls: int = 0
successful_tool_calls: int = 0
error_count: int = 0
user_corrections: int = 0
completion_time_seconds: float = 0.0
context_switches: int = 0 # Number of topic changes
avg_response_time: float = 0.0
token_usage: int = 0
@property
def error_rate(self) -> float:
"""Calculate error rate."""
if self.tool_calls == 0:
return 0.0
return self.error_count / self.tool_calls
@property
def success_rate(self) -> float:
"""Calculate success rate."""
if self.tool_calls == 0:
return 0.0
return self.successful_tool_calls / self.tool_calls
@property
def correction_rate(self) -> float:
"""Calculate user correction rate."""
if self.message_count == 0:
return 0.0
return self.user_corrections / self.message_count
@property
def efficiency_score(self) -> float:
"""Calculate efficiency score (0-1)."""
if self.message_count == 0:
return 0.0
# Factors:
# 1. High success rate (weight: 0.4)
# 2. Low error rate (weight: 0.3)
# 3. Low correction rate (weight: 0.2)
# 4. Reasonable message count (weight: 0.1)
success_score = self.success_rate * 0.4
error_score = (1 - self.error_rate) * 0.3
correction_score = (1 - min(1.0, self.correction_rate * 5)) * 0.2 # Scale correction rate
# Message count penalty for very long sessions
msg_score = 0.1
if self.message_count > 100:
msg_score = 0.05
elif self.message_count > 50:
msg_score = 0.08
return success_score + error_score + correction_score + msg_score
def to_dict(self) -> Dict[str, Any]:
return {
"session_id": self.session_id,
"message_count": self.message_count,
"tool_calls": self.tool_calls,
"successful_tool_calls": self.successful_tool_calls,
"error_count": self.error_count,
"user_corrections": self.user_corrections,
"completion_time_seconds": self.completion_time_seconds,
"context_switches": self.context_switches,
"avg_response_time": self.avg_response_time,
"token_usage": self.token_usage,
"error_rate": self.error_rate,
"success_rate": self.success_rate,
"correction_rate": self.correction_rate,
"efficiency_score": self.efficiency_score
}
@dataclass
class QualityIssue:
"""Issue discovered during quality analysis."""
issue_id: str
session_id: str
issue_type: str # "high_error_rate", "frequent_corrections", "context_loss", etc.
severity: str # "low", "medium", "high", "critical"
description: str
evidence: Dict[str, Any] = field(default_factory=dict)
discovered_at: str = field(default_factory=lambda: datetime.now().isoformat())
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@dataclass
class SessionAnalysis:
"""Complete analysis of a session."""
session_id: str
metrics: QualityMetrics
issues: List[QualityIssue] = field(default_factory=list)
patterns: List[str] = field(default_factory=list)
recommendations: List[str] = field(default_factory=list)
analyzed_at: str = field(default_factory=lambda: datetime.now().isoformat())
def to_dict(self) -> Dict[str, Any]:
return {
"session_id": self.session_id,
"metrics": self.metrics.to_dict(),
"issues": [i.to_dict() for i in self.issues],
"patterns": self.patterns,
"recommendations": self.recommendations,
"analyzed_at": self.analyzed_at
}
class SessionQualityAnalyzer:
"""Analyze session quality and identify issues."""
def __init__(self, session_db=None):
self.session_db = session_db
def analyze_session(self, session_id: str) -> Optional[SessionAnalysis]:
"""Analyze a single session."""
if not self.session_db:
return None
try:
messages = self.session_db.get_messages(session_id)
if not messages:
return None
# Calculate metrics
metrics = self._calculate_metrics(session_id, messages)
# Identify issues
issues = self._identify_issues(metrics, messages)
# Identify patterns
patterns = self._identify_patterns(messages)
# Generate recommendations
recommendations = self._generate_recommendations(metrics, issues, patterns)
return SessionAnalysis(
session_id=session_id,
metrics=metrics,
issues=issues,
patterns=patterns,
recommendations=recommendations
)
except Exception as e:
logger.error(f"Failed to analyze session: {e}")
return None
def _calculate_metrics(self, session_id: str, messages: List[Dict]) -> QualityMetrics:
"""Calculate quality metrics for a session."""
tool_calls = 0
successful_tool_calls = 0
error_count = 0
user_corrections = 0
for i, msg in enumerate(messages):
# Count tool calls
if msg.get("role") == "assistant" and msg.get("tool_calls"):
tool_calls += len(msg["tool_calls"])
# Check tool results
if msg.get("role") == "tool":
content = msg.get("content", "").lower()
if "error" in content or "failed" in content:
error_count += 1
else:
successful_tool_calls += 1
# Count user corrections (user message after error)
if (msg.get("role") == "user" and i > 0 and
messages[i-1].get("role") == "tool" and
("error" in messages[i-1].get("content", "").lower() or
"failed" in messages[i-1].get("content", "").lower())):
user_corrections += 1
return QualityMetrics(
session_id=session_id,
message_count=len(messages),
tool_calls=tool_calls,
successful_tool_calls=successful_tool_calls,
error_count=error_count,
user_corrections=user_corrections
)
def _identify_issues(self, metrics: QualityMetrics, messages: List[Dict]) -> List[QualityIssue]:
"""Identify quality issues."""
issues = []
# High error rate
if metrics.error_rate > 0.2: # >20% errors
issues.append(QualityIssue(
issue_id=f"high_error_{metrics.session_id}",
session_id=metrics.session_id,
issue_type="high_error_rate",
severity="high" if metrics.error_rate > 0.3 else "medium",
description=f"High error rate: {metrics.error_rate:.1%}",
evidence={"error_rate": metrics.error_rate, "error_count": metrics.error_count}
))
# Frequent corrections
if metrics.correction_rate > 0.1: # >10% corrections
issues.append(QualityIssue(
issue_id=f"frequent_corrections_{metrics.session_id}",
session_id=metrics.session_id,
issue_type="frequent_corrections",
severity="medium",
description=f"Frequent user corrections: {metrics.correction_rate:.1%}",
evidence={"correction_rate": metrics.correction_rate, "corrections": metrics.user_corrections}
))
# Context loss (look for repeated questions)
repeated_questions = self._find_repeated_questions(messages)
if repeated_questions > 2:
issues.append(QualityIssue(
issue_id=f"context_loss_{metrics.session_id}",
session_id=metrics.session_id,
issue_type="context_loss",
severity="medium",
description=f"Possible context loss: {repeated_questions} repeated questions",
evidence={"repeated_questions": repeated_questions}
))
# Low efficiency
if metrics.efficiency_score < 0.5:
issues.append(QualityIssue(
issue_id=f"low_efficiency_{metrics.session_id}",
session_id=metrics.session_id,
issue_type="low_efficiency",
severity="low",
description=f"Low efficiency score: {metrics.efficiency_score:.2f}",
evidence={"efficiency_score": metrics.efficiency_score}
))
return issues
def _find_repeated_questions(self, messages: List[Dict]) -> int:
"""Find repeated questions in user messages."""
user_messages = [m.get("content", "").lower() for m in messages if m.get("role") == "user"]
# Simple heuristic: look for similar messages
repeated = 0
seen = set()
for msg in user_messages:
# Normalize message
normalized = " ".join(msg.split()[:10]) # First 10 words
if normalized in seen:
repeated += 1
else:
seen.add(normalized)
return repeated
def _identify_patterns(self, messages: List[Dict]) -> List[str]:
"""Identify patterns in the session."""
patterns = []
# Analyze tool usage
tool_usage = {}
for msg in messages:
if msg.get("role") == "assistant" and msg.get("tool_calls"):
for tc in msg["tool_calls"]:
tool_name = tc.get("function", {}).get("name", "unknown")
tool_usage[tool_name] = tool_usage.get(tool_name, 0) + 1
if tool_usage:
most_used = max(tool_usage.items(), key=lambda x: x[1])
patterns.append(f"Most used tool: {most_used[0]} ({most_used[1]} calls)")
# Analyze user message patterns
user_messages = [m.get("content", "") for m in messages if m.get("role") == "user"]
if user_messages:
avg_length = sum(len(m) for m in user_messages) / len(user_messages)
patterns.append(f"Average user message length: {avg_length:.0f} characters")
questions = sum(1 for m in user_messages if "?" in m)
patterns.append(f"Question ratio: {questions/len(user_messages):.0%}")
return patterns
def _generate_recommendations(
self,
metrics: QualityMetrics,
issues: List[QualityIssue],
patterns: List[str]
) -> List[str]:
"""Generate recommendations for improvement."""
recommendations = []
if metrics.error_rate > 0.2:
recommendations.append("Consider using more specific prompts to reduce errors")
if metrics.correction_rate > 0.1:
recommendations.append("Provide clearer instructions to reduce need for corrections")
if metrics.message_count > 100:
recommendations.append("Consider breaking long sessions into smaller focused sessions")
if metrics.efficiency_score < 0.5:
recommendations.append("Session efficiency is low - consider using templates or warm sessions")
return recommendations
def analyze_multiple_sessions(self, session_ids: List[str]) -> Dict[str, Any]:
"""Analyze multiple sessions and provide aggregate insights."""
analyses = []
for session_id in session_ids:
analysis = self.analyze_session(session_id)
if analysis:
analyses.append(analysis)
if not analyses:
return {"error": "No sessions analyzed"}
# Aggregate metrics
all_metrics = [a.metrics for a in analyses]
avg_error_rate = statistics.mean([m.error_rate for m in all_metrics])
avg_success_rate = statistics.mean([m.success_rate for m in all_metrics])
avg_efficiency = statistics.mean([m.efficiency_score for m in all_metrics])
# Collect all issues
all_issues = []
for a in analyses:
all_issues.extend(a.issues)
# Group issues by type
issues_by_type = {}
for issue in all_issues:
if issue.issue_type not in issues_by_type:
issues_by_type[issue.issue_type] = []
issues_by_type[issue.issue_type].append(issue)
return {
"sessions_analyzed": len(analyses),
"aggregate_metrics": {
"avg_error_rate": avg_error_rate,
"avg_success_rate": avg_success_rate,
"avg_efficiency": avg_efficiency
},
"issues_summary": {
issue_type: len(issues)
for issue_type, issues in issues_by_type.items()
},
"top_issues": [
{
"type": issue.issue_type,
"severity": issue.severity,
"description": issue.description,
"session_id": issue.session_id
}
for issue in sorted(all_issues, key=lambda x: {"critical": 3, "high": 2, "medium": 1, "low": 0}.get(x.severity, 0), reverse=True)[:5]
],
"recommendations": self._generate_aggregate_recommendations(analyses)
}
def _generate_aggregate_recommendations(self, analyses: List[SessionAnalysis]) -> List[str]:
"""Generate recommendations based on aggregate analysis."""
recommendations = []
# Check for common issues
high_error_sessions = [a for a in analyses if a.metrics.error_rate > 0.2]
if len(high_error_sessions) > len(analyses) * 0.3: # >30% sessions have high errors
recommendations.append("Systematic issue: Many sessions have high error rates. Consider improving tool documentation or prompts.")
# Check for efficiency
low_efficiency = [a for a in analyses if a.metrics.efficiency_score < 0.5]
if len(low_efficiency) > len(analyses) * 0.5: # >50% sessions have low efficiency
recommendations.append("Consider implementing warm session provisioning to improve session efficiency.")
return recommendations
# CLI Interface
def quality_analyzer_cli(args: List[str]) -> int:
"""CLI interface for session quality analysis."""
import argparse
parser = argparse.ArgumentParser(description="Session quality analyzer")
subparsers = parser.add_subparsers(dest="command")
# Analyze single session
analyze_parser = subparsers.add_parser("analyze", help="Analyze a single session")
analyze_parser.add_argument("session_id", help="Session ID to analyze")
# Analyze multiple sessions
batch_parser = subparsers.add_parser("batch", help="Analyze multiple sessions")
batch_parser.add_argument("session_ids", nargs="+", help="Session IDs to analyze")
# List sessions with quality issues
subparsers.add_parser("list-issues", help="List sessions with quality issues")
parsed = parser.parse_args(args)
if not parsed.command:
parser.print_help()
return 1
try:
from hermes_state import SessionDB
session_db = SessionDB()
except ImportError:
print("Error: Cannot import SessionDB")
return 1
analyzer = SessionQualityAnalyzer(session_db)
if parsed.command == "analyze":
analysis = analyzer.analyze_session(parsed.session_id)
if not analysis:
print(f"Failed to analyze session {parsed.session_id}")
return 1
print(f"\n=== Session Quality Analysis: {parsed.session_id} ===\n")
print(f"Messages: {analysis.metrics.message_count}")
print(f"Tool calls: {analysis.metrics.tool_calls}")
print(f"Error rate: {analysis.metrics.error_rate:.1%}")
print(f"Success rate: {analysis.metrics.success_rate:.1%}")
print(f"Efficiency score: {analysis.metrics.efficiency_score:.2f}")
if analysis.issues:
print(f"\nIssues found: {len(analysis.issues)}")
for issue in analysis.issues:
print(f" [{issue.severity.upper()}] {issue.description}")
if analysis.patterns:
print("\nPatterns:")
for pattern in analysis.patterns:
print(f" {pattern}")
if analysis.recommendations:
print("\nRecommendations:")
for rec in analysis.recommendations:
print(f" {rec}")
return 0
elif parsed.command == "batch":
result = analyzer.analyze_multiple_sessions(parsed.session_ids)
if "error" in result:
print(f"Error: {result['error']}")
return 1
print(f"\n=== Batch Analysis: {result['sessions_analyzed']} sessions ===\n")
agg = result.get("aggregate_metrics", {})
print(f"Average error rate: {agg.get('avg_error_rate', 0):.1%}")
print(f"Average success rate: {agg.get('avg_success_rate', 0):.1%}")
print(f"Average efficiency: {agg.get('avg_efficiency', 0):.2f}")
issues = result.get("issues_summary", {})
if issues:
print("\nIssues summary:")
for issue_type, count in issues.items():
print(f" {issue_type}: {count}")
top_issues = result.get("top_issues", [])
if top_issues:
print("\nTop issues:")
for issue in top_issues:
print(f" [{issue['severity'].upper()}] {issue['description']} (Session: {issue['session_id']})")
recommendations = result.get("recommendations", [])
if recommendations:
print("\nRecommendations:")
for rec in recommendations:
print(f" {rec}")
return 0
elif parsed.command == "list-issues":
# Get recent sessions
try:
# Query the SessionDB handle directly; execute_write is the only query
# helper it exposes here, even for reads.
sessions = session_db.execute_write(
"SELECT id FROM sessions ORDER BY started_at DESC LIMIT 20"
)
if not sessions:
print("No sessions found")
return 0
session_ids = [s[0] for s in sessions]
result = analyzer.analyze_multiple_sessions(session_ids)
if "error" in result:
print(f"Error: {result['error']}")
return 1
print("\n=== Sessions with Quality Issues (last 20 sessions) ===\n")
for issue in result.get("top_issues", []):
print(f"Session: {issue['session_id']}")
print(f" [{issue['severity'].upper()}] {issue['description']}")
print()
except Exception as e:
print(f"Error: {e}")
return 1
return 0
return 1
if __name__ == "__main__":
import sys
sys.exit(quality_analyzer_cli(sys.argv[1:]))


@@ -1,485 +0,0 @@
"""
Warm Session Provisioning: Practical Implementation
Provides mechanisms to create pre-contextualized sessions that start
with established patterns and context, reducing initial errors.
Issue: #327
"""
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, asdict, field
logger = logging.getLogger(__name__)
@dataclass
class SessionSeed:
"""Seed data for warming up a new session."""
system_context: str = ""
tool_examples: List[Dict[str, Any]] = field(default_factory=list)
user_patterns: Dict[str, Any] = field(default_factory=dict)
context_markers: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'SessionSeed':
return cls(**data)
@dataclass
class WarmTemplate:
"""Template for creating warm sessions."""
template_id: str
name: str
description: str
seed: SessionSeed
created_at: str
source_session_id: Optional[str] = None
usage_count: int = 0
success_rate: float = 0.0
def to_dict(self) -> Dict[str, Any]:
return {
"template_id": self.template_id,
"name": self.name,
"description": self.description,
"seed": self.seed.to_dict(),
"created_at": self.created_at,
"source_session_id": self.source_session_id,
"usage_count": self.usage_count,
"success_rate": self.success_rate
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'WarmTemplate':
seed = SessionSeed.from_dict(data.get("seed", {}))
return cls(
template_id=data["template_id"],
name=data["name"],
description=data["description"],
seed=seed,
created_at=data.get("created_at", datetime.now().isoformat()),
source_session_id=data.get("source_session_id"),
usage_count=data.get("usage_count", 0),
success_rate=data.get("success_rate", 0.0)
)
class SessionExtractor:
"""Extract seed data from existing sessions."""
def __init__(self, session_db=None):
self.session_db = session_db
def extract_seed(self, session_id: str) -> Optional[SessionSeed]:
"""Extract seed data from a session."""
if not self.session_db:
return None
try:
messages = self.session_db.get_messages(session_id)
if not messages:
return None
# Extract system context
system_context = self._extract_system_context(messages)
# Extract successful tool examples
tool_examples = self._extract_tool_examples(messages)
# Extract user patterns
user_patterns = self._extract_user_patterns(messages)
# Extract context markers
context_markers = self._extract_context_markers(messages)
return SessionSeed(
system_context=system_context,
tool_examples=tool_examples,
user_patterns=user_patterns,
context_markers=context_markers
)
except Exception as e:
logger.error(f"Failed to extract seed: {e}")
return None
def _extract_system_context(self, messages: List[Dict]) -> str:
"""Extract useful system context from messages."""
context_parts = []
# Look for system messages
for msg in messages:
if msg.get("role") == "system":
content = msg.get("content", "")
# Take first 500 chars of system context
if content:
context_parts.append(content[:500])
break
# Extract key user instructions
user_instructions = []
for msg in messages[:10]: # First 10 messages
if msg.get("role") == "user":
content = msg.get("content", "")
if len(content) > 50 and "?" not in content[:20]: # Likely instructions
user_instructions.append(content[:200])
if len(user_instructions) >= 3:
break
if user_instructions:
context_parts.append("\nKey instructions from session:\n" + "\n".join(f"- {i}" for i in user_instructions))
return "\n".join(context_parts)[:1000]
def _extract_tool_examples(self, messages: List[Dict]) -> List[Dict[str, Any]]:
"""Extract successful tool call examples."""
examples = []
for i, msg in enumerate(messages):
if msg.get("role") == "assistant" and msg.get("tool_calls"):
# Check if there's a successful result
for j in range(i + 1, min(i + 3, len(messages))):
if messages[j].get("role") == "tool":
content = messages[j].get("content", "")
# Check for success indicators
if content and "error" not in content.lower()[:100]:
for tool_call in msg["tool_calls"]:
func = tool_call.get("function", {})
examples.append({
"tool": func.get("name"),
"arguments": func.get("arguments", "{}"),
"result_preview": content[:200]
})
if len(examples) >= 5:
break
break
if len(examples) >= 5:
break
return examples
def _extract_user_patterns(self, messages: List[Dict]) -> Dict[str, Any]:
"""Extract user interaction patterns."""
user_messages = [m for m in messages if m.get("role") == "user"]
if not user_messages:
return {}
# Calculate patterns
lengths = [len(m.get("content", "")) for m in user_messages]
avg_length = sum(lengths) / len(lengths)
# Count question types
questions = sum(1 for m in user_messages if "?" in m.get("content", ""))
commands = sum(1 for m in user_messages if m.get("content", "").startswith(("/", "!")))
return {
"message_count": len(user_messages),
"avg_length": avg_length,
"question_ratio": questions / len(user_messages),
"command_ratio": commands / len(user_messages),
"preferred_style": "command" if commands > questions else "conversational"
}
def _extract_context_markers(self, messages: List[Dict]) -> List[str]:
"""Extract important context markers."""
import re
markers = set()
for msg in messages:
content = msg.get("content", "")
# File paths
paths = re.findall(r'[\w/\.]+\.[\w]+', content)
markers.update(p for p in paths if len(p) < 50)
# URLs
urls = re.findall(r'https?://[^\s]+', content)
markers.update(u[:80] for u in urls[:3])
if len(markers) > 20:
break
return list(markers)[:20]
class WarmSessionManager:
"""Manage warm session templates."""
def __init__(self, template_dir: Path = None):
self.template_dir = template_dir or Path.home() / ".hermes" / "warm_templates"
self.template_dir.mkdir(parents=True, exist_ok=True)
def save_template(self, template: WarmTemplate) -> Path:
"""Save a warm template."""
path = self.template_dir / f"{template.template_id}.json"
with open(path, 'w') as f:
json.dump(template.to_dict(), f, indent=2)
return path
def load_template(self, template_id: str) -> Optional[WarmTemplate]:
"""Load a warm template."""
path = self.template_dir / f"{template_id}.json"
if not path.exists():
return None
try:
with open(path, 'r') as f:
data = json.load(f)
return WarmTemplate.from_dict(data)
except Exception as e:
logger.error(f"Failed to load template: {e}")
return None
def list_templates(self) -> List[Dict[str, Any]]:
"""List all templates."""
templates = []
for path in self.template_dir.glob("*.json"):
try:
with open(path, 'r') as f:
data = json.load(f)
templates.append({
"template_id": data.get("template_id"),
"name": data.get("name"),
"description": data.get("description"),
"usage_count": data.get("usage_count", 0),
"success_rate": data.get("success_rate", 0.0)
})
except Exception as e:
logger.warning(f"Skipping unreadable template {path}: {e}")
return templates
def delete_template(self, template_id: str) -> bool:
"""Delete a template."""
path = self.template_dir / f"{template_id}.json"
if path.exists():
path.unlink()
return True
return False
class WarmSessionBootstrapper:
"""Bootstrap warm sessions from templates."""
def __init__(self, manager: WarmSessionManager = None):
self.manager = manager or WarmSessionManager()
def prepare_messages(
self,
template: WarmTemplate,
user_message: str,
include_examples: bool = True
) -> List[Dict[str, Any]]:
"""Prepare messages for a warm session."""
messages = []
# Add warm context as system message
warm_context = self._build_warm_context(template.seed)
if warm_context:
messages.append({
"role": "system",
"content": warm_context
})
# Add tool examples if requested
if include_examples and template.seed.tool_examples:
example_messages = self._create_example_messages(template.seed.tool_examples)
messages.extend(example_messages)
# Add the actual user message
messages.append({
"role": "user",
"content": user_message
})
return messages
    def _build_warm_context(self, seed: SessionSeed) -> str:
        """Build warm context text from a seed, capped at 1500 chars."""
        parts = []
        if seed.system_context:
            parts.append(seed.system_context)
        if seed.context_markers:
            parts.append("\nKnown context: " + ", ".join(seed.context_markers[:10]))
        if seed.user_patterns:
            style = seed.user_patterns.get("preferred_style", "balanced")
            parts.append(f"\nUser prefers {style} interactions.")
        return "\n".join(parts)[:1500]

    def _create_example_messages(self, examples: List[Dict]) -> List[Dict]:
        """Create user/assistant/tool message triples from tool examples."""
        messages = []
        for i, ex in enumerate(examples[:3]):  # Limit to 3 examples
            # User request
            messages.append({
                "role": "user",
                "content": f"[Example {i + 1}] Use {ex['tool']}",
            })
            # Assistant message carrying the tool call
            messages.append({
                "role": "assistant",
                "content": f"I'll use {ex['tool']}.",
                "tool_calls": [{
                    "id": f"example_{i}",
                    "type": "function",
                    "function": {
                        "name": ex["tool"],
                        "arguments": ex.get("arguments", "{}"),
                    },
                }],
            })
            # Tool result
            messages.append({
                "role": "tool",
                "tool_call_id": f"example_{i}",
                "content": ex.get("result_preview", "Success"),
            })
        return messages

# CLI Functions

def warm_session_cli(args: List[str]) -> int:
    """CLI interface for warm session management."""
    import argparse

    parser = argparse.ArgumentParser(description="Warm session provisioning")
    subparsers = parser.add_subparsers(dest="command")

    # Extract command
    extract_parser = subparsers.add_parser("extract", help="Extract template from session")
    extract_parser.add_argument("session_id", help="Session ID to extract from")
    extract_parser.add_argument("--name", "-n", required=True, help="Template name")
    extract_parser.add_argument("--description", "-d", default="", help="Template description")

    # List command
    subparsers.add_parser("list", help="List available templates")

    # Test command
    test_parser = subparsers.add_parser("test", help="Test warm session creation")
    test_parser.add_argument("template_id", help="Template ID")
    test_parser.add_argument("message", help="Test message")

    # Delete command
    delete_parser = subparsers.add_parser("delete", help="Delete a template")
    delete_parser.add_argument("template_id", help="Template ID to delete")

    parsed = parser.parse_args(args)
    if not parsed.command:
        parser.print_help()
        return 1
    manager = WarmSessionManager()

    if parsed.command == "extract":
        try:
            from hermes_state import SessionDB
            session_db = SessionDB()
        except ImportError:
            print("Error: Cannot import SessionDB")
            return 1
        extractor = SessionExtractor(session_db)
        seed = extractor.extract_seed(parsed.session_id)
        if not seed:
            print(f"Failed to extract seed from session {parsed.session_id}")
            return 1
        template = WarmTemplate(
            template_id=f"warm_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
            name=parsed.name,
            description=parsed.description,
            seed=seed,
            created_at=datetime.now().isoformat(),
            source_session_id=parsed.session_id,
        )
        path = manager.save_template(template)
        print(f"Created template: {template.template_id}")
        print(f"Saved to: {path}")
        print(f"Tool examples: {len(seed.tool_examples)}")
        print(f"Context markers: {len(seed.context_markers)}")
        return 0
    elif parsed.command == "list":
        templates = manager.list_templates()
        if not templates:
            print("No templates found.")
            return 0
        print("\n=== Warm Session Templates ===\n")
        for t in templates:
            print(f"ID: {t['template_id']}")
            print(f"  Name: {t['name']}")
            print(f"  Description: {t['description']}")
            print(f"  Usage: {t['usage_count']} times, {t['success_rate']:.0%} success")
            print()
        return 0
    elif parsed.command == "test":
        template = manager.load_template(parsed.template_id)
        if not template:
            print(f"Template {parsed.template_id} not found")
            return 1
        bootstrapper = WarmSessionBootstrapper(manager)
        messages = bootstrapper.prepare_messages(template, parsed.message)
        print(f"\n=== Warm Session Test: {template.name} ===\n")
        print(f"Generated {len(messages)} messages:\n")
        for msg in messages:
            role = msg.get("role", "unknown")
            content = msg.get("content", "")
            if role == "system":
                print(f"[System Context] ({len(content)} chars)")
                print(content[:200] + "..." if len(content) > 200 else content)
            elif role == "user":
                print(f"\n[User]: {content}")
            elif role == "assistant":
                print(f"[Assistant]: {content}")
                for tc in msg.get("tool_calls", []):
                    func = tc.get("function", {})
                    print(f"  -> {func.get('name')}({func.get('arguments', '{}')[:50]})")
            elif role == "tool":
                # Only append an ellipsis when the result was actually truncated
                suffix = "..." if len(content) > 100 else ""
                print(f"  [Result]: {content[:100]}{suffix}")
        return 0
    elif parsed.command == "delete":
        if manager.delete_template(parsed.template_id):
            print(f"Deleted template: {parsed.template_id}")
            return 0
        print(f"Template {parsed.template_id} not found")
        return 1

    return 1


if __name__ == "__main__":
    import sys
    sys.exit(warm_session_cli(sys.argv[1:]))