Compare commits
2 Commits
triage/327
...
fix/457-ss
| Author | SHA1 | Date | |
|---|---|---|---|
| 66b0febdfb | |||
| 6d79bf7783 |
@@ -182,6 +182,15 @@ _SCRIPT_FAILURE_PHRASES = (
|
||||
"exit status",
|
||||
"non-zero exit",
|
||||
"did not complete",
|
||||
# SSH-specific failure patterns (#350)
|
||||
"no such file or directory",
|
||||
"command not found",
|
||||
"hermes binary not found",
|
||||
"hermes not found",
|
||||
"ssh: connect to host",
|
||||
"connection timed out",
|
||||
"host key verification failed",
|
||||
"no route to host",
|
||||
"could not run",
|
||||
"unable to execute",
|
||||
"permission denied",
|
||||
|
||||
212
cron/ssh_dispatch.py
Normal file
212
cron/ssh_dispatch.py
Normal file
@@ -0,0 +1,212 @@
|
||||
"""
|
||||
SSH dispatch utilities for cron jobs.
|
||||
|
||||
Provides validated remote execution so broken hermes binary paths
|
||||
are caught before draining the dispatch queue.
|
||||
|
||||
Usage:
|
||||
from cron.ssh_dispatch import SSHEnvironment, format_dispatch_report
|
||||
|
||||
ssh = SSHEnvironment(host="root@ezra", agent="allegro")
|
||||
result = ssh.dispatch("cron tick")
|
||||
if not result.success:
|
||||
print(result.failure_reason)
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
import shutil
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class DispatchResult:
|
||||
"""Structured result of a remote command dispatch."""
|
||||
host: str
|
||||
command: str
|
||||
success: bool
|
||||
exit_code: Optional[int] = None
|
||||
stdout: str = ""
|
||||
stderr: str = ""
|
||||
failure_reason: Optional[str] = None
|
||||
duration_s: float = 0.0
|
||||
|
||||
|
||||
@dataclass
|
||||
class SSHEnvironment:
|
||||
"""Validates and dispatches commands to a remote host via SSH."""
|
||||
|
||||
host: str # e.g. "root@ezra" or "192.168.1.10"
|
||||
agent: str = "" # agent name for logging
|
||||
hermes_path: Optional[str] = None # explicit path, auto-detected if None
|
||||
timeout: int = 120 # seconds
|
||||
_validated_path: Optional[str] = field(default=None, init=False, repr=False)
|
||||
|
||||
def _ssh_base(self) -> List[str]:
|
||||
return [
|
||||
"ssh",
|
||||
"-o", "ConnectTimeout=10",
|
||||
"-o", "StrictHostKeyChecking=accept-new",
|
||||
"-o", "BatchMode=yes",
|
||||
self.host,
|
||||
]
|
||||
|
||||
def _probe_remote_binary(self, candidate: str) -> bool:
|
||||
"""Check if a hermes binary exists and is executable on the remote host."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
self._ssh_base() + [f"test -x {candidate}"],
|
||||
capture_output=True, timeout=15,
|
||||
)
|
||||
return result.returncode == 0
|
||||
except (subprocess.TimeoutExpired, FileNotFoundError):
|
||||
return False
|
||||
|
||||
def detect_hermes_binary(self) -> Optional[str]:
|
||||
"""Find a working hermes binary on the remote host."""
|
||||
if self._validated_path:
|
||||
return self._validated_path
|
||||
|
||||
candidates = []
|
||||
if self.hermes_path:
|
||||
candidates.append(self.hermes_path)
|
||||
|
||||
# Common locations
|
||||
candidates.extend([
|
||||
"hermes", # on PATH
|
||||
"~/.local/bin/hermes",
|
||||
"/usr/local/bin/hermes",
|
||||
f"~/wizards/{self.agent}/venv/bin/hermes" if self.agent else "",
|
||||
f"/root/wizards/{self.agent}/venv/bin/hermes" if self.agent else "",
|
||||
])
|
||||
candidates = [c for c in candidates if c]
|
||||
|
||||
for candidate in candidates:
|
||||
if self._probe_remote_binary(candidate):
|
||||
self._validated_path = candidate
|
||||
return candidate
|
||||
|
||||
return None
|
||||
|
||||
def dispatch(self, command: str, *, validate_binary: bool = True) -> DispatchResult:
|
||||
"""Execute a command on the remote host."""
|
||||
import time
|
||||
start = time.monotonic()
|
||||
|
||||
if validate_binary:
|
||||
binary = self.detect_hermes_binary()
|
||||
if not binary:
|
||||
return DispatchResult(
|
||||
host=self.host,
|
||||
command=command,
|
||||
success=False,
|
||||
failure_reason=f"No working hermes binary found on {self.host}",
|
||||
duration_s=time.monotonic() - start,
|
||||
)
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
self._ssh_base() + [command],
|
||||
capture_output=True,
|
||||
timeout=self.timeout,
|
||||
)
|
||||
duration = time.monotonic() - start
|
||||
stdout = result.stdout.decode("utf-8", errors="replace")
|
||||
stderr = result.stderr.decode("utf-8", errors="replace")
|
||||
|
||||
failure_reason = None
|
||||
if result.returncode != 0:
|
||||
failure_reason = _classify_ssh_error(stderr, result.returncode)
|
||||
|
||||
return DispatchResult(
|
||||
host=self.host,
|
||||
command=command,
|
||||
success=result.returncode == 0,
|
||||
exit_code=result.returncode,
|
||||
stdout=stdout,
|
||||
stderr=stderr,
|
||||
failure_reason=failure_reason,
|
||||
duration_s=duration,
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
return DispatchResult(
|
||||
host=self.host,
|
||||
command=command,
|
||||
success=False,
|
||||
failure_reason=f"SSH command timed out after {self.timeout}s",
|
||||
duration_s=time.monotonic() - start,
|
||||
)
|
||||
except FileNotFoundError:
|
||||
return DispatchResult(
|
||||
host=self.host,
|
||||
command=command,
|
||||
success=False,
|
||||
failure_reason="ssh binary not found on local system",
|
||||
duration_s=time.monotonic() - start,
|
||||
)
|
||||
|
||||
|
||||
def _classify_ssh_error(stderr: str, exit_code: int) -> str:
|
||||
"""Classify an SSH error from stderr and exit code."""
|
||||
lower = stderr.lower()
|
||||
|
||||
if "no such file or directory" in lower:
|
||||
return f"Remote binary or file not found (exit {exit_code})"
|
||||
if "command not found" in lower:
|
||||
return f"Command not found on remote host (exit {exit_code})"
|
||||
if "permission denied" in lower:
|
||||
return f"Permission denied (exit {exit_code})"
|
||||
if "connection timed out" in lower or "connection refused" in lower:
|
||||
return f"SSH connection failed (exit {exit_code})"
|
||||
if "host key verification failed" in lower:
|
||||
return f"Host key verification failed (exit {exit_code})"
|
||||
if "no route to host" in lower:
|
||||
return f"No route to host (exit {exit_code})"
|
||||
if exit_code == 127:
|
||||
return f"Command not found (exit 127)"
|
||||
if exit_code == 126:
|
||||
return f"Command not executable (exit 126)"
|
||||
|
||||
return f"Command failed with exit code {exit_code}: {stderr[:200]}"
|
||||
|
||||
|
||||
def dispatch_to_hosts(
|
||||
hosts: List[str],
|
||||
command: str,
|
||||
agent: str = "",
|
||||
timeout: int = 120,
|
||||
) -> List[DispatchResult]:
|
||||
"""Dispatch a command to multiple hosts and return results."""
|
||||
results = []
|
||||
for host in hosts:
|
||||
ssh = SSHEnvironment(host=host, agent=agent, timeout=timeout)
|
||||
result = ssh.dispatch(command)
|
||||
results.append(result)
|
||||
return results
|
||||
|
||||
|
||||
def format_dispatch_report(results: List[DispatchResult]) -> str:
|
||||
"""Format a human-readable report of dispatch results."""
|
||||
lines = ["## Dispatch Report", ""]
|
||||
|
||||
succeeded = [r for r in results if r.success]
|
||||
failed = [r for r in results if not r.success]
|
||||
|
||||
lines.append(f"**Total:** {len(results)} hosts | "
|
||||
f"**OK:** {len(succeeded)} | **Failed:** {len(failed)}")
|
||||
lines.append("")
|
||||
|
||||
for r in results:
|
||||
status = "OK" if r.success else "FAIL"
|
||||
lines.append(f"### {r.host} [{status}]")
|
||||
lines.append(f"- Command: `{r.command}`")
|
||||
lines.append(f"- Duration: {r.duration_s:.1f}s")
|
||||
if r.exit_code is not None:
|
||||
lines.append(f"- Exit code: {r.exit_code}")
|
||||
if r.failure_reason:
|
||||
lines.append(f"- **Failure:** {r.failure_reason}")
|
||||
if r.stderr and not r.success:
|
||||
lines.append(f"- Stderr: `{r.stderr[:300]}`")
|
||||
lines.append("")
|
||||
|
||||
return "\n".join(lines)
|
||||
@@ -5258,29 +5258,6 @@ For more help on a command:
|
||||
|
||||
sessions_parser.set_defaults(func=cmd_sessions)
|
||||
|
||||
|
||||
# Session quality analyzer command
|
||||
quality_parser = subparsers.add_parser(
|
||||
"quality",
|
||||
help="Session quality analysis",
|
||||
description="Analyze session quality metrics and identify issues"
|
||||
)
|
||||
quality_subparsers = quality_parser.add_subparsers(dest="quality_command")
|
||||
|
||||
# Analyze single session
|
||||
quality_analyze = quality_subparsers.add_parser("analyze", help="Analyze a single session")
|
||||
quality_analyze.add_argument("session_id", help="Session ID to analyze")
|
||||
|
||||
# Analyze multiple sessions
|
||||
quality_batch = quality_subparsers.add_parser("batch", help="Analyze multiple sessions")
|
||||
quality_batch.add_argument("session_ids", nargs="+", help="Session IDs to analyze")
|
||||
|
||||
# List sessions with quality issues
|
||||
quality_subparsers.add_parser("list-issues", help="List sessions with quality issues")
|
||||
|
||||
quality_parser.set_defaults(func=cmd_quality)
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# insights command
|
||||
# =========================================================================
|
||||
@@ -5621,39 +5598,3 @@ Examples:
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
|
||||
def cmd_quality(args):
|
||||
"""Handle session quality analysis commands."""
|
||||
from hermes_cli.colors import Colors, color
|
||||
|
||||
subcmd = getattr(args, 'quality_command', None)
|
||||
|
||||
if subcmd is None:
|
||||
print(color("Session Quality Analysis", Colors.CYAN))
|
||||
print("\nCommands:")
|
||||
print(" hermes quality analyze SESSION_ID - Analyze a single session")
|
||||
print(" hermes quality batch SESSION_IDS... - Analyze multiple sessions")
|
||||
print(" hermes quality list-issues - List sessions with quality issues")
|
||||
return 0
|
||||
|
||||
try:
|
||||
from tools.session_quality_analyzer import quality_analyzer_cli
|
||||
|
||||
args_list = []
|
||||
if subcmd == "analyze":
|
||||
args_list = ["analyze", args.session_id]
|
||||
elif subcmd == "batch":
|
||||
args_list = ["batch"] + args.session_ids
|
||||
elif subcmd == "list-issues":
|
||||
args_list = ["list-issues"]
|
||||
|
||||
return quality_analyzer_cli(args_list)
|
||||
|
||||
except ImportError as e:
|
||||
print(color(f"Error: Cannot import session_quality_analyzer module: {e}", Colors.RED))
|
||||
return 1
|
||||
except Exception as e:
|
||||
print(color(f"Error: {e}", Colors.RED))
|
||||
return 1
|
||||
|
||||
|
||||
@@ -1,538 +0,0 @@
|
||||
"""
|
||||
Session Quality Analyzer
|
||||
|
||||
Analyzes session quality metrics to identify patterns for improvement.
|
||||
Addresses research questions from issue #327.
|
||||
|
||||
Issue: #327
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
from dataclasses import dataclass, asdict, field
|
||||
import statistics
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class QualityMetrics:
|
||||
"""Quality metrics for a session."""
|
||||
session_id: str
|
||||
message_count: int = 0
|
||||
tool_calls: int = 0
|
||||
successful_tool_calls: int = 0
|
||||
error_count: int = 0
|
||||
user_corrections: int = 0
|
||||
completion_time_seconds: float = 0.0
|
||||
context_switches: int = 0 # Number of topic changes
|
||||
avg_response_time: float = 0.0
|
||||
token_usage: int = 0
|
||||
|
||||
@property
|
||||
def error_rate(self) -> float:
|
||||
"""Calculate error rate."""
|
||||
if self.tool_calls == 0:
|
||||
return 0.0
|
||||
return self.error_count / self.tool_calls
|
||||
|
||||
@property
|
||||
def success_rate(self) -> float:
|
||||
"""Calculate success rate."""
|
||||
if self.tool_calls == 0:
|
||||
return 0.0
|
||||
return self.successful_tool_calls / self.tool_calls
|
||||
|
||||
@property
|
||||
def correction_rate(self) -> float:
|
||||
"""Calculate user correction rate."""
|
||||
if self.message_count == 0:
|
||||
return 0.0
|
||||
return self.user_corrections / self.message_count
|
||||
|
||||
@property
|
||||
def efficiency_score(self) -> float:
|
||||
"""Calculate efficiency score (0-1)."""
|
||||
if self.message_count == 0:
|
||||
return 0.0
|
||||
|
||||
# Factors:
|
||||
# 1. High success rate (weight: 0.4)
|
||||
# 2. Low error rate (weight: 0.3)
|
||||
# 3. Low correction rate (weight: 0.2)
|
||||
# 4. Reasonable message count (weight: 0.1)
|
||||
|
||||
success_score = self.success_rate * 0.4
|
||||
error_score = (1 - self.error_rate) * 0.3
|
||||
correction_score = (1 - min(1.0, self.correction_rate * 5)) * 0.2 # Scale correction rate
|
||||
|
||||
# Message count penalty for very long sessions
|
||||
msg_score = 0.1
|
||||
if self.message_count > 100:
|
||||
msg_score = 0.05
|
||||
elif self.message_count > 50:
|
||||
msg_score = 0.08
|
||||
|
||||
return success_score + error_score + correction_score + msg_score
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"session_id": self.session_id,
|
||||
"message_count": self.message_count,
|
||||
"tool_calls": self.tool_calls,
|
||||
"successful_tool_calls": self.successful_tool_calls,
|
||||
"error_count": self.error_count,
|
||||
"user_corrections": self.user_corrections,
|
||||
"completion_time_seconds": self.completion_time_seconds,
|
||||
"context_switches": self.context_switches,
|
||||
"avg_response_time": self.avg_response_time,
|
||||
"token_usage": self.token_usage,
|
||||
"error_rate": self.error_rate,
|
||||
"success_rate": self.success_rate,
|
||||
"correction_rate": self.correction_rate,
|
||||
"efficiency_score": self.efficiency_score
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class QualityIssue:
|
||||
"""Issue discovered during quality analysis."""
|
||||
issue_id: str
|
||||
session_id: str
|
||||
issue_type: str # "high_error_rate", "frequent_corrections", "context_loss", etc.
|
||||
severity: str # "low", "medium", "high", "critical"
|
||||
description: str
|
||||
evidence: Dict[str, Any] = field(default_factory=dict)
|
||||
discovered_at: str = field(default_factory=lambda: datetime.now().isoformat())
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return asdict(self)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SessionAnalysis:
|
||||
"""Complete analysis of a session."""
|
||||
session_id: str
|
||||
metrics: QualityMetrics
|
||||
issues: List[QualityIssue] = field(default_factory=list)
|
||||
patterns: List[str] = field(default_factory=list)
|
||||
recommendations: List[str] = field(default_factory=list)
|
||||
analyzed_at: str = field(default_factory=lambda: datetime.now().isoformat())
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"session_id": self.session_id,
|
||||
"metrics": self.metrics.to_dict(),
|
||||
"issues": [i.to_dict() for i in self.issues],
|
||||
"patterns": self.patterns,
|
||||
"recommendations": self.recommendations,
|
||||
"analyzed_at": self.analyzed_at
|
||||
}
|
||||
|
||||
|
||||
class SessionQualityAnalyzer:
|
||||
"""Analyze session quality and identify issues."""
|
||||
|
||||
def __init__(self, session_db=None):
|
||||
self.session_db = session_db
|
||||
|
||||
def analyze_session(self, session_id: str) -> Optional[SessionAnalysis]:
|
||||
"""Analyze a single session."""
|
||||
if not self.session_db:
|
||||
return None
|
||||
|
||||
try:
|
||||
messages = self.session_db.get_messages(session_id)
|
||||
if not messages:
|
||||
return None
|
||||
|
||||
# Calculate metrics
|
||||
metrics = self._calculate_metrics(session_id, messages)
|
||||
|
||||
# Identify issues
|
||||
issues = self._identify_issues(metrics, messages)
|
||||
|
||||
# Identify patterns
|
||||
patterns = self._identify_patterns(messages)
|
||||
|
||||
# Generate recommendations
|
||||
recommendations = self._generate_recommendations(metrics, issues, patterns)
|
||||
|
||||
return SessionAnalysis(
|
||||
session_id=session_id,
|
||||
metrics=metrics,
|
||||
issues=issues,
|
||||
patterns=patterns,
|
||||
recommendations=recommendations
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to analyze session: {e}")
|
||||
return None
|
||||
|
||||
def _calculate_metrics(self, session_id: str, messages: List[Dict]) -> QualityMetrics:
|
||||
"""Calculate quality metrics for a session."""
|
||||
tool_calls = 0
|
||||
successful_tool_calls = 0
|
||||
error_count = 0
|
||||
user_corrections = 0
|
||||
|
||||
for i, msg in enumerate(messages):
|
||||
# Count tool calls
|
||||
if msg.get("role") == "assistant" and msg.get("tool_calls"):
|
||||
tool_calls += len(msg["tool_calls"])
|
||||
|
||||
# Check tool results
|
||||
if msg.get("role") == "tool":
|
||||
content = msg.get("content", "").lower()
|
||||
if "error" in content or "failed" in content:
|
||||
error_count += 1
|
||||
else:
|
||||
successful_tool_calls += 1
|
||||
|
||||
# Count user corrections (user message after error)
|
||||
if (msg.get("role") == "user" and i > 0 and
|
||||
messages[i-1].get("role") == "tool" and
|
||||
("error" in messages[i-1].get("content", "").lower() or
|
||||
"failed" in messages[i-1].get("content", "").lower())):
|
||||
user_corrections += 1
|
||||
|
||||
return QualityMetrics(
|
||||
session_id=session_id,
|
||||
message_count=len(messages),
|
||||
tool_calls=tool_calls,
|
||||
successful_tool_calls=successful_tool_calls,
|
||||
error_count=error_count,
|
||||
user_corrections=user_corrections
|
||||
)
|
||||
|
||||
def _identify_issues(self, metrics: QualityMetrics, messages: List[Dict]) -> List[QualityIssue]:
|
||||
"""Identify quality issues."""
|
||||
issues = []
|
||||
|
||||
# High error rate
|
||||
if metrics.error_rate > 0.2: # >20% errors
|
||||
issues.append(QualityIssue(
|
||||
issue_id=f"high_error_{metrics.session_id}",
|
||||
session_id=metrics.session_id,
|
||||
issue_type="high_error_rate",
|
||||
severity="high" if metrics.error_rate > 0.3 else "medium",
|
||||
description=f"High error rate: {metrics.error_rate:.1%}",
|
||||
evidence={"error_rate": metrics.error_rate, "error_count": metrics.error_count}
|
||||
))
|
||||
|
||||
# Frequent corrections
|
||||
if metrics.correction_rate > 0.1: # >10% corrections
|
||||
issues.append(QualityIssue(
|
||||
issue_id=f"frequent_corrections_{metrics.session_id}",
|
||||
session_id=metrics.session_id,
|
||||
issue_type="frequent_corrections",
|
||||
severity="medium",
|
||||
description=f"Frequent user corrections: {metrics.correction_rate:.1%}",
|
||||
evidence={"correction_rate": metrics.correction_rate, "corrections": metrics.user_corrections}
|
||||
))
|
||||
|
||||
# Context loss (look for repeated questions)
|
||||
repeated_questions = self._find_repeated_questions(messages)
|
||||
if repeated_questions > 2:
|
||||
issues.append(QualityIssue(
|
||||
issue_id=f"context_loss_{metrics.session_id}",
|
||||
session_id=metrics.session_id,
|
||||
issue_type="context_loss",
|
||||
severity="medium",
|
||||
description=f"Possible context loss: {repeated_questions} repeated questions",
|
||||
evidence={"repeated_questions": repeated_questions}
|
||||
))
|
||||
|
||||
# Low efficiency
|
||||
if metrics.efficiency_score < 0.5:
|
||||
issues.append(QualityIssue(
|
||||
issue_id=f"low_efficiency_{metrics.session_id}",
|
||||
session_id=metrics.session_id,
|
||||
issue_type="low_efficiency",
|
||||
severity="low",
|
||||
description=f"Low efficiency score: {metrics.efficiency_score:.2f}",
|
||||
evidence={"efficiency_score": metrics.efficiency_score}
|
||||
))
|
||||
|
||||
return issues
|
||||
|
||||
def _find_repeated_questions(self, messages: List[Dict]) -> int:
|
||||
"""Find repeated questions in user messages."""
|
||||
user_messages = [m.get("content", "").lower() for m in messages if m.get("role") == "user"]
|
||||
|
||||
# Simple heuristic: look for similar messages
|
||||
repeated = 0
|
||||
seen = set()
|
||||
|
||||
for msg in user_messages:
|
||||
# Normalize message
|
||||
normalized = " ".join(msg.split()[:10]) # First 10 words
|
||||
if normalized in seen:
|
||||
repeated += 1
|
||||
else:
|
||||
seen.add(normalized)
|
||||
|
||||
return repeated
|
||||
|
||||
def _identify_patterns(self, messages: List[Dict]) -> List[str]:
|
||||
"""Identify patterns in the session."""
|
||||
patterns = []
|
||||
|
||||
# Analyze tool usage
|
||||
tool_usage = {}
|
||||
for msg in messages:
|
||||
if msg.get("role") == "assistant" and msg.get("tool_calls"):
|
||||
for tc in msg["tool_calls"]:
|
||||
tool_name = tc.get("function", {}).get("name", "unknown")
|
||||
tool_usage[tool_name] = tool_usage.get(tool_name, 0) + 1
|
||||
|
||||
if tool_usage:
|
||||
most_used = max(tool_usage.items(), key=lambda x: x[1])
|
||||
patterns.append(f"Most used tool: {most_used[0]} ({most_used[1]} calls)")
|
||||
|
||||
# Analyze user message patterns
|
||||
user_messages = [m.get("content", "") for m in messages if m.get("role") == "user"]
|
||||
|
||||
if user_messages:
|
||||
avg_length = sum(len(m) for m in user_messages) / len(user_messages)
|
||||
patterns.append(f"Average user message length: {avg_length:.0f} characters")
|
||||
|
||||
questions = sum(1 for m in user_messages if "?" in m)
|
||||
patterns.append(f"Question ratio: {questions/len(user_messages):.0%}")
|
||||
|
||||
return patterns
|
||||
|
||||
def _generate_recommendations(
|
||||
self,
|
||||
metrics: QualityMetrics,
|
||||
issues: List[QualityIssue],
|
||||
patterns: List[str]
|
||||
) -> List[str]:
|
||||
"""Generate recommendations for improvement."""
|
||||
recommendations = []
|
||||
|
||||
if metrics.error_rate > 0.2:
|
||||
recommendations.append("Consider using more specific prompts to reduce errors")
|
||||
|
||||
if metrics.correction_rate > 0.1:
|
||||
recommendations.append("Provide clearer instructions to reduce need for corrections")
|
||||
|
||||
if metrics.message_count > 100:
|
||||
recommendations.append("Consider breaking long sessions into smaller focused sessions")
|
||||
|
||||
if metrics.efficiency_score < 0.5:
|
||||
recommendations.append("Session efficiency is low - consider using templates or warm sessions")
|
||||
|
||||
return recommendations
|
||||
|
||||
def analyze_multiple_sessions(self, session_ids: List[str]) -> Dict[str, Any]:
|
||||
"""Analyze multiple sessions and provide aggregate insights."""
|
||||
analyses = []
|
||||
|
||||
for session_id in session_ids:
|
||||
analysis = self.analyze_session(session_id)
|
||||
if analysis:
|
||||
analyses.append(analysis)
|
||||
|
||||
if not analyses:
|
||||
return {"error": "No sessions analyzed"}
|
||||
|
||||
# Aggregate metrics
|
||||
all_metrics = [a.metrics for a in analyses]
|
||||
|
||||
avg_error_rate = statistics.mean([m.error_rate for m in all_metrics])
|
||||
avg_success_rate = statistics.mean([m.success_rate for m in all_metrics])
|
||||
avg_efficiency = statistics.mean([m.efficiency_score for m in all_metrics])
|
||||
|
||||
# Collect all issues
|
||||
all_issues = []
|
||||
for a in analyses:
|
||||
all_issues.extend(a.issues)
|
||||
|
||||
# Group issues by type
|
||||
issues_by_type = {}
|
||||
for issue in all_issues:
|
||||
if issue.issue_type not in issues_by_type:
|
||||
issues_by_type[issue.issue_type] = []
|
||||
issues_by_type[issue.issue_type].append(issue)
|
||||
|
||||
return {
|
||||
"sessions_analyzed": len(analyses),
|
||||
"aggregate_metrics": {
|
||||
"avg_error_rate": avg_error_rate,
|
||||
"avg_success_rate": avg_success_rate,
|
||||
"avg_efficiency": avg_efficiency
|
||||
},
|
||||
"issues_summary": {
|
||||
issue_type: len(issues)
|
||||
for issue_type, issues in issues_by_type.items()
|
||||
},
|
||||
"top_issues": [
|
||||
{
|
||||
"type": issue.issue_type,
|
||||
"severity": issue.severity,
|
||||
"description": issue.description,
|
||||
"session_id": issue.session_id
|
||||
}
|
||||
for issue in sorted(all_issues, key=lambda x: x.severity == "critical", reverse=True)[:5]
|
||||
],
|
||||
"recommendations": self._generate_aggregate_recommendations(analyses)
|
||||
}
|
||||
|
||||
def _generate_aggregate_recommendations(self, analyses: List[SessionAnalysis]) -> List[str]:
|
||||
"""Generate recommendations based on aggregate analysis."""
|
||||
recommendations = []
|
||||
|
||||
# Check for common issues
|
||||
high_error_sessions = [a for a in analyses if a.metrics.error_rate > 0.2]
|
||||
if len(high_error_sessions) > len(analyses) * 0.3: # >30% sessions have high errors
|
||||
recommendations.append("Systematic issue: Many sessions have high error rates. Consider improving tool documentation or prompts.")
|
||||
|
||||
# Check for efficiency
|
||||
low_efficiency = [a for a in analyses if a.metrics.efficiency_score < 0.5]
|
||||
if len(low_efficiency) > len(analyses) * 0.5: # >50% sessions have low efficiency
|
||||
recommendations.append("Consider implementing warm session provisioning to improve session efficiency.")
|
||||
|
||||
return recommendations
|
||||
|
||||
|
||||
# CLI Interface
|
||||
def quality_analyzer_cli(args: List[str]) -> int:
|
||||
"""CLI interface for session quality analysis."""
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="Session quality analyzer")
|
||||
subparsers = parser.add_subparsers(dest="command")
|
||||
|
||||
# Analyze single session
|
||||
analyze_parser = subparsers.add_parser("analyze", help="Analyze a single session")
|
||||
analyze_parser.add_argument("session_id", help="Session ID to analyze")
|
||||
|
||||
# Analyze multiple sessions
|
||||
batch_parser = subparsers.add_parser("batch", help="Analyze multiple sessions")
|
||||
batch_parser.add_argument("session_ids", nargs="+", help="Session IDs to analyze")
|
||||
|
||||
# List sessions with quality issues
|
||||
subparsers.add_parser("list-issues", help="List sessions with quality issues")
|
||||
|
||||
parsed = parser.parse_args(args)
|
||||
|
||||
if not parsed.command:
|
||||
parser.print_help()
|
||||
return 1
|
||||
|
||||
try:
|
||||
from hermes_state import SessionDB
|
||||
session_db = SessionDB()
|
||||
except ImportError:
|
||||
print("Error: Cannot import SessionDB")
|
||||
return 1
|
||||
|
||||
analyzer = SessionQualityAnalyzer(session_db)
|
||||
|
||||
if parsed.command == "analyze":
|
||||
analysis = analyzer.analyze_session(parsed.session_id)
|
||||
|
||||
if not analysis:
|
||||
print(f"Failed to analyze session {parsed.session_id}")
|
||||
return 1
|
||||
|
||||
print(f"\n=== Session Quality Analysis: {parsed.session_id ===\n")
|
||||
print(f"Messages: {analysis.metrics.message_count}")
|
||||
print(f"Tool calls: {analysis.metrics.tool_calls}")
|
||||
print(f"Error rate: {analysis.metrics.error_rate:.1%}")
|
||||
print(f"Success rate: {analysis.metrics.success_rate:.1%}")
|
||||
print(f"Efficiency score: {analysis.metrics.efficiency_score:.2f}")
|
||||
|
||||
if analysis.issues:
|
||||
print(f"\nIssues found: {len(analysis.issues)}")
|
||||
for issue in analysis.issues:
|
||||
print(f" [{issue.severity.upper()}] {issue.description}")
|
||||
|
||||
if analysis.patterns:
|
||||
print("\nPatterns:")
|
||||
for pattern in analysis.patterns:
|
||||
print(f" {pattern}")
|
||||
|
||||
if analysis.recommendations:
|
||||
print("\nRecommendations:")
|
||||
for rec in analysis.recommendations:
|
||||
print(f" {rec}")
|
||||
|
||||
return 0
|
||||
|
||||
elif parsed.command == "batch":
|
||||
result = analyzer.analyze_multiple_sessions(parsed.session_ids)
|
||||
|
||||
if "error" in result:
|
||||
print(f"Error: {result['error']}")
|
||||
return 1
|
||||
|
||||
print(f"\n=== Batch Analysis: {result['sessions_analyzed']} sessions ===\n")
|
||||
|
||||
agg = result.get("aggregate_metrics", {})
|
||||
print(f"Average error rate: {agg.get('avg_error_rate', 0):.1%}")
|
||||
print(f"Average success rate: {agg.get('avg_success_rate', 0):.1%}")
|
||||
print(f"Average efficiency: {agg.get('avg_efficiency', 0):.2f}")
|
||||
|
||||
issues = result.get("issues_summary", {})
|
||||
if issues:
|
||||
print("\nIssues summary:")
|
||||
for issue_type, count in issues.items():
|
||||
print(f" {issue_type}: {count}")
|
||||
|
||||
top_issues = result.get("top_issues", [])
|
||||
if top_issues:
|
||||
print("\nTop issues:")
|
||||
for issue in top_issues:
|
||||
print(f" [{issue['severity'].upper()}] {issue['description']} (Session: {issue['session_id']})")
|
||||
|
||||
recommendations = result.get("recommendations", [])
|
||||
if recommendations:
|
||||
print("\nRecommendations:")
|
||||
for rec in recommendations:
|
||||
print(f" {rec}")
|
||||
|
||||
return 0
|
||||
|
||||
elif parsed.command == "list-issues":
|
||||
# Get recent sessions
|
||||
try:
|
||||
sessions = session_db.get_messages.__self__.execute_write(
|
||||
"SELECT id FROM sessions ORDER BY started_at DESC LIMIT 20"
|
||||
)
|
||||
|
||||
if not sessions:
|
||||
print("No sessions found")
|
||||
return 0
|
||||
|
||||
session_ids = [s[0] for s in sessions]
|
||||
result = analyzer.analyze_multiple_sessions(session_ids)
|
||||
|
||||
if "error" in result:
|
||||
print(f"Error: {result['error']}")
|
||||
return 1
|
||||
|
||||
print(f"\n=== Sessions with Quality Issues (last 20 sessions) ===\n")
|
||||
|
||||
for issue in result.get("top_issues", []):
|
||||
print(f"Session: {issue['session_id']}")
|
||||
print(f" [{issue['severity'].upper()}] {issue['description']}")
|
||||
print()
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
return 1
|
||||
|
||||
return 0
|
||||
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
sys.exit(quality_analyzer_cli(sys.argv[1:]))
|
||||
Reference in New Issue
Block a user