Files
timmy-home/uniwizard/self_grader.py

770 lines
30 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
Self-Grader Module for Timmy/UniWizard
Grades Hermes session logs to identify patterns in failures and track improvement.
Connects to quality scoring (#98) and adaptive routing (#88).
Author: Timmy (UniWizard)
"""
import json
import sqlite3
import re
from pathlib import Path
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Any, Tuple
from collections import defaultdict
import statistics
@dataclass
class SessionGrade:
    """Grade for a single session.

    Produced by SelfGrader.grade_session_file and persisted to the
    session_grades table (bools are stored there as 0/1 integers).
    """
    session_id: str    # id from the session JSON ('' when absent)
    session_file: str  # basename of the graded session file
    graded_at: str     # ISO timestamp of when grading ran
    # Core metrics
    task_completed: bool       # heuristic: did the session finish its task?
    tool_calls_efficient: int  # 1-5 scale
    response_quality: int      # 1-5 scale
    errors_recovered: bool     # True when errors occurred but were overcome
    total_api_calls: int       # number of assistant messages in the session
    # Additional metadata
    model: str
    platform: Optional[str]
    session_start: str
    duration_seconds: Optional[float]  # None when timestamps missing/unparseable
    task_summary: str  # first 200 chars of the first user message
    # Error analysis
    total_errors: int
    error_types: str  # JSON list of error categories
    tools_with_errors: str  # JSON list of tool names
    # Pattern flags
    had_repeated_errors: bool     # >= 3 consecutive tool errors
    had_infinite_loop_risk: bool  # >= 5 consecutive errors or > 50 API calls
    had_user_clarification: bool  # assistant asked the user to clarify
@dataclass
class WeeklyReport:
    """Weekly improvement report.

    Serialized to JSON and stored in the weekly_reports table,
    one row per week_start.
    """
    week_start: str  # Monday of the reported week, YYYY-MM-DD
    week_end: str    # Sunday of the reported week, YYYY-MM-DD
    total_sessions: int
    avg_tool_efficiency: float   # mean of 1-5 scores
    avg_response_quality: float  # mean of 1-5 scores
    completion_rate: float       # percentage, 0-100
    error_recovery_rate: float   # percentage, 0-100
    # Patterns
    worst_task_types: List[Tuple[str, float]]      # (task type, avg score), lowest first
    most_error_prone_tools: List[Tuple[str, int]]  # (tool name, error count), highest first
    common_error_patterns: List[Tuple[str, int]]   # (error category, count), highest first
    # Trends
    improvement_suggestions: List[str]
class SelfGrader:
    """Grades Hermes sessions and tracks improvement patterns."""
    # Error pattern regexes used by _classify_error.
    # Dict insertion order matters: the FIRST matching pattern wins.
    ERROR_PATTERNS = {
        'file_not_found': re.compile(r'file.*not found|no such file|does not exist', re.I),
        'permission_denied': re.compile(r'permission denied|access denied|unauthorized', re.I),
        'timeout': re.compile(r'time(d)?\s*out|deadline exceeded', re.I),
        'api_error': re.compile(r'api.*error|rate limit|too many requests', re.I),
        'syntax_error': re.compile(r'syntax error|invalid syntax|parse error', re.I),
        'command_failed': re.compile(r'exit_code.*[1-9]|command.*failed|failed to', re.I),
        'network_error': re.compile(r'network|connection|unreachable|refused', re.I),
        'tool_not_found': re.compile(r'tool.*not found|unknown tool|no tool named', re.I),
    }
    # Task type patterns used by _infer_task_type; first match wins here too.
    TASK_PATTERNS = {
        'code_review': re.compile(r'code review|review.*code|review.*pr|pull request', re.I),
        'debugging': re.compile(r'debug|fix.*bug|troubleshoot|error.*fix', re.I),
        'feature_impl': re.compile(r'implement|add.*feature|build.*function', re.I),
        'refactoring': re.compile(r'refactor|clean.*up|reorganize|restructure', re.I),
        'documentation': re.compile(r'document|readme|docstring|comment', re.I),
        'testing': re.compile(r'test|pytest|unit test|integration test', re.I),
        'research': re.compile(r'research|investigate|look up|find.*about', re.I),
        'deployment': re.compile(r'deploy|release|publish|push.*prod', re.I),
        'data_analysis': re.compile(r'analyze.*data|process.*file|parse.*json|csv', re.I),
        'infrastructure': re.compile(r'server|docker|kubernetes|terraform|ansible', re.I),
    }
def __init__(self, grades_db_path: Optional[Path] = None,
sessions_dir: Optional[Path] = None):
"""Initialize the grader with database and sessions directory."""
self.grades_db_path = Path(grades_db_path) if grades_db_path else Path.home() / ".timmy" / "uniwizard" / "session_grades.db"
self.sessions_dir = Path(sessions_dir) if sessions_dir else Path.home() / ".hermes" / "sessions"
self._init_database()
def _init_database(self):
"""Initialize the SQLite database with schema."""
self.grades_db_path.parent.mkdir(parents=True, exist_ok=True)
with sqlite3.connect(self.grades_db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS session_grades (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT UNIQUE NOT NULL,
session_file TEXT NOT NULL,
graded_at TEXT NOT NULL,
-- Core metrics
task_completed INTEGER NOT NULL,
tool_calls_efficient INTEGER NOT NULL,
response_quality INTEGER NOT NULL,
errors_recovered INTEGER NOT NULL,
total_api_calls INTEGER NOT NULL,
-- Metadata
model TEXT,
platform TEXT,
session_start TEXT,
duration_seconds REAL,
task_summary TEXT,
-- Error analysis
total_errors INTEGER NOT NULL,
error_types TEXT,
tools_with_errors TEXT,
-- Pattern flags
had_repeated_errors INTEGER NOT NULL,
had_infinite_loop_risk INTEGER NOT NULL,
had_user_clarification INTEGER NOT NULL
)
""")
# Index for efficient queries
conn.execute("CREATE INDEX IF NOT EXISTS idx_graded_at ON session_grades(graded_at)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_session_start ON session_grades(session_start)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_model ON session_grades(model)")
# Weekly reports table
conn.execute("""
CREATE TABLE IF NOT EXISTS weekly_reports (
id INTEGER PRIMARY KEY AUTOINCREMENT,
week_start TEXT UNIQUE NOT NULL,
week_end TEXT NOT NULL,
generated_at TEXT NOT NULL,
report_json TEXT NOT NULL
)
""")
conn.commit()
def grade_session_file(self, session_path: Path) -> Optional[SessionGrade]:
    """Grade a single session file.

    Reads a Hermes session JSON, analyzes its message flow and returns a
    SessionGrade, or None when the file is unreadable or has no messages.
    """
    try:
        # Session logs are JSON; read as UTF-8 explicitly so grading does not
        # depend on the platform's default encoding.
        with open(session_path, encoding='utf-8') as f:
            data = json.load(f)
    except (json.JSONDecodeError, OSError) as e:
        print(f"Error reading {session_path}: {e}")
        return None
    session_id = data.get('session_id', '')
    messages = data.get('messages', [])
    if not messages:
        return None
    # Analyze message flow
    analysis = self._analyze_messages(messages)
    # Calculate grades
    task_completed = self._assess_task_completion(messages, analysis)
    tool_efficiency = self._assess_tool_efficiency(analysis)
    response_quality = self._assess_response_quality(messages, analysis)
    errors_recovered = self._assess_error_recovery(messages, analysis)
    # Extract task summary from first user message
    task_summary = ""
    for msg in messages:
        if msg.get('role') == 'user':
            task_summary = msg.get('content', '')[:200]
            break
    # Calculate duration if timestamps available
    duration = None
    if messages and 'timestamp' in messages[0] and 'timestamp' in messages[-1]:
        try:
            start = datetime.fromisoformat(messages[0]['timestamp'].replace('Z', '+00:00'))
            end = datetime.fromisoformat(messages[-1]['timestamp'].replace('Z', '+00:00'))
            duration = (end - start).total_seconds()
        # TypeError/AttributeError added: a null or non-string timestamp would
        # otherwise crash here instead of just leaving duration unknown.
        except (ValueError, KeyError, TypeError, AttributeError):
            pass
    return SessionGrade(
        session_id=session_id,
        session_file=str(session_path.name),
        graded_at=datetime.now().isoformat(),
        task_completed=task_completed,
        tool_calls_efficient=tool_efficiency,
        response_quality=response_quality,
        errors_recovered=errors_recovered,
        total_api_calls=analysis['total_api_calls'],
        model=data.get('model', 'unknown'),
        platform=data.get('platform'),
        session_start=data.get('session_start', ''),
        duration_seconds=duration,
        task_summary=task_summary,
        total_errors=analysis['total_errors'],
        error_types=json.dumps(list(analysis['error_types'])),
        tools_with_errors=json.dumps(list(analysis['tools_with_errors'])),
        had_repeated_errors=analysis['had_repeated_errors'],
        had_infinite_loop_risk=analysis['had_infinite_loop_risk'],
        had_user_clarification=analysis['had_user_clarification']
    )
def _analyze_messages(self, messages: List[Dict]) -> Dict[str, Any]:
"""Analyze message flow to extract metrics."""
analysis = {
'total_api_calls': 0,
'total_errors': 0,
'error_types': set(),
'tools_with_errors': set(),
'tool_call_counts': defaultdict(int),
'error_sequences': [],
'had_repeated_errors': False,
'had_infinite_loop_risk': False,
'had_user_clarification': False,
'final_assistant_msg': None,
'consecutive_errors': 0,
'max_consecutive_errors': 0,
}
last_tool_was_error = False
for i, msg in enumerate(messages):
role = msg.get('role')
if role == 'assistant':
analysis['total_api_calls'] += 1
# Check for clarification requests
content = msg.get('content', '')
tool_calls = msg.get('tool_calls', [])
if tool_calls and tool_calls[0].get('function', {}).get('name') == 'clarify':
analysis['had_user_clarification'] = True
if 'clarify' in content.lower() and 'need clarification' in content.lower():
analysis['had_user_clarification'] = True
# Track tool calls
for tc in tool_calls:
tool_name = tc.get('function', {}).get('name', 'unknown')
analysis['tool_call_counts'][tool_name] += 1
# Track final assistant message
analysis['final_assistant_msg'] = msg
# Don't reset consecutive errors here - they continue until a tool succeeds
elif role == 'tool':
content = msg.get('content', '')
tool_name = msg.get('name', 'unknown')
# Check for errors
is_error = self._detect_error(content)
if is_error:
analysis['total_errors'] += 1
analysis['tools_with_errors'].add(tool_name)
# Classify error
error_type = self._classify_error(content)
analysis['error_types'].add(error_type)
# Track consecutive errors (consecutive tool messages with errors)
analysis['consecutive_errors'] += 1
analysis['max_consecutive_errors'] = max(
analysis['max_consecutive_errors'],
analysis['consecutive_errors']
)
last_tool_was_error = True
else:
# Reset consecutive errors on success
analysis['consecutive_errors'] = 0
last_tool_was_error = False
# Detect patterns
analysis['had_repeated_errors'] = analysis['max_consecutive_errors'] >= 3
analysis['had_infinite_loop_risk'] = (
analysis['max_consecutive_errors'] >= 5 or
analysis['total_api_calls'] > 50
)
return analysis
def _detect_error(self, content: str) -> bool:
"""Detect if tool result contains an error."""
if not content:
return False
content_lower = content.lower()
# Check for explicit error indicators
error_indicators = [
'"error":', '"error" :', 'error:', 'exception:',
'"exit_code": 1', '"exit_code": 2', '"exit_code": -1',
'traceback', 'failed', 'failure',
]
for indicator in error_indicators:
if indicator in content_lower:
return True
return False
def _classify_error(self, content: str) -> str:
"""Classify the type of error."""
content_lower = content.lower()
for error_type, pattern in self.ERROR_PATTERNS.items():
if pattern.search(content_lower):
return error_type
return 'unknown'
def _assess_task_completion(self, messages: List[Dict], analysis: Dict) -> bool:
"""Assess whether the task was likely completed."""
if not messages:
return False
# Check final assistant message
final_msg = analysis.get('final_assistant_msg')
if not final_msg:
return False
content = final_msg.get('content', '')
# Positive completion indicators
completion_phrases = [
'done', 'completed', 'success', 'finished', 'created',
'implemented', 'fixed', 'resolved', 'saved to', 'here is',
'here are', 'the result', 'output:', 'file:', 'pr:', 'pull request'
]
for phrase in completion_phrases:
if phrase in content.lower():
return True
# Check if there were many errors
if analysis['total_errors'] > analysis['total_api_calls'] * 0.3:
return False
# Check for explicit failure
failure_phrases = ['failed', 'unable to', 'could not', 'error:', 'sorry, i cannot']
for phrase in failure_phrases:
if phrase in content.lower()[:200]:
return False
return True
def _assess_tool_efficiency(self, analysis: Dict) -> int:
"""Rate tool call efficiency on 1-5 scale."""
tool_calls = analysis['total_api_calls']
errors = analysis['total_errors']
if tool_calls == 0:
return 3 # Neutral if no tool calls
error_rate = errors / tool_calls
# Score based on error rate and total calls
if error_rate == 0 and tool_calls <= 10:
return 5 # Perfect efficiency
elif error_rate <= 0.1 and tool_calls <= 15:
return 4 # Good efficiency
elif error_rate <= 0.25 and tool_calls <= 25:
return 3 # Average
elif error_rate <= 0.4:
return 2 # Poor
else:
return 1 # Very poor
def _assess_response_quality(self, messages: List[Dict], analysis: Dict) -> int:
"""Rate response quality on 1-5 scale."""
final_msg = analysis.get('final_assistant_msg')
if not final_msg:
return 1
content = final_msg.get('content', '')
content_len = len(content)
# Quality indicators
score = 3 # Start at average
# Length heuristics
if content_len > 500:
score += 1
if content_len > 1000:
score += 1
# Code blocks indicate substantive response
if '```' in content:
score += 1
# Links/references indicate thoroughness
if 'http' in content or 'see ' in content.lower():
score += 0.5
# Error penalties
if analysis['had_repeated_errors']:
score -= 1
if analysis['total_errors'] > 5:
score -= 1
# Loop risk is severe
if analysis['had_infinite_loop_risk']:
score -= 2
return max(1, min(5, int(score)))
def _assess_error_recovery(self, messages: List[Dict], analysis: Dict) -> bool:
"""Assess whether errors were successfully recovered from."""
if analysis['total_errors'] == 0:
return True # No errors to recover from
# If task completed despite errors, recovered
if self._assess_task_completion(messages, analysis):
return True
# If no repeated errors, likely recovered
if not analysis['had_repeated_errors']:
return True
return False
def save_grade(self, grade: SessionGrade) -> bool:
"""Save a grade to the database."""
try:
with sqlite3.connect(self.grades_db_path) as conn:
conn.execute("""
INSERT OR REPLACE INTO session_grades (
session_id, session_file, graded_at,
task_completed, tool_calls_efficient, response_quality,
errors_recovered, total_api_calls, model, platform,
session_start, duration_seconds, task_summary,
total_errors, error_types, tools_with_errors,
had_repeated_errors, had_infinite_loop_risk, had_user_clarification
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
grade.session_id, grade.session_file, grade.graded_at,
int(grade.task_completed), grade.tool_calls_efficient,
grade.response_quality, int(grade.errors_recovered),
grade.total_api_calls, grade.model, grade.platform,
grade.session_start, grade.duration_seconds, grade.task_summary,
grade.total_errors, grade.error_types, grade.tools_with_errors,
int(grade.had_repeated_errors), int(grade.had_infinite_loop_risk),
int(grade.had_user_clarification)
))
conn.commit()
return True
except sqlite3.Error as e:
print(f"Database error saving grade: {e}")
return False
def grade_latest_sessions(self, n: int = 10) -> List[SessionGrade]:
    """Grade up to N not-yet-graded sessions, newest first.

    Fix: the old code only considered the newest N files, so already-graded
    files crowded ungraded ones out of the window and fewer than N sessions
    were graded. Now we scan newest-first until N new grades are produced
    (or the files run out).
    """
    # Newest session files first; the aggregate "sessions.json" is excluded.
    session_files = sorted(
        [f for f in self.sessions_dir.glob("session_*.json")
         if not f.name.endswith("sessions.json")],
        key=lambda x: x.stat().st_mtime,
        reverse=True
    )
    # Collect ids of sessions already graded so they can be skipped
    graded_ids = set()
    try:
        with sqlite3.connect(self.grades_db_path) as conn:
            cursor = conn.execute("SELECT session_id FROM session_grades")
            graded_ids = {row[0] for row in cursor.fetchall()}
    except sqlite3.Error:
        # Missing/corrupt DB: best-effort — grade without deduplication
        pass
    grades = []
    for sf in session_files:
        if len(grades) >= n:
            break
        # NOTE(review): the id here is derived from the FILENAME, while the
        # saved SessionGrade.session_id comes from the file contents — confirm
        # the two always agree, otherwise this dedup check can miss.
        session_id = sf.stem.replace('session_', '')
        if session_id in graded_ids:
            continue
        grade = self.grade_session_file(sf)
        if grade and self.save_grade(grade):
            grades.append(grade)
    return grades
def identify_patterns(self, days: int = 7) -> Dict[str, Any]:
    """Identify patterns in recent graded sessions.

    Aggregates rows graded within the last `days` days into overall stats,
    per-tool error counts, per-error-type counts, and average response
    quality per inferred task type.
    """
    # Rows are filtered by graded_at (when grading ran), not session_start.
    since = (datetime.now() - timedelta(days=days)).isoformat()
    with sqlite3.connect(self.grades_db_path) as conn:
        # Overall stats
        cursor = conn.execute("""
            SELECT
                COUNT(*),
                AVG(tool_calls_efficient),
                AVG(response_quality),
                AVG(CASE WHEN task_completed THEN 1.0 ELSE 0.0 END),
                AVG(CASE WHEN errors_recovered THEN 1.0 ELSE 0.0 END)
            FROM session_grades
            WHERE graded_at > ?
        """, (since,))
        row = cursor.fetchone()
        # AVG() yields NULL (None) over an empty set; `or 0` defaults that.
        stats = {
            'total_sessions': row[0] or 0,
            'avg_tool_efficiency': round(row[1] or 0, 2),
            'avg_response_quality': round(row[2] or 0, 2),
            'completion_rate': round((row[3] or 0) * 100, 1),
            'error_recovery_rate': round((row[4] or 0) * 100, 1),
        }
        # Tool error analysis: tools_with_errors holds a JSON list per row,
        # so rows group by the serialized list and are re-counted per tool.
        cursor = conn.execute("""
            SELECT tools_with_errors, COUNT(*)
            FROM session_grades
            WHERE graded_at > ? AND tools_with_errors != '[]'
            GROUP BY tools_with_errors
        """, (since,))
        tool_errors = defaultdict(int)
        for row in cursor.fetchall():
            tools = json.loads(row[0])
            for tool in tools:
                tool_errors[tool] += row[1]
        # Error type analysis (same JSON-list-per-row shape as above)
        cursor = conn.execute("""
            SELECT error_types, COUNT(*)
            FROM session_grades
            WHERE graded_at > ? AND error_types != '[]'
            GROUP BY error_types
        """, (since,))
        error_types = defaultdict(int)
        for row in cursor.fetchall():
            types = json.loads(row[0])
            for et in types:
                error_types[et] += row[1]
        # Task type performance (inferred from task_summary via TASK_PATTERNS)
        cursor = conn.execute("""
            SELECT task_summary, response_quality
            FROM session_grades
            WHERE graded_at > ?
        """, (since,))
        task_scores = defaultdict(list)
        for row in cursor.fetchall():
            summary = row[0] or ''
            score = row[1]
            task_type = self._infer_task_type(summary)
            task_scores[task_type].append(score)
        avg_task_scores = {
            tt: round(sum(scores) / len(scores), 2)
            for tt, scores in task_scores.items()
        }
        return {
            **stats,
            'tool_error_counts': dict(tool_errors),
            'error_type_counts': dict(error_types),
            'task_type_scores': avg_task_scores,
        }
def _infer_task_type(self, summary: str) -> str:
"""Infer task type from summary text."""
for task_type, pattern in self.TASK_PATTERNS.items():
if pattern.search(summary):
return task_type
return 'general'
def generate_weekly_report(self) -> WeeklyReport:
    """Generate a weekly improvement report.

    Builds a WeeklyReport for the current Monday-Sunday week from the last
    7 days of grades, persists it (upsert keyed on week_start) and returns it.
    """
    # Calculate week boundaries (Monday to Sunday)
    today = datetime.now()
    monday = today - timedelta(days=today.weekday())
    sunday = monday + timedelta(days=6)
    # NOTE(review): stats cover the trailing 7 days by graded_at, which only
    # approximates the Monday-Sunday window reported above — confirm intended.
    patterns = self.identify_patterns(days=7)
    # Find worst task types (lowest average quality first)
    task_scores = patterns.get('task_type_scores', {})
    worst_tasks = sorted(task_scores.items(), key=lambda x: x[1])[:3]
    # Find most error-prone tools (highest error count first)
    tool_errors = patterns.get('tool_error_counts', {})
    worst_tools = sorted(tool_errors.items(), key=lambda x: x[1], reverse=True)[:3]
    # Find common error patterns (highest count first)
    error_types = patterns.get('error_type_counts', {})
    common_errors = sorted(error_types.items(), key=lambda x: x[1], reverse=True)[:3]
    # Generate suggestions
    suggestions = self._generate_suggestions(patterns, worst_tasks, worst_tools, common_errors)
    report = WeeklyReport(
        week_start=monday.strftime('%Y-%m-%d'),
        week_end=sunday.strftime('%Y-%m-%d'),
        total_sessions=patterns['total_sessions'],
        avg_tool_efficiency=patterns['avg_tool_efficiency'],
        avg_response_quality=patterns['avg_response_quality'],
        completion_rate=patterns['completion_rate'],
        error_recovery_rate=patterns['error_recovery_rate'],
        worst_task_types=worst_tasks,
        most_error_prone_tools=worst_tools,
        common_error_patterns=common_errors,
        improvement_suggestions=suggestions
    )
    # Save report; INSERT OR REPLACE keeps one row per week_start
    with sqlite3.connect(self.grades_db_path) as conn:
        conn.execute("""
            INSERT OR REPLACE INTO weekly_reports
            (week_start, week_end, generated_at, report_json)
            VALUES (?, ?, ?, ?)
        """, (
            report.week_start,
            report.week_end,
            datetime.now().isoformat(),
            json.dumps(asdict(report))
        ))
        conn.commit()
    return report
def _generate_suggestions(self, patterns: Dict, worst_tasks: List,
worst_tools: List, common_errors: List) -> List[str]:
"""Generate improvement suggestions based on patterns."""
suggestions = []
if patterns['completion_rate'] < 70:
suggestions.append("Task completion rate is below 70%. Consider adding pre-task planning steps.")
if patterns['avg_tool_efficiency'] < 3:
suggestions.append("Tool efficiency is low. Review error recovery patterns and add retry logic.")
if worst_tasks:
task_names = ', '.join([t[0] for t in worst_tasks])
suggestions.append(f"Lowest scoring task types: {task_names}. Consider skill enhancement.")
if worst_tools:
tool_names = ', '.join([t[0] for t in worst_tools])
suggestions.append(f"Most error-prone tools: {tool_names}. Review usage patterns.")
if common_errors:
error_names = ', '.join([e[0] for e in common_errors])
suggestions.append(f"Common error types: {error_names}. Add targeted error handling.")
if patterns['error_recovery_rate'] < 80:
suggestions.append("Error recovery rate needs improvement. Implement better fallback strategies.")
if not suggestions:
suggestions.append("Performance is stable. Focus on expanding task coverage.")
return suggestions
def get_grades_summary(self, days: int = 30) -> str:
"""Get a human-readable summary of recent grades."""
patterns = self.identify_patterns(days=days)
lines = [
f"=== Session Grades Summary (Last {days} days) ===",
"",
f"Total Sessions Graded: {patterns['total_sessions']}",
f"Average Tool Efficiency: {patterns['avg_tool_efficiency']}/5",
f"Average Response Quality: {patterns['avg_response_quality']}/5",
f"Task Completion Rate: {patterns['completion_rate']}%",
f"Error Recovery Rate: {patterns['error_recovery_rate']}%",
"",
]
if patterns.get('task_type_scores'):
lines.append("Task Type Performance:")
for task, score in sorted(patterns['task_type_scores'].items(), key=lambda x: -x[1]):
lines.append(f" - {task}: {score}/5")
lines.append("")
if patterns.get('tool_error_counts'):
lines.append("Tool Error Counts:")
for tool, count in sorted(patterns['tool_error_counts'].items(), key=lambda x: -x[1]):
lines.append(f" - {tool}: {count}")
lines.append("")
return '\n'.join(lines)
def main():
    """CLI entry point for self-grading."""
    import argparse
    parser = argparse.ArgumentParser(description='Grade Hermes sessions')
    parser.add_argument('--grade-latest', '-g', type=int, metavar='N',
                        help='Grade the last N ungraded sessions')
    parser.add_argument('--summary', '-s', action='store_true',
                        help='Show summary of recent grades')
    parser.add_argument('--days', '-d', type=int, default=7,
                        help='Number of days for summary (default: 7)')
    parser.add_argument('--report', '-r', action='store_true',
                        help='Generate weekly report')
    parser.add_argument('--file', '-f', type=Path,
                        help='Grade a specific session file')
    args = parser.parse_args()
    grader = SelfGrader()
    # Mode dispatch: explicit file > batch grading > weekly report > summary
    if args.file:
        result = grader.grade_session_file(args.file)
        if not result:
            print("Failed to grade session")
        else:
            grader.save_grade(result)
            print(f"Graded session: {result.session_id}")
            print(f" Task completed: {result.task_completed}")
            print(f" Tool efficiency: {result.tool_calls_efficient}/5")
            print(f" Response quality: {result.response_quality}/5")
            print(f" Errors recovered: {result.errors_recovered}")
    elif args.grade_latest:
        new_grades = grader.grade_latest_sessions(args.grade_latest)
        print(f"Graded {len(new_grades)} sessions")
        for item in new_grades:
            print(f" - {item.session_id}: quality={item.response_quality}/5, "
                  f"completed={item.task_completed}")
    elif args.report:
        weekly = grader.generate_weekly_report()
        print(f"\n=== Weekly Report ({weekly.week_start} to {weekly.week_end}) ===")
        print(f"Total Sessions: {weekly.total_sessions}")
        print(f"Avg Tool Efficiency: {weekly.avg_tool_efficiency}/5")
        print(f"Avg Response Quality: {weekly.avg_response_quality}/5")
        print(f"Completion Rate: {weekly.completion_rate}%")
        print(f"Error Recovery Rate: {weekly.error_recovery_rate}%")
        print("\nSuggestions:")
        for tip in weekly.improvement_suggestions:
            print(f" - {tip}")
    else:
        print(grader.get_grades_summary(days=args.days))


if __name__ == '__main__':
    main()