#!/usr/bin/env python3
"""
Self-Grader Module for Timmy/UniWizard

Grades Hermes session logs to identify patterns in failures and track
improvement. Connects to quality scoring (#98) and adaptive routing (#88).

Author: Timmy (UniWizard)
"""

import json
import sqlite3
import re
from pathlib import Path
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Any, Tuple
from collections import defaultdict
import statistics


@dataclass
class SessionGrade:
    """Grade for a single session."""
    session_id: str
    session_file: str
    graded_at: str

    # Core metrics
    task_completed: bool
    tool_calls_efficient: int  # 1-5 scale
    response_quality: int      # 1-5 scale
    errors_recovered: bool
    total_api_calls: int

    # Additional metadata
    model: str
    platform: Optional[str]
    session_start: str
    duration_seconds: Optional[float]
    task_summary: str

    # Error analysis
    total_errors: int
    error_types: str        # JSON-encoded list of error categories
    tools_with_errors: str  # JSON-encoded list of tool names

    # Pattern flags
    had_repeated_errors: bool
    had_infinite_loop_risk: bool
    had_user_clarification: bool


@dataclass
class WeeklyReport:
    """Weekly improvement report."""
    week_start: str
    week_end: str
    total_sessions: int
    avg_tool_efficiency: float
    avg_response_quality: float
    completion_rate: float
    error_recovery_rate: float

    # Patterns
    worst_task_types: List[Tuple[str, float]]
    most_error_prone_tools: List[Tuple[str, int]]
    common_error_patterns: List[Tuple[str, int]]

    # Trends
    improvement_suggestions: List[str]


class SelfGrader:
    """Grades Hermes sessions and tracks improvement patterns."""

    # Error pattern regexes used to classify tool-result error text.
    ERROR_PATTERNS = {
        'file_not_found': re.compile(r'file.*not found|no such file|does not exist', re.I),
        'permission_denied': re.compile(r'permission denied|access denied|unauthorized', re.I),
        'timeout': re.compile(r'time(d)?\s*out|deadline exceeded', re.I),
        'api_error': re.compile(r'api.*error|rate limit|too many requests', re.I),
        'syntax_error': re.compile(r'syntax error|invalid syntax|parse error', re.I),
        'command_failed': re.compile(r'exit_code.*[1-9]|command.*failed|failed to', re.I),
        'network_error': re.compile(r'network|connection|unreachable|refused', re.I),
        'tool_not_found': re.compile(r'tool.*not found|unknown tool|no tool named', re.I),
    }

    # Task type patterns used to infer a task category from the summary text.
    TASK_PATTERNS = {
        'code_review': re.compile(r'code review|review.*code|review.*pr|pull request', re.I),
        'debugging': re.compile(r'debug|fix.*bug|troubleshoot|error.*fix', re.I),
        'feature_impl': re.compile(r'implement|add.*feature|build.*function', re.I),
        'refactoring': re.compile(r'refactor|clean.*up|reorganize|restructure', re.I),
        'documentation': re.compile(r'document|readme|docstring|comment', re.I),
        'testing': re.compile(r'test|pytest|unit test|integration test', re.I),
        'research': re.compile(r'research|investigate|look up|find.*about', re.I),
        'deployment': re.compile(r'deploy|release|publish|push.*prod', re.I),
        'data_analysis': re.compile(r'analyze.*data|process.*file|parse.*json|csv', re.I),
        'infrastructure': re.compile(r'server|docker|kubernetes|terraform|ansible', re.I),
    }

    def __init__(self, grades_db_path: Optional[Path] = None,
                 sessions_dir: Optional[Path] = None):
        """Initialize the grader with database and sessions directory.

        Args:
            grades_db_path: SQLite file for grades; defaults to
                ``~/.timmy/uniwizard/session_grades.db``.
            sessions_dir: Directory holding ``session_*.json`` logs;
                defaults to ``~/.hermes/sessions``.
        """
        self.grades_db_path = (
            Path(grades_db_path) if grades_db_path
            else Path.home() / ".timmy" / "uniwizard" / "session_grades.db"
        )
        self.sessions_dir = (
            Path(sessions_dir) if sessions_dir
            else Path.home() / ".hermes" / "sessions"
        )
        self._init_database()

    def _init_database(self):
        """Initialize the SQLite database with schema (idempotent)."""
        self.grades_db_path.parent.mkdir(parents=True, exist_ok=True)
        with sqlite3.connect(self.grades_db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS session_grades (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    session_id TEXT UNIQUE NOT NULL,
                    session_file TEXT NOT NULL,
                    graded_at TEXT NOT NULL,
                    -- Core metrics
                    task_completed INTEGER NOT NULL,
                    tool_calls_efficient INTEGER NOT NULL,
                    response_quality INTEGER NOT NULL,
                    errors_recovered INTEGER NOT NULL,
                    total_api_calls INTEGER NOT NULL,
                    -- Metadata
                    model TEXT,
                    platform TEXT,
                    session_start TEXT,
                    duration_seconds REAL,
                    task_summary TEXT,
                    -- Error analysis
                    total_errors INTEGER NOT NULL,
                    error_types TEXT,
                    tools_with_errors TEXT,
                    -- Pattern flags
                    had_repeated_errors INTEGER NOT NULL,
                    had_infinite_loop_risk INTEGER NOT NULL,
                    had_user_clarification INTEGER NOT NULL
                )
            """)
            # Indexes for efficient time/model range queries.
            conn.execute("CREATE INDEX IF NOT EXISTS idx_graded_at ON session_grades(graded_at)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_session_start ON session_grades(session_start)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_model ON session_grades(model)")

            # Weekly reports table (one row per week, keyed by week_start).
            conn.execute("""
                CREATE TABLE IF NOT EXISTS weekly_reports (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    week_start TEXT UNIQUE NOT NULL,
                    week_end TEXT NOT NULL,
                    generated_at TEXT NOT NULL,
                    report_json TEXT NOT NULL
                )
            """)
            conn.commit()

    def grade_session_file(self, session_path: Path) -> Optional[SessionGrade]:
        """Grade a single session file.

        Returns None if the file is unreadable, not valid JSON, or has
        no messages.
        """
        try:
            with open(session_path) as f:
                data = json.load(f)
        except (json.JSONDecodeError, IOError) as e:
            print(f"Error reading {session_path}: {e}")
            return None

        session_id = data.get('session_id', '')
        messages = data.get('messages', [])
        if not messages:
            return None

        # Analyze message flow
        analysis = self._analyze_messages(messages)

        # Calculate grades
        task_completed = self._assess_task_completion(messages, analysis)
        tool_efficiency = self._assess_tool_efficiency(analysis)
        response_quality = self._assess_response_quality(messages, analysis)
        errors_recovered = self._assess_error_recovery(messages, analysis)

        # Extract task summary from first user message (truncated).
        task_summary = ""
        for msg in messages:
            if msg.get('role') == 'user':
                task_summary = msg.get('content', '')[:200]
                break

        # Calculate duration if timestamps available on both ends.
        duration = None
        if messages and 'timestamp' in messages[0] and 'timestamp' in messages[-1]:
            try:
                # 'Z' suffix is normalized for pre-3.11 fromisoformat support.
                start = datetime.fromisoformat(messages[0]['timestamp'].replace('Z', '+00:00'))
                end = datetime.fromisoformat(messages[-1]['timestamp'].replace('Z', '+00:00'))
                duration = (end - start).total_seconds()
            except (ValueError, KeyError):
                pass  # malformed timestamps: leave duration unknown

        return SessionGrade(
            session_id=session_id,
            session_file=str(session_path.name),
            graded_at=datetime.now().isoformat(),
            task_completed=task_completed,
            tool_calls_efficient=tool_efficiency,
            response_quality=response_quality,
            errors_recovered=errors_recovered,
            total_api_calls=analysis['total_api_calls'],
            model=data.get('model', 'unknown'),
            platform=data.get('platform'),
            session_start=data.get('session_start', ''),
            duration_seconds=duration,
            task_summary=task_summary,
            total_errors=analysis['total_errors'],
            error_types=json.dumps(list(analysis['error_types'])),
            tools_with_errors=json.dumps(list(analysis['tools_with_errors'])),
            had_repeated_errors=analysis['had_repeated_errors'],
            had_infinite_loop_risk=analysis['had_infinite_loop_risk'],
            had_user_clarification=analysis['had_user_clarification']
        )

    def _analyze_messages(self, messages: List[Dict]) -> Dict[str, Any]:
        """Analyze message flow to extract metrics.

        Counts assistant turns (API calls), tool errors, consecutive
        error streaks, and flags clarification / loop-risk patterns.
        """
        analysis = {
            'total_api_calls': 0,
            'total_errors': 0,
            'error_types': set(),
            'tools_with_errors': set(),
            'tool_call_counts': defaultdict(int),
            'error_sequences': [],
            'had_repeated_errors': False,
            'had_infinite_loop_risk': False,
            'had_user_clarification': False,
            'final_assistant_msg': None,
            'consecutive_errors': 0,
            'max_consecutive_errors': 0,
        }

        for msg in messages:
            role = msg.get('role')

            if role == 'assistant':
                analysis['total_api_calls'] += 1

                # Check for clarification requests (tool call or phrasing).
                content = msg.get('content', '')
                tool_calls = msg.get('tool_calls', [])
                if tool_calls and tool_calls[0].get('function', {}).get('name') == 'clarify':
                    analysis['had_user_clarification'] = True
                if 'clarify' in content.lower() and 'need clarification' in content.lower():
                    analysis['had_user_clarification'] = True

                # Track tool call frequencies per tool name.
                for tc in tool_calls:
                    tool_name = tc.get('function', {}).get('name', 'unknown')
                    analysis['tool_call_counts'][tool_name] += 1

                # Track final assistant message
                analysis['final_assistant_msg'] = msg
                # NOTE: consecutive error count is NOT reset here - the
                # streak continues until a tool actually succeeds.

            elif role == 'tool':
                content = msg.get('content', '')
                tool_name = msg.get('name', 'unknown')

                if self._detect_error(content):
                    analysis['total_errors'] += 1
                    analysis['tools_with_errors'].add(tool_name)
                    analysis['error_types'].add(self._classify_error(content))

                    # Track consecutive tool-error streak.
                    analysis['consecutive_errors'] += 1
                    analysis['max_consecutive_errors'] = max(
                        analysis['max_consecutive_errors'],
                        analysis['consecutive_errors']
                    )
                else:
                    # Reset consecutive errors on success
                    analysis['consecutive_errors'] = 0

        # Detect patterns
        analysis['had_repeated_errors'] = analysis['max_consecutive_errors'] >= 3
        analysis['had_infinite_loop_risk'] = (
            analysis['max_consecutive_errors'] >= 5 or
            analysis['total_api_calls'] > 50
        )

        return analysis

    def _detect_error(self, content: str) -> bool:
        """Detect if tool result contains an error (substring heuristic)."""
        if not content:
            return False

        content_lower = content.lower()

        # Check for explicit error indicators
        error_indicators = [
            '"error":', '"error" :', 'error:', 'exception:',
            '"exit_code": 1', '"exit_code": 2', '"exit_code": -1',
            'traceback', 'failed', 'failure',
        ]
        for indicator in error_indicators:
            if indicator in content_lower:
                return True

        return False

    def _classify_error(self, content: str) -> str:
        """Classify the type of error via ERROR_PATTERNS; 'unknown' fallback."""
        content_lower = content.lower()
        for error_type, pattern in self.ERROR_PATTERNS.items():
            if pattern.search(content_lower):
                return error_type
        return 'unknown'

    def _assess_task_completion(self, messages: List[Dict], analysis: Dict) -> bool:
        """Assess whether the task was likely completed (heuristic).

        Positive phrases in the final assistant message win outright;
        otherwise high error rates or explicit failure phrases fail the
        session, and anything else is assumed completed.
        """
        if not messages:
            return False

        final_msg = analysis.get('final_assistant_msg')
        if not final_msg:
            return False

        content = final_msg.get('content', '')

        # Positive completion indicators
        completion_phrases = [
            'done', 'completed', 'success', 'finished', 'created',
            'implemented', 'fixed', 'resolved', 'saved to', 'here is',
            'here are', 'the result', 'output:', 'file:', 'pr:', 'pull request'
        ]
        for phrase in completion_phrases:
            if phrase in content.lower():
                return True

        # Check if there were many errors (>30% of API calls errored).
        if analysis['total_errors'] > analysis['total_api_calls'] * 0.3:
            return False

        # Check for explicit failure near the start of the final message.
        failure_phrases = ['failed', 'unable to', 'could not', 'error:', 'sorry, i cannot']
        for phrase in failure_phrases:
            if phrase in content.lower()[:200]:
                return False

        return True

    def _assess_tool_efficiency(self, analysis: Dict) -> int:
        """Rate tool call efficiency on a 1-5 scale.

        NOTE: the denominator is assistant API calls, not individual tool
        invocations - the error rate is therefore an approximation.
        """
        tool_calls = analysis['total_api_calls']
        errors = analysis['total_errors']

        if tool_calls == 0:
            return 3  # Neutral if no tool calls

        error_rate = errors / tool_calls

        # Score based on error rate and total calls
        if error_rate == 0 and tool_calls <= 10:
            return 5  # Perfect efficiency
        elif error_rate <= 0.1 and tool_calls <= 15:
            return 4  # Good efficiency
        elif error_rate <= 0.25 and tool_calls <= 25:
            return 3  # Average
        elif error_rate <= 0.4:
            return 2  # Poor
        else:
            return 1  # Very poor

    def _assess_response_quality(self, messages: List[Dict], analysis: Dict) -> int:
        """Rate response quality of the final assistant message on 1-5."""
        final_msg = analysis.get('final_assistant_msg')
        if not final_msg:
            return 1

        content = final_msg.get('content', '')
        content_len = len(content)

        score = 3  # Start at average

        # Length heuristics
        if content_len > 500:
            score += 1
        if content_len > 1000:
            score += 1

        # Code blocks indicate substantive response
        if '```' in content:
            score += 1

        # Links/references indicate thoroughness
        if 'http' in content or 'see ' in content.lower():
            score += 0.5

        # Error penalties
        if analysis['had_repeated_errors']:
            score -= 1
        if analysis['total_errors'] > 5:
            score -= 1

        # Loop risk is severe
        if analysis['had_infinite_loop_risk']:
            score -= 2

        return max(1, min(5, int(score)))

    def _assess_error_recovery(self, messages: List[Dict], analysis: Dict) -> bool:
        """Assess whether errors were successfully recovered from."""
        if analysis['total_errors'] == 0:
            return True  # No errors to recover from

        # If task completed despite errors, recovered
        if self._assess_task_completion(messages, analysis):
            return True

        # If no repeated errors, likely recovered
        if not analysis['had_repeated_errors']:
            return True

        return False

    def save_grade(self, grade: SessionGrade) -> bool:
        """Save a grade to the database (upsert on session_id)."""
        try:
            with sqlite3.connect(self.grades_db_path) as conn:
                conn.execute("""
                    INSERT OR REPLACE INTO session_grades (
                        session_id, session_file, graded_at,
                        task_completed, tool_calls_efficient, response_quality,
                        errors_recovered, total_api_calls,
                        model, platform, session_start, duration_seconds, task_summary,
                        total_errors, error_types, tools_with_errors,
                        had_repeated_errors, had_infinite_loop_risk, had_user_clarification
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    grade.session_id, grade.session_file, grade.graded_at,
                    int(grade.task_completed), grade.tool_calls_efficient,
                    grade.response_quality, int(grade.errors_recovered),
                    grade.total_api_calls,
                    grade.model, grade.platform, grade.session_start,
                    grade.duration_seconds, grade.task_summary,
                    grade.total_errors, grade.error_types, grade.tools_with_errors,
                    int(grade.had_repeated_errors), int(grade.had_infinite_loop_risk),
                    int(grade.had_user_clarification)
                ))
                conn.commit()
            return True
        except sqlite3.Error as e:
            print(f"Database error saving grade: {e}")
            return False

    def grade_latest_sessions(self, n: int = 10) -> List[SessionGrade]:
        """Grade the last N ungraded sessions.

        Scans session files newest-first and grades those not already in
        the database, stopping once *n* new grades have been collected or
        the files are exhausted. (Previously this only inspected the
        newest N files, so it could grade nothing even when older
        ungraded sessions existed.)
        """
        # Get session files, newest first. The filter excludes aggregate
        # files like "sessions.json" that also match the glob.
        session_files = sorted(
            [f for f in self.sessions_dir.glob("session_*.json")
             if not f.name.endswith("sessions.json")],
            key=lambda x: x.stat().st_mtime,
            reverse=True
        )

        # Get already graded sessions
        graded_ids = set()
        try:
            with sqlite3.connect(self.grades_db_path) as conn:
                cursor = conn.execute("SELECT session_id FROM session_grades")
                graded_ids = {row[0] for row in cursor.fetchall()}
        except sqlite3.Error:
            pass  # best-effort: unreadable DB just means nothing is skipped

        # Grade ungraded sessions until we have n new grades.
        grades: List[SessionGrade] = []
        for sf in session_files:
            if len(grades) >= n:
                break
            # Extract session ID from filename
            session_id = sf.stem.replace('session_', '')
            if session_id in graded_ids:
                continue
            grade = self.grade_session_file(sf)
            if grade and self.save_grade(grade):
                grades.append(grade)

        return grades

    def identify_patterns(self, days: int = 7) -> Dict[str, Any]:
        """Identify patterns in sessions graded within the last *days* days."""
        since = (datetime.now() - timedelta(days=days)).isoformat()

        with sqlite3.connect(self.grades_db_path) as conn:
            # Overall stats
            cursor = conn.execute("""
                SELECT COUNT(*),
                       AVG(tool_calls_efficient),
                       AVG(response_quality),
                       AVG(CASE WHEN task_completed THEN 1.0 ELSE 0.0 END),
                       AVG(CASE WHEN errors_recovered THEN 1.0 ELSE 0.0 END)
                FROM session_grades
                WHERE graded_at > ?
            """, (since,))
            row = cursor.fetchone()
            stats = {
                'total_sessions': row[0] or 0,
                'avg_tool_efficiency': round(row[1] or 0, 2),
                'avg_response_quality': round(row[2] or 0, 2),
                'completion_rate': round((row[3] or 0) * 100, 1),
                'error_recovery_rate': round((row[4] or 0) * 100, 1),
            }

            # Tool error analysis: the column holds a JSON list per row,
            # so unpack each list and accumulate counts per tool.
            cursor = conn.execute("""
                SELECT tools_with_errors, COUNT(*)
                FROM session_grades
                WHERE graded_at > ? AND tools_with_errors != '[]'
                GROUP BY tools_with_errors
            """, (since,))
            tool_errors = defaultdict(int)
            for row in cursor.fetchall():
                for tool in json.loads(row[0]):
                    tool_errors[tool] += row[1]

            # Error type analysis (same JSON-list unpacking).
            cursor = conn.execute("""
                SELECT error_types, COUNT(*)
                FROM session_grades
                WHERE graded_at > ? AND error_types != '[]'
                GROUP BY error_types
            """, (since,))
            error_types = defaultdict(int)
            for row in cursor.fetchall():
                for et in json.loads(row[0]):
                    error_types[et] += row[1]

            # Task type performance (infer from task_summary)
            cursor = conn.execute("""
                SELECT task_summary, response_quality
                FROM session_grades
                WHERE graded_at > ?
            """, (since,))
            task_scores = defaultdict(list)
            for row in cursor.fetchall():
                summary = row[0] or ''
                score = row[1]
                task_type = self._infer_task_type(summary)
                task_scores[task_type].append(score)

            avg_task_scores = {
                tt: round(sum(scores) / len(scores), 2)
                for tt, scores in task_scores.items()
            }

        return {
            **stats,
            'tool_error_counts': dict(tool_errors),
            'error_type_counts': dict(error_types),
            'task_type_scores': avg_task_scores,
        }

    def _infer_task_type(self, summary: str) -> str:
        """Infer task type from summary text; 'general' if nothing matches."""
        for task_type, pattern in self.TASK_PATTERNS.items():
            if pattern.search(summary):
                return task_type
        return 'general'

    def generate_weekly_report(self) -> WeeklyReport:
        """Generate (and persist) a weekly improvement report."""
        # Calculate week boundaries (Monday to Sunday)
        today = datetime.now()
        monday = today - timedelta(days=today.weekday())
        sunday = monday + timedelta(days=6)

        patterns = self.identify_patterns(days=7)

        # Find worst task types (lowest average scores).
        task_scores = patterns.get('task_type_scores', {})
        worst_tasks = sorted(task_scores.items(), key=lambda x: x[1])[:3]

        # Find most error-prone tools
        tool_errors = patterns.get('tool_error_counts', {})
        worst_tools = sorted(tool_errors.items(), key=lambda x: x[1], reverse=True)[:3]

        # Find common error patterns
        error_types = patterns.get('error_type_counts', {})
        common_errors = sorted(error_types.items(), key=lambda x: x[1], reverse=True)[:3]

        # Generate suggestions
        suggestions = self._generate_suggestions(patterns, worst_tasks, worst_tools, common_errors)

        report = WeeklyReport(
            week_start=monday.strftime('%Y-%m-%d'),
            week_end=sunday.strftime('%Y-%m-%d'),
            total_sessions=patterns['total_sessions'],
            avg_tool_efficiency=patterns['avg_tool_efficiency'],
            avg_response_quality=patterns['avg_response_quality'],
            completion_rate=patterns['completion_rate'],
            error_recovery_rate=patterns['error_recovery_rate'],
            worst_task_types=worst_tasks,
            most_error_prone_tools=worst_tools,
            common_error_patterns=common_errors,
            improvement_suggestions=suggestions
        )

        # Save report (upsert on week_start).
        with sqlite3.connect(self.grades_db_path) as conn:
            conn.execute("""
                INSERT OR REPLACE INTO weekly_reports
                (week_start, week_end, generated_at, report_json)
                VALUES (?, ?, ?, ?)
            """, (
                report.week_start, report.week_end,
                datetime.now().isoformat(), json.dumps(asdict(report))
            ))
            conn.commit()

        return report

    def _generate_suggestions(self, patterns: Dict, worst_tasks: List,
                              worst_tools: List, common_errors: List) -> List[str]:
        """Generate improvement suggestions based on patterns."""
        suggestions = []

        if patterns['completion_rate'] < 70:
            suggestions.append("Task completion rate is below 70%. Consider adding pre-task planning steps.")

        if patterns['avg_tool_efficiency'] < 3:
            suggestions.append("Tool efficiency is low. Review error recovery patterns and add retry logic.")

        if worst_tasks:
            task_names = ', '.join([t[0] for t in worst_tasks])
            suggestions.append(f"Lowest scoring task types: {task_names}. Consider skill enhancement.")

        if worst_tools:
            tool_names = ', '.join([t[0] for t in worst_tools])
            suggestions.append(f"Most error-prone tools: {tool_names}. Review usage patterns.")

        if common_errors:
            error_names = ', '.join([e[0] for e in common_errors])
            suggestions.append(f"Common error types: {error_names}. Add targeted error handling.")

        if patterns['error_recovery_rate'] < 80:
            suggestions.append("Error recovery rate needs improvement. Implement better fallback strategies.")

        if not suggestions:
            suggestions.append("Performance is stable. Focus on expanding task coverage.")

        return suggestions

    def get_grades_summary(self, days: int = 30) -> str:
        """Get a human-readable summary of recent grades."""
        patterns = self.identify_patterns(days=days)

        lines = [
            f"=== Session Grades Summary (Last {days} days) ===",
            "",
            f"Total Sessions Graded: {patterns['total_sessions']}",
            f"Average Tool Efficiency: {patterns['avg_tool_efficiency']}/5",
            f"Average Response Quality: {patterns['avg_response_quality']}/5",
            f"Task Completion Rate: {patterns['completion_rate']}%",
            f"Error Recovery Rate: {patterns['error_recovery_rate']}%",
            "",
        ]

        if patterns.get('task_type_scores'):
            lines.append("Task Type Performance:")
            for task, score in sorted(patterns['task_type_scores'].items(), key=lambda x: -x[1]):
                lines.append(f"  - {task}: {score}/5")
            lines.append("")

        if patterns.get('tool_error_counts'):
            lines.append("Tool Error Counts:")
            for tool, count in sorted(patterns['tool_error_counts'].items(), key=lambda x: -x[1]):
                lines.append(f"  - {tool}: {count}")
            lines.append("")

        return '\n'.join(lines)


def main():
    """CLI entry point for self-grading."""
    import argparse

    parser = argparse.ArgumentParser(description='Grade Hermes sessions')
    parser.add_argument('--grade-latest', '-g', type=int, metavar='N',
                        help='Grade the last N ungraded sessions')
    parser.add_argument('--summary', '-s', action='store_true',
                        help='Show summary of recent grades')
    parser.add_argument('--days', '-d', type=int, default=7,
                        help='Number of days for summary (default: 7)')
    parser.add_argument('--report', '-r', action='store_true',
                        help='Generate weekly report')
    parser.add_argument('--file', '-f', type=Path,
                        help='Grade a specific session file')

    args = parser.parse_args()

    grader = SelfGrader()

    if args.file:
        grade = grader.grade_session_file(args.file)
        if grade:
            grader.save_grade(grade)
            print(f"Graded session: {grade.session_id}")
            print(f"  Task completed: {grade.task_completed}")
            print(f"  Tool efficiency: {grade.tool_calls_efficient}/5")
            print(f"  Response quality: {grade.response_quality}/5")
            print(f"  Errors recovered: {grade.errors_recovered}")
        else:
            print("Failed to grade session")
    # "is not None" so that an explicit "--grade-latest 0" is not
    # silently treated as "flag absent" and routed to the summary.
    elif args.grade_latest is not None:
        grades = grader.grade_latest_sessions(args.grade_latest)
        print(f"Graded {len(grades)} sessions")
        for g in grades:
            print(f"  - {g.session_id}: quality={g.response_quality}/5, "
                  f"completed={g.task_completed}")
    elif args.report:
        report = grader.generate_weekly_report()
        print(f"\n=== Weekly Report ({report.week_start} to {report.week_end}) ===")
        print(f"Total Sessions: {report.total_sessions}")
        print(f"Avg Tool Efficiency: {report.avg_tool_efficiency}/5")
        print(f"Avg Response Quality: {report.avg_response_quality}/5")
        print(f"Completion Rate: {report.completion_rate}%")
        print(f"Error Recovery Rate: {report.error_recovery_rate}%")
        print("\nSuggestions:")
        for s in report.improvement_suggestions:
            print(f"  - {s}")
    else:
        print(grader.get_grades_summary(days=args.days))


if __name__ == '__main__':
    main()