Compare commits

..

2 Commits

Author SHA1 Message Date
66b0febdfb fix(cron): expand _SCRIPT_FAILURE_PHRASES with SSH patterns
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 59s
Part of #457, Closes #350

Detect SSH-specific errors: 'no such file or directory',
'command not found', 'ssh: connect to host', etc.
2026-04-14 01:21:43 +00:00
6d79bf7783 feat(cron): SSH dispatch validation utilities
Part of #457, Closes #350

Provides SSHEnvironment that validates remote hermes binary
exists before dispatch, and DispatchResult with structured
failure reasons.
2026-04-14 01:21:39 +00:00
4 changed files with 221 additions and 597 deletions

View File

@@ -182,6 +182,15 @@ _SCRIPT_FAILURE_PHRASES = (
"exit status",
"non-zero exit",
"did not complete",
# SSH-specific failure patterns (#350)
"no such file or directory",
"command not found",
"hermes binary not found",
"hermes not found",
"ssh: connect to host",
"connection timed out",
"host key verification failed",
"no route to host",
"could not run",
"unable to execute",
"permission denied",

212
cron/ssh_dispatch.py Normal file
View File

@@ -0,0 +1,212 @@
"""
SSH dispatch utilities for cron jobs.
Provides validated remote execution so broken hermes binary paths
are caught before draining the dispatch queue.
Usage:
from cron.ssh_dispatch import SSHEnvironment, format_dispatch_report
ssh = SSHEnvironment(host="root@ezra", agent="allegro")
result = ssh.dispatch("cron tick")
if not result.success:
print(result.failure_reason)
"""
import subprocess
import shutil
from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class DispatchResult:
    """Structured result of a remote command dispatch.

    Returned by SSHEnvironment.dispatch(); ``success`` is True only when
    the remote command exited with status 0.
    """
    host: str  # remote host the command was dispatched to
    command: str  # the command string that was (or would have been) run
    success: bool  # True iff the remote command exited 0
    exit_code: Optional[int] = None  # None when the command never ran (validation failure, timeout, missing ssh)
    stdout: str = ""  # decoded remote stdout ("" when the command never ran)
    stderr: str = ""  # decoded remote stderr ("" when the command never ran)
    failure_reason: Optional[str] = None  # human-readable classification, set only on failure
    duration_s: float = 0.0  # wall-clock seconds spent inside dispatch()
@dataclass
class SSHEnvironment:
    """Validates and dispatches commands to a remote host via SSH.

    A working hermes binary is located on the remote host (and cached on
    this instance) before the first dispatch, so a broken install is
    reported as a structured DispatchResult failure instead of silently
    draining the dispatch queue.
    """
    host: str  # e.g. "root@ezra" or "192.168.1.10"
    agent: str = ""  # agent name for logging
    hermes_path: Optional[str] = None  # explicit path, auto-detected if None
    timeout: int = 120  # seconds allowed for each dispatched command
    # Cache of the first remote path that passed _probe_remote_binary().
    _validated_path: Optional[str] = field(default=None, init=False, repr=False)

    def _ssh_base(self) -> List[str]:
        """Return the common ssh argv prefix (options + target host)."""
        # BatchMode prevents interactive password prompts from hanging cron;
        # accept-new trusts first-seen hosts but still rejects changed keys.
        return [
            "ssh",
            "-o", "ConnectTimeout=10",
            "-o", "StrictHostKeyChecking=accept-new",
            "-o", "BatchMode=yes",
            self.host,
        ]

    def _probe_remote_binary(self, candidate: str) -> bool:
        """Check if a hermes binary exists and is executable on the remote host."""
        # NOTE(review): candidate is interpolated unquoted so "~" expands in
        # the remote shell; candidate paths containing spaces or shell
        # metacharacters would break this probe — confirm callers never
        # configure such paths.
        try:
            result = subprocess.run(
                self._ssh_base() + [f"test -x {candidate}"],
                capture_output=True, timeout=15,
            )
            return result.returncode == 0
        except (subprocess.TimeoutExpired, FileNotFoundError):
            # Unreachable host or no local ssh binary: treat as "not found".
            return False

    def detect_hermes_binary(self) -> Optional[str]:
        """Find a working hermes binary on the remote host.

        Tries the explicitly configured path first, then common install
        locations, returning the first candidate that probes as
        executable. The winning path is cached per instance; returns
        None when nothing works.
        """
        if self._validated_path:
            return self._validated_path
        candidates = []
        if self.hermes_path:
            candidates.append(self.hermes_path)
        # Common locations
        candidates.extend([
            "hermes",  # on PATH
            "~/.local/bin/hermes",
            "/usr/local/bin/hermes",
            f"~/wizards/{self.agent}/venv/bin/hermes" if self.agent else "",
            f"/root/wizards/{self.agent}/venv/bin/hermes" if self.agent else "",
        ])
        # Drop the "" placeholders produced when no agent is configured.
        candidates = [c for c in candidates if c]
        for candidate in candidates:
            if self._probe_remote_binary(candidate):
                self._validated_path = candidate
                return candidate
        return None

    def dispatch(self, command: str, *, validate_binary: bool = True) -> DispatchResult:
        """Execute a command on the remote host.

        When ``validate_binary`` is True, fail fast (without running
        ``command``) if no hermes binary can be found on the host.
        NOTE(review): the validated path is not substituted into
        ``command`` — callers are expected to reference hermes
        themselves; confirm this is intended.

        Returns a DispatchResult; never raises for remote failures,
        local ssh absence, or timeouts.
        """
        import time
        start = time.monotonic()
        if validate_binary:
            binary = self.detect_hermes_binary()
            if not binary:
                return DispatchResult(
                    host=self.host,
                    command=command,
                    success=False,
                    failure_reason=f"No working hermes binary found on {self.host}",
                    duration_s=time.monotonic() - start,
                )
        try:
            result = subprocess.run(
                self._ssh_base() + [command],
                capture_output=True,
                timeout=self.timeout,
            )
            duration = time.monotonic() - start
            # Decode defensively: remote output is not guaranteed valid UTF-8.
            stdout = result.stdout.decode("utf-8", errors="replace")
            stderr = result.stderr.decode("utf-8", errors="replace")
            failure_reason = None
            if result.returncode != 0:
                failure_reason = _classify_ssh_error(stderr, result.returncode)
            return DispatchResult(
                host=self.host,
                command=command,
                success=result.returncode == 0,
                exit_code=result.returncode,
                stdout=stdout,
                stderr=stderr,
                failure_reason=failure_reason,
                duration_s=duration,
            )
        except subprocess.TimeoutExpired:
            return DispatchResult(
                host=self.host,
                command=command,
                success=False,
                failure_reason=f"SSH command timed out after {self.timeout}s",
                duration_s=time.monotonic() - start,
            )
        except FileNotFoundError:
            # Local machine has no ssh client at all.
            return DispatchResult(
                host=self.host,
                command=command,
                success=False,
                failure_reason="ssh binary not found on local system",
                duration_s=time.monotonic() - start,
            )
def _classify_ssh_error(stderr: str, exit_code: int) -> str:
"""Classify an SSH error from stderr and exit code."""
lower = stderr.lower()
if "no such file or directory" in lower:
return f"Remote binary or file not found (exit {exit_code})"
if "command not found" in lower:
return f"Command not found on remote host (exit {exit_code})"
if "permission denied" in lower:
return f"Permission denied (exit {exit_code})"
if "connection timed out" in lower or "connection refused" in lower:
return f"SSH connection failed (exit {exit_code})"
if "host key verification failed" in lower:
return f"Host key verification failed (exit {exit_code})"
if "no route to host" in lower:
return f"No route to host (exit {exit_code})"
if exit_code == 127:
return f"Command not found (exit 127)"
if exit_code == 126:
return f"Command not executable (exit 126)"
return f"Command failed with exit code {exit_code}: {stderr[:200]}"
def dispatch_to_hosts(
    hosts: List[str],
    command: str,
    agent: str = "",
    timeout: int = 120,
) -> List[DispatchResult]:
    """Dispatch a command to multiple hosts and return results.

    Hosts are contacted sequentially, one SSHEnvironment per host; the
    result list is in the same order as ``hosts``.
    """
    return [
        SSHEnvironment(host=target, agent=agent, timeout=timeout).dispatch(command)
        for target in hosts
    ]
def format_dispatch_report(results: List[DispatchResult]) -> str:
    """Format a human-readable report of dispatch results.

    Produces a markdown summary line followed by one section per host,
    including failure details for unsuccessful dispatches.
    """
    ok_total = sum(1 for entry in results if entry.success)
    fail_total = len(results) - ok_total
    out = ["## Dispatch Report", ""]
    out.append(f"**Total:** {len(results)} hosts | "
               f"**OK:** {ok_total} | **Failed:** {fail_total}")
    out.append("")
    for entry in results:
        marker = "OK" if entry.success else "FAIL"
        out.append(f"### {entry.host} [{marker}]")
        out.append(f"- Command: `{entry.command}`")
        out.append(f"- Duration: {entry.duration_s:.1f}s")
        if entry.exit_code is not None:
            out.append(f"- Exit code: {entry.exit_code}")
        if entry.failure_reason:
            out.append(f"- **Failure:** {entry.failure_reason}")
        # Only surface stderr for failures, truncated to keep reports short.
        if entry.stderr and not entry.success:
            out.append(f"- Stderr: `{entry.stderr[:300]}`")
        out.append("")
    return "\n".join(out)

View File

@@ -5258,29 +5258,6 @@ For more help on a command:
sessions_parser.set_defaults(func=cmd_sessions)
# Session quality analyzer command
quality_parser = subparsers.add_parser(
"quality",
help="Session quality analysis",
description="Analyze session quality metrics and identify issues"
)
quality_subparsers = quality_parser.add_subparsers(dest="quality_command")
# Analyze single session
quality_analyze = quality_subparsers.add_parser("analyze", help="Analyze a single session")
quality_analyze.add_argument("session_id", help="Session ID to analyze")
# Analyze multiple sessions
quality_batch = quality_subparsers.add_parser("batch", help="Analyze multiple sessions")
quality_batch.add_argument("session_ids", nargs="+", help="Session IDs to analyze")
# List sessions with quality issues
quality_subparsers.add_parser("list-issues", help="List sessions with quality issues")
quality_parser.set_defaults(func=cmd_quality)
# =========================================================================
# insights command
# =========================================================================
@@ -5621,39 +5598,3 @@ Examples:
if __name__ == "__main__":
main()
def cmd_quality(args):
    """Handle session quality analysis commands.

    Forwards the ``hermes quality`` subcommand to the standalone
    analyzer CLI; prints a usage summary when no subcommand was given.
    """
    from hermes_cli.colors import Colors, color
    subcmd = getattr(args, 'quality_command', None)
    if subcmd is None:
        # No subcommand: show inline usage and report success.
        print(color("Session Quality Analysis", Colors.CYAN))
        print("\nCommands:")
        for usage_line in (
            " hermes quality analyze SESSION_ID - Analyze a single session",
            " hermes quality batch SESSION_IDS... - Analyze multiple sessions",
            " hermes quality list-issues - List sessions with quality issues",
        ):
            print(usage_line)
        return 0
    try:
        from tools.session_quality_analyzer import quality_analyzer_cli
        # Translate the parsed namespace back into an argv for the analyzer.
        if subcmd == "analyze":
            forwarded = ["analyze", args.session_id]
        elif subcmd == "batch":
            forwarded = ["batch"] + args.session_ids
        elif subcmd == "list-issues":
            forwarded = ["list-issues"]
        else:
            forwarded = []
        return quality_analyzer_cli(forwarded)
    except ImportError as e:
        print(color(f"Error: Cannot import session_quality_analyzer module: {e}", Colors.RED))
        return 1
    except Exception as e:
        print(color(f"Error: {e}", Colors.RED))
        return 1

View File

@@ -1,538 +0,0 @@
"""
Session Quality Analyzer
Analyzes session quality metrics to identify patterns for improvement.
Addresses research questions from issue #327.
Issue: #327
"""
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict, field
import statistics
logger = logging.getLogger(__name__)
@dataclass
class QualityMetrics:
    """Quality metrics for a session.

    Raw counters are stored as fields; rates and the composite
    efficiency score are derived properties.
    """
    session_id: str
    message_count: int = 0
    tool_calls: int = 0
    successful_tool_calls: int = 0
    error_count: int = 0
    user_corrections: int = 0
    completion_time_seconds: float = 0.0
    context_switches: int = 0  # Number of topic changes
    avg_response_time: float = 0.0
    token_usage: int = 0

    @property
    def error_rate(self) -> float:
        """Errors per tool call; 0.0 when the session made no tool calls."""
        return self.error_count / self.tool_calls if self.tool_calls else 0.0

    @property
    def success_rate(self) -> float:
        """Successful results per tool call; 0.0 without tool calls."""
        return self.successful_tool_calls / self.tool_calls if self.tool_calls else 0.0

    @property
    def correction_rate(self) -> float:
        """User corrections per message; 0.0 for an empty session."""
        return self.user_corrections / self.message_count if self.message_count else 0.0

    @property
    def efficiency_score(self) -> float:
        """Composite 0-1 score.

        Weights: success rate 0.4, low error rate 0.3, few corrections
        0.2, session brevity 0.1 (reduced for long sessions).
        """
        if not self.message_count:
            return 0.0
        # Brevity credit shrinks for very long sessions.
        if self.message_count > 100:
            brevity = 0.05
        elif self.message_count > 50:
            brevity = 0.08
        else:
            brevity = 0.1
        # Correction rate is scaled x5, so a 20% rate already zeroes its term.
        return (
            self.success_rate * 0.4
            + (1 - self.error_rate) * 0.3
            + (1 - min(1.0, self.correction_rate * 5)) * 0.2
            + brevity
        )

    def to_dict(self) -> Dict[str, Any]:
        """Serialize all fields plus the derived rate/score properties."""
        data = asdict(self)
        data["error_rate"] = self.error_rate
        data["success_rate"] = self.success_rate
        data["correction_rate"] = self.correction_rate
        data["efficiency_score"] = self.efficiency_score
        return data
@dataclass
class QualityIssue:
    """Issue discovered during quality analysis.

    Emitted by SessionQualityAnalyzer._identify_issues; ``evidence``
    carries the raw metric values that triggered the finding.
    """
    issue_id: str  # stable id, e.g. "high_error_<session_id>"
    session_id: str  # session the issue was found in
    issue_type: str  # "high_error_rate", "frequent_corrections", "context_loss", etc.
    severity: str  # "low", "medium", "high", "critical"
    description: str  # human-readable one-line summary
    evidence: Dict[str, Any] = field(default_factory=dict)  # metric values backing the finding
    discovered_at: str = field(default_factory=lambda: datetime.now().isoformat())  # local-time ISO timestamp

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict of all fields."""
        return asdict(self)
@dataclass
class SessionAnalysis:
    """Complete analysis of a session.

    Aggregates computed metrics with the issues, patterns and
    recommendations produced by SessionQualityAnalyzer.analyze_session.
    """
    session_id: str
    metrics: QualityMetrics  # computed quality metrics for the session
    issues: List[QualityIssue] = field(default_factory=list)  # detected problems
    patterns: List[str] = field(default_factory=list)  # human-readable observations
    recommendations: List[str] = field(default_factory=list)  # suggested improvements
    analyzed_at: str = field(default_factory=lambda: datetime.now().isoformat())  # local-time ISO timestamp

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the analysis, expanding nested metric/issue objects."""
        return {
            "session_id": self.session_id,
            "metrics": self.metrics.to_dict(),
            "issues": [i.to_dict() for i in self.issues],
            "patterns": self.patterns,
            "recommendations": self.recommendations,
            "analyzed_at": self.analyzed_at
        }
class SessionQualityAnalyzer:
    """Analyze session quality and identify issues.

    Works against a session database object exposing
    ``get_messages(session_id) -> list[dict]``; all heuristics operate
    on message dicts with "role", "content" and "tool_calls" keys.
    """

    def __init__(self, session_db=None):
        # session_db: object with get_messages(session_id); analysis is a
        # no-op (returns None) when it is absent.
        self.session_db = session_db

    def analyze_session(self, session_id: str) -> Optional[SessionAnalysis]:
        """Analyze a single session.

        Returns None when no DB is configured, the session has no
        messages, or any step raises (the error is logged, not raised).
        """
        if not self.session_db:
            return None
        try:
            messages = self.session_db.get_messages(session_id)
            if not messages:
                return None
            # Calculate metrics
            metrics = self._calculate_metrics(session_id, messages)
            # Identify issues
            issues = self._identify_issues(metrics, messages)
            # Identify patterns
            patterns = self._identify_patterns(messages)
            # Generate recommendations
            recommendations = self._generate_recommendations(metrics, issues, patterns)
            return SessionAnalysis(
                session_id=session_id,
                metrics=metrics,
                issues=issues,
                patterns=patterns,
                recommendations=recommendations
            )
        except Exception as e:
            logger.error(f"Failed to analyze session: {e}")
            return None

    def _calculate_metrics(self, session_id: str, messages: List[Dict]) -> QualityMetrics:
        """Calculate quality metrics for a session.

        NOTE(review): tool_calls is counted from assistant messages while
        successes/errors are counted from role=="tool" result messages, so
        the two totals can disagree if results are missing — confirm the
        message log always pairs calls with results.
        """
        tool_calls = 0
        successful_tool_calls = 0
        error_count = 0
        user_corrections = 0
        for i, msg in enumerate(messages):
            # Count tool calls
            if msg.get("role") == "assistant" and msg.get("tool_calls"):
                tool_calls += len(msg["tool_calls"])
            # Check tool results: any result mentioning "error"/"failed"
            # counts as an error, everything else as a success.
            if msg.get("role") == "tool":
                content = msg.get("content", "").lower()
                if "error" in content or "failed" in content:
                    error_count += 1
                else:
                    successful_tool_calls += 1
            # Count user corrections (user message after error)
            if (msg.get("role") == "user" and i > 0 and
                messages[i-1].get("role") == "tool" and
                ("error" in messages[i-1].get("content", "").lower() or
                 "failed" in messages[i-1].get("content", "").lower())):
                user_corrections += 1
        return QualityMetrics(
            session_id=session_id,
            message_count=len(messages),
            tool_calls=tool_calls,
            successful_tool_calls=successful_tool_calls,
            error_count=error_count,
            user_corrections=user_corrections
        )

    def _identify_issues(self, metrics: QualityMetrics, messages: List[Dict]) -> List[QualityIssue]:
        """Identify quality issues.

        Thresholds: >20% error rate, >10% correction rate, >2 repeated
        questions, efficiency score below 0.5.
        """
        issues = []
        # High error rate
        if metrics.error_rate > 0.2:  # >20% errors
            issues.append(QualityIssue(
                issue_id=f"high_error_{metrics.session_id}",
                session_id=metrics.session_id,
                issue_type="high_error_rate",
                severity="high" if metrics.error_rate > 0.3 else "medium",
                description=f"High error rate: {metrics.error_rate:.1%}",
                evidence={"error_rate": metrics.error_rate, "error_count": metrics.error_count}
            ))
        # Frequent corrections
        if metrics.correction_rate > 0.1:  # >10% corrections
            issues.append(QualityIssue(
                issue_id=f"frequent_corrections_{metrics.session_id}",
                session_id=metrics.session_id,
                issue_type="frequent_corrections",
                severity="medium",
                description=f"Frequent user corrections: {metrics.correction_rate:.1%}",
                evidence={"correction_rate": metrics.correction_rate, "corrections": metrics.user_corrections}
            ))
        # Context loss (look for repeated questions)
        repeated_questions = self._find_repeated_questions(messages)
        if repeated_questions > 2:
            issues.append(QualityIssue(
                issue_id=f"context_loss_{metrics.session_id}",
                session_id=metrics.session_id,
                issue_type="context_loss",
                severity="medium",
                description=f"Possible context loss: {repeated_questions} repeated questions",
                evidence={"repeated_questions": repeated_questions}
            ))
        # Low efficiency
        if metrics.efficiency_score < 0.5:
            issues.append(QualityIssue(
                issue_id=f"low_efficiency_{metrics.session_id}",
                session_id=metrics.session_id,
                issue_type="low_efficiency",
                severity="low",
                description=f"Low efficiency score: {metrics.efficiency_score:.2f}",
                evidence={"efficiency_score": metrics.efficiency_score}
            ))
        return issues

    def _find_repeated_questions(self, messages: List[Dict]) -> int:
        """Find repeated questions in user messages.

        Two messages count as duplicates when their first 10
        whitespace-separated words match (case-insensitive).
        """
        user_messages = [m.get("content", "").lower() for m in messages if m.get("role") == "user"]
        # Simple heuristic: look for similar messages
        repeated = 0
        seen = set()
        for msg in user_messages:
            # Normalize message
            normalized = " ".join(msg.split()[:10])  # First 10 words
            if normalized in seen:
                repeated += 1
            else:
                seen.add(normalized)
        return repeated

    def _identify_patterns(self, messages: List[Dict]) -> List[str]:
        """Identify patterns in the session.

        Returns human-readable observations: most-used tool, average
        user message length, and the fraction of user messages that
        contain a question mark.
        """
        patterns = []
        # Analyze tool usage
        tool_usage = {}
        for msg in messages:
            if msg.get("role") == "assistant" and msg.get("tool_calls"):
                for tc in msg["tool_calls"]:
                    tool_name = tc.get("function", {}).get("name", "unknown")
                    tool_usage[tool_name] = tool_usage.get(tool_name, 0) + 1
        if tool_usage:
            most_used = max(tool_usage.items(), key=lambda x: x[1])
            patterns.append(f"Most used tool: {most_used[0]} ({most_used[1]} calls)")
        # Analyze user message patterns
        user_messages = [m.get("content", "") for m in messages if m.get("role") == "user"]
        if user_messages:
            avg_length = sum(len(m) for m in user_messages) / len(user_messages)
            patterns.append(f"Average user message length: {avg_length:.0f} characters")
            questions = sum(1 for m in user_messages if "?" in m)
            patterns.append(f"Question ratio: {questions/len(user_messages):.0%}")
        return patterns

    def _generate_recommendations(
        self,
        metrics: QualityMetrics,
        issues: List[QualityIssue],
        patterns: List[str]
    ) -> List[str]:
        """Generate recommendations for improvement.

        Driven by metric thresholds only; ``issues`` and ``patterns``
        are accepted for signature symmetry but not currently consulted.
        """
        recommendations = []
        if metrics.error_rate > 0.2:
            recommendations.append("Consider using more specific prompts to reduce errors")
        if metrics.correction_rate > 0.1:
            recommendations.append("Provide clearer instructions to reduce need for corrections")
        if metrics.message_count > 100:
            recommendations.append("Consider breaking long sessions into smaller focused sessions")
        if metrics.efficiency_score < 0.5:
            recommendations.append("Session efficiency is low - consider using templates or warm sessions")
        return recommendations

    def analyze_multiple_sessions(self, session_ids: List[str]) -> Dict[str, Any]:
        """Analyze multiple sessions and provide aggregate insights.

        Sessions that fail analyze_session() are skipped; returns an
        {"error": ...} dict when none could be analyzed.
        """
        analyses = []
        for session_id in session_ids:
            analysis = self.analyze_session(session_id)
            if analysis:
                analyses.append(analysis)
        if not analyses:
            return {"error": "No sessions analyzed"}
        # Aggregate metrics
        all_metrics = [a.metrics for a in analyses]
        avg_error_rate = statistics.mean([m.error_rate for m in all_metrics])
        avg_success_rate = statistics.mean([m.success_rate for m in all_metrics])
        avg_efficiency = statistics.mean([m.efficiency_score for m in all_metrics])
        # Collect all issues
        all_issues = []
        for a in analyses:
            all_issues.extend(a.issues)
        # Group issues by type
        issues_by_type = {}
        for issue in all_issues:
            if issue.issue_type not in issues_by_type:
                issues_by_type[issue.issue_type] = []
            issues_by_type[issue.issue_type].append(issue)
        return {
            "sessions_analyzed": len(analyses),
            "aggregate_metrics": {
                "avg_error_rate": avg_error_rate,
                "avg_success_rate": avg_success_rate,
                "avg_efficiency": avg_efficiency
            },
            "issues_summary": {
                issue_type: len(issues)
                for issue_type, issues in issues_by_type.items()
            },
            # NOTE: only "critical" issues are sorted to the front; issues
            # of other severities keep their discovery order.
            "top_issues": [
                {
                    "type": issue.issue_type,
                    "severity": issue.severity,
                    "description": issue.description,
                    "session_id": issue.session_id
                }
                for issue in sorted(all_issues, key=lambda x: x.severity == "critical", reverse=True)[:5]
            ],
            "recommendations": self._generate_aggregate_recommendations(analyses)
        }

    def _generate_aggregate_recommendations(self, analyses: List[SessionAnalysis]) -> List[str]:
        """Generate recommendations based on aggregate analysis."""
        recommendations = []
        # Check for common issues
        high_error_sessions = [a for a in analyses if a.metrics.error_rate > 0.2]
        if len(high_error_sessions) > len(analyses) * 0.3:  # >30% sessions have high errors
            recommendations.append("Systematic issue: Many sessions have high error rates. Consider improving tool documentation or prompts.")
        # Check for efficiency
        low_efficiency = [a for a in analyses if a.metrics.efficiency_score < 0.5]
        if len(low_efficiency) > len(analyses) * 0.5:  # >50% sessions have low efficiency
            recommendations.append("Consider implementing warm session provisioning to improve session efficiency.")
        return recommendations
# CLI Interface
def quality_analyzer_cli(args: List[str]) -> int:
    """CLI interface for session quality analysis.

    Args:
        args: Argument vector without the program name, e.g.
            ["analyze", SESSION_ID], ["batch", ID, ...] or ["list-issues"].

    Returns:
        Process exit code: 0 on success, 1 on usage/DB/analysis errors.
    """
    import argparse
    parser = argparse.ArgumentParser(description="Session quality analyzer")
    subparsers = parser.add_subparsers(dest="command")
    # Analyze single session
    analyze_parser = subparsers.add_parser("analyze", help="Analyze a single session")
    analyze_parser.add_argument("session_id", help="Session ID to analyze")
    # Analyze multiple sessions
    batch_parser = subparsers.add_parser("batch", help="Analyze multiple sessions")
    batch_parser.add_argument("session_ids", nargs="+", help="Session IDs to analyze")
    # List sessions with quality issues
    subparsers.add_parser("list-issues", help="List sessions with quality issues")
    parsed = parser.parse_args(args)
    if not parsed.command:
        parser.print_help()
        return 1
    try:
        from hermes_state import SessionDB
        session_db = SessionDB()
    except ImportError:
        print("Error: Cannot import SessionDB")
        return 1
    analyzer = SessionQualityAnalyzer(session_db)
    if parsed.command == "analyze":
        analysis = analyzer.analyze_session(parsed.session_id)
        if not analysis:
            print(f"Failed to analyze session {parsed.session_id}")
            return 1
        # BUG FIX: the f-string was missing the closing brace around
        # parsed.session_id, which was a SyntaxError at import time.
        print(f"\n=== Session Quality Analysis: {parsed.session_id} ===\n")
        print(f"Messages: {analysis.metrics.message_count}")
        print(f"Tool calls: {analysis.metrics.tool_calls}")
        print(f"Error rate: {analysis.metrics.error_rate:.1%}")
        print(f"Success rate: {analysis.metrics.success_rate:.1%}")
        print(f"Efficiency score: {analysis.metrics.efficiency_score:.2f}")
        if analysis.issues:
            print(f"\nIssues found: {len(analysis.issues)}")
            for issue in analysis.issues:
                print(f" [{issue.severity.upper()}] {issue.description}")
        if analysis.patterns:
            print("\nPatterns:")
            for pattern in analysis.patterns:
                print(f" {pattern}")
        if analysis.recommendations:
            print("\nRecommendations:")
            for rec in analysis.recommendations:
                print(f" {rec}")
        return 0
    elif parsed.command == "batch":
        result = analyzer.analyze_multiple_sessions(parsed.session_ids)
        if "error" in result:
            print(f"Error: {result['error']}")
            return 1
        print(f"\n=== Batch Analysis: {result['sessions_analyzed']} sessions ===\n")
        agg = result.get("aggregate_metrics", {})
        print(f"Average error rate: {agg.get('avg_error_rate', 0):.1%}")
        print(f"Average success rate: {agg.get('avg_success_rate', 0):.1%}")
        print(f"Average efficiency: {agg.get('avg_efficiency', 0):.2f}")
        issues = result.get("issues_summary", {})
        if issues:
            print("\nIssues summary:")
            for issue_type, count in issues.items():
                print(f" {issue_type}: {count}")
        top_issues = result.get("top_issues", [])
        if top_issues:
            print("\nTop issues:")
            for issue in top_issues:
                print(f" [{issue['severity'].upper()}] {issue['description']} (Session: {issue['session_id']})")
        recommendations = result.get("recommendations", [])
        if recommendations:
            print("\nRecommendations:")
            for rec in recommendations:
                print(f" {rec}")
        return 0
    elif parsed.command == "list-issues":
        # Get recent sessions
        try:
            # NOTE(review): reaches through the bound method to call
            # execute_write for a read-only SELECT — confirm SessionDB
            # exposes a proper query API for this.
            sessions = session_db.get_messages.__self__.execute_write(
                "SELECT id FROM sessions ORDER BY started_at DESC LIMIT 20"
            )
            if not sessions:
                print("No sessions found")
                return 0
            session_ids = [s[0] for s in sessions]
            result = analyzer.analyze_multiple_sessions(session_ids)
            if "error" in result:
                print(f"Error: {result['error']}")
                return 1
            # Fixed: was an f-string with no placeholders.
            print("\n=== Sessions with Quality Issues (last 20 sessions) ===\n")
            for issue in result.get("top_issues", []):
                print(f"Session: {issue['session_id']}")
                print(f" [{issue['severity'].upper()}] {issue['description']}")
                print()
        except Exception as e:
            print(f"Error: {e}")
            return 1
        return 0
    return 1
if __name__ == "__main__":
    # Script entry point: forward CLI args (minus argv[0]) to the analyzer
    # CLI and exit with its return code.
    import sys
    sys.exit(quality_analyzer_cli(sys.argv[1:]))