# File: timmy-home/uniwizard/quality_scorer.py (643 lines, 21 KiB, Python)
"""
Uniwizard Backend Quality Scorer
Tracks per-backend performance metrics and provides intelligent routing recommendations.
Uses a rolling window of last 100 responses per backend across 5 task types.
"""
import sqlite3
import json
import time
from dataclasses import dataclass, asdict
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Optional, List, Dict, Tuple
from contextlib import contextmanager
class TaskType(Enum):
    """Task types for backend specialization tracking."""
    CODE = "code"            # programming / code-generation work
    REASONING = "reasoning"  # multi-step logical reasoning
    RESEARCH = "research"    # information gathering and synthesis
    CREATIVE = "creative"    # creative writing
    FAST_OPS = "fast_ops"    # quick, latency-sensitive operations
class ResponseStatus(Enum):
    """Status of a backend response."""
    SUCCESS = "success"  # completed normally
    ERROR = "error"      # backend returned an error
    REFUSAL = "refusal"  # model declined to answer
    TIMEOUT = "timeout"  # response exceeded the time budget
# The 7 Uniwizard backends eligible for scoring and routing.
# record_response() rejects any backend name not in this list.
BACKENDS = [
    "anthropic",
    "openai-codex",
    "gemini",
    "groq",
    "grok",
    "kimi-coding",
    "openrouter",
]
# Default DB path: per-user SQLite store under the home directory.
DEFAULT_DB_PATH = Path.home() / ".timmy" / "uniwizard" / "quality_scores.db"
@dataclass
class BackendScore:
    """Aggregated score card for a backend on a specific task type."""
    backend: str             # backend name (one of BACKENDS)
    task_type: str           # task type string, or "all" when unfiltered
    total_requests: int      # number of responses in the rolling window
    success_count: int       # responses with status 'success'
    error_count: int         # responses with status 'error'
    refusal_count: int       # responses with status 'refusal'
    timeout_count: int       # responses with status 'timeout'
    avg_latency_ms: float    # mean total latency over the window
    avg_ttft_ms: float       # mean time-to-first-token over the window
    p95_latency_ms: float    # 95th-percentile total latency
    score: float  # Composite quality score (0-100)
@dataclass
class ResponseRecord:
    """Single response record for storage (mirrors the `responses` table row)."""
    id: Optional[int]        # rowid; None before insertion
    backend: str             # backend name (one of BACKENDS)
    task_type: str           # task type string
    status: str              # ResponseStatus value string
    latency_ms: float        # total response latency in milliseconds
    ttft_ms: float  # Time to first token
    timestamp: float         # Unix epoch seconds at record time
    metadata: Optional[str]  # JSON string for extensibility
class QualityScorer:
    """
    Tracks backend quality metrics with rolling windows.

    Per-response data is stored in SQLite; aggregated score cards are
    computed on demand for routing decisions. Only the most recent
    ROLLING_WINDOW_SIZE responses per (backend, task_type) pair are kept.
    """

    ROLLING_WINDOW_SIZE = 100

    # Weights for the composite quality score. They sum to 1.0 so the
    # final score stays on a 0-100 scale.
    WEIGHTS = {
        "success_rate": 0.35,
        "low_error_rate": 0.20,
        "low_refusal_rate": 0.15,
        "low_timeout_rate": 0.10,
        "low_latency": 0.20,
    }

    def __init__(self, db_path: Optional[Path] = None):
        """
        Args:
            db_path: SQLite file location; defaults to DEFAULT_DB_PATH.
        """
        self.db_path = Path(db_path) if db_path else DEFAULT_DB_PATH
        self._init_db()

    @contextmanager
    def _get_conn(self):
        """Yield a DB connection; commit on success, roll back on error, always close."""
        conn = sqlite3.connect(str(self.db_path))
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        except Exception:
            # Explicitly discard partial writes instead of relying on
            # close() implicitly rolling back the open transaction.
            conn.rollback()
            raise
        finally:
            conn.close()

    def _init_db(self):
        """Initialize the SQLite database schema (idempotent)."""
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        with self._get_conn() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS responses (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    backend TEXT NOT NULL,
                    task_type TEXT NOT NULL,
                    status TEXT NOT NULL,
                    latency_ms REAL NOT NULL,
                    ttft_ms REAL NOT NULL,
                    timestamp REAL NOT NULL,
                    metadata TEXT
                )
            """)
            # Indexes for fast rolling-window queries.
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_backend_task_time
                ON responses(backend, task_type, timestamp DESC)
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_backend_time
                ON responses(backend, timestamp DESC)
            """)

    def record_response(
        self,
        backend: str,
        task_type: str,
        status: ResponseStatus,
        latency_ms: float,
        ttft_ms: float,
        metadata: Optional[Dict] = None
    ) -> None:
        """
        Record a response from a backend.

        Args:
            backend: Backend name (must be in BACKENDS)
            task_type: Task type string or TaskType enum
            status: ResponseStatus (success/error/refusal/timeout)
            latency_ms: Total response latency in milliseconds
            ttft_ms: Time to first token in milliseconds
            metadata: Optional dict with additional context (stored as JSON)

        Raises:
            ValueError: if `backend` is not a known backend.
        """
        if backend not in BACKENDS:
            raise ValueError(f"Unknown backend: {backend}. Must be one of: {BACKENDS}")
        task_str = task_type.value if isinstance(task_type, TaskType) else task_type
        with self._get_conn() as conn:
            conn.execute("""
                INSERT INTO responses (backend, task_type, status, latency_ms, ttft_ms, timestamp, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                backend,
                task_str,
                status.value,
                latency_ms,
                ttft_ms,
                time.time(),
                json.dumps(metadata) if metadata else None
            ))
            # Prune old records to maintain rolling window
            self._prune_rolling_window(conn, backend, task_str)

    def _prune_rolling_window(self, conn: sqlite3.Connection, backend: str, task_type: str) -> None:
        """Remove records beyond the rolling window size for this backend/task combo."""
        # LIMIT -1 means "no limit" in SQLite, so this selects EVERY record
        # past the newest ROLLING_WINDOW_SIZE — the previous LIMIT capped
        # deletion at one window's worth per call, which could leave stale
        # rows behind after a bulk import or a window-size reduction.
        cursor = conn.execute("""
            SELECT id FROM responses
            WHERE backend = ? AND task_type = ?
            ORDER BY timestamp DESC
            LIMIT -1 OFFSET ?
        """, (backend, task_type, self.ROLLING_WINDOW_SIZE))
        ids_to_delete = [row[0] for row in cursor.fetchall()]
        if ids_to_delete:
            placeholders = ','.join('?' * len(ids_to_delete))
            conn.execute(f"""
                DELETE FROM responses
                WHERE id IN ({placeholders})
            """, ids_to_delete)

    def get_backend_score(
        self,
        backend: str,
        task_type: Optional[str] = None
    ) -> BackendScore:
        """
        Get aggregated score for a backend, optionally filtered by task type.

        Args:
            backend: Backend name
            task_type: Optional task type filter

        Returns:
            BackendScore with aggregated metrics over the rolling window
            (all-zero card when the backend has no recorded responses).

        Raises:
            ValueError: if `backend` is not a known backend.
        """
        if backend not in BACKENDS:
            raise ValueError(f"Unknown backend: {backend}")
        with self._get_conn() as conn:
            # Aggregate over the newest ROLLING_WINDOW_SIZE rows only; the
            # inner SELECT pins the window, the outer query summarizes it.
            if task_type:
                row = conn.execute("""
                    SELECT
                        COUNT(*) as total,
                        SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successes,
                        SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as errors,
                        SUM(CASE WHEN status = 'refusal' THEN 1 ELSE 0 END) as refusals,
                        SUM(CASE WHEN status = 'timeout' THEN 1 ELSE 0 END) as timeouts,
                        AVG(latency_ms) as avg_latency,
                        AVG(ttft_ms) as avg_ttft,
                        MAX(latency_ms) as max_latency
                    FROM (
                        SELECT * FROM responses
                        WHERE backend = ? AND task_type = ?
                        ORDER BY timestamp DESC
                        LIMIT ?
                    )
                """, (backend, task_type, self.ROLLING_WINDOW_SIZE)).fetchone()
            else:
                row = conn.execute("""
                    SELECT
                        COUNT(*) as total,
                        SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successes,
                        SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as errors,
                        SUM(CASE WHEN status = 'refusal' THEN 1 ELSE 0 END) as refusals,
                        SUM(CASE WHEN status = 'timeout' THEN 1 ELSE 0 END) as timeouts,
                        AVG(latency_ms) as avg_latency,
                        AVG(ttft_ms) as avg_ttft,
                        MAX(latency_ms) as max_latency
                    FROM (
                        SELECT * FROM responses
                        WHERE backend = ?
                        ORDER BY timestamp DESC
                        LIMIT ?
                    )
                """, (backend, self.ROLLING_WINDOW_SIZE)).fetchone()
            total = row[0] or 0
            if total == 0:
                # No data yet: return an all-zero card rather than raising.
                return BackendScore(
                    backend=backend,
                    task_type=task_type or "all",
                    total_requests=0,
                    success_count=0,
                    error_count=0,
                    refusal_count=0,
                    timeout_count=0,
                    avg_latency_ms=0.0,
                    avg_ttft_ms=0.0,
                    p95_latency_ms=0.0,
                    score=0.0
                )
            successes = row[1] or 0
            errors = row[2] or 0
            refusals = row[3] or 0
            timeouts = row[4] or 0
            avg_latency = row[5] or 0.0
            avg_ttft = row[6] or 0.0
            # Calculate P95 latency
            p95 = self._get_p95_latency(conn, backend, task_type)
            # Calculate composite score
            score = self._calculate_score(
                total, successes, errors, refusals, timeouts, avg_latency
            )
            return BackendScore(
                backend=backend,
                task_type=task_type or "all",
                total_requests=total,
                success_count=successes,
                error_count=errors,
                refusal_count=refusals,
                timeout_count=timeouts,
                avg_latency_ms=round(avg_latency, 2),
                avg_ttft_ms=round(avg_ttft, 2),
                p95_latency_ms=round(p95, 2),
                score=round(score, 2)
            )

    def _get_p95_latency(
        self,
        conn: sqlite3.Connection,
        backend: str,
        task_type: Optional[str]
    ) -> float:
        """Calculate P95 latency from the rolling window (0.0 when no data)."""
        if task_type:
            row = conn.execute("""
                SELECT latency_ms FROM responses
                WHERE backend = ? AND task_type = ?
                ORDER BY timestamp DESC
                LIMIT ?
            """, (backend, task_type, self.ROLLING_WINDOW_SIZE)).fetchall()
        else:
            row = conn.execute("""
                SELECT latency_ms FROM responses
                WHERE backend = ?
                ORDER BY timestamp DESC
                LIMIT ?
            """, (backend, self.ROLLING_WINDOW_SIZE)).fetchall()
        if not row:
            return 0.0
        latencies = sorted([r[0] for r in row])
        # Nearest-rank percentile, clamped so a small sample stays in range.
        idx = int(len(latencies) * 0.95)
        return latencies[min(idx, len(latencies) - 1)]

    def _calculate_score(
        self,
        total: int,
        successes: int,
        errors: int,
        refusals: int,
        timeouts: int,
        avg_latency: float
    ) -> float:
        """
        Calculate composite quality score (0-100).

        Higher is better. Considers success rate, error/refusal/timeout rates,
        and normalized latency.
        """
        if total == 0:
            return 0.0
        success_rate = successes / total
        error_rate = errors / total
        refusal_rate = refusals / total
        timeout_rate = timeouts / total
        # Normalize latency linearly: 0 ms scores 1.0, anything at or above
        # 10000 ms scores 0.0. (Comment fixed to match the actual divisor.)
        latency_score = max(0, min(1, 1 - (avg_latency / 10000)))
        score = (
            self.WEIGHTS["success_rate"] * success_rate * 100 +
            self.WEIGHTS["low_error_rate"] * (1 - error_rate) * 100 +
            self.WEIGHTS["low_refusal_rate"] * (1 - refusal_rate) * 100 +
            self.WEIGHTS["low_timeout_rate"] * (1 - timeout_rate) * 100 +
            self.WEIGHTS["low_latency"] * latency_score * 100
        )
        return max(0, min(100, score))

    def recommend_backend(
        self,
        task_type: Optional[str] = None,
        min_samples: int = 5
    ) -> List[Tuple[str, float]]:
        """
        Get ranked list of backends for a task type.

        Args:
            task_type: Optional task type to specialize for
            min_samples: Minimum samples before considering a backend

        Returns:
            List of (backend_name, score) tuples, sorted by score descending
        """
        scores = []
        for backend in BACKENDS:
            score_card = self.get_backend_score(backend, task_type)
            # Require minimum samples for confident recommendations
            if score_card.total_requests < min_samples:
                # Penalize low-sample backends proportionally but still
                # include them so new backends can earn traffic.
                adjusted_score = score_card.score * (score_card.total_requests / min_samples)
            else:
                adjusted_score = score_card.score
            scores.append((backend, round(adjusted_score, 2)))
        # Sort by score descending
        scores.sort(key=lambda x: x[1], reverse=True)
        return scores

    def get_all_scores(
        self,
        task_type: Optional[str] = None
    ) -> Dict[str, BackendScore]:
        """Get score cards for all backends."""
        return {
            backend: self.get_backend_score(backend, task_type)
            for backend in BACKENDS
        }

    def get_task_breakdown(self, backend: str) -> Dict[str, BackendScore]:
        """Get per-task-type scores for a single backend.

        Raises:
            ValueError: if `backend` is not a known backend.
        """
        if backend not in BACKENDS:
            raise ValueError(f"Unknown backend: {backend}")
        return {
            task.value: self.get_backend_score(backend, task.value)
            for task in TaskType
        }

    def get_stats(self) -> Dict:
        """Get overall database statistics (counts plus oldest/newest record).

        Timestamps are formatted as naive local-time ISO strings.
        """
        with self._get_conn() as conn:
            total = conn.execute("SELECT COUNT(*) FROM responses").fetchone()[0]
            by_backend = {}
            for backend in BACKENDS:
                count = conn.execute(
                    "SELECT COUNT(*) FROM responses WHERE backend = ?",
                    (backend,)
                ).fetchone()[0]
                by_backend[backend] = count
            by_task = {}
            for task in TaskType:
                count = conn.execute(
                    "SELECT COUNT(*) FROM responses WHERE task_type = ?",
                    (task.value,)
                ).fetchone()[0]
                by_task[task.value] = count
            oldest = conn.execute(
                "SELECT MIN(timestamp) FROM responses"
            ).fetchone()[0]
            newest = conn.execute(
                "SELECT MAX(timestamp) FROM responses"
            ).fetchone()[0]
            return {
                "total_records": total,
                "by_backend": by_backend,
                "by_task_type": by_task,
                "oldest_record": datetime.fromtimestamp(oldest).isoformat() if oldest else None,
                "newest_record": datetime.fromtimestamp(newest).isoformat() if newest else None,
            }

    def clear_data(self) -> None:
        """Clear all recorded data (useful for testing)."""
        with self._get_conn() as conn:
            conn.execute("DELETE FROM responses")
def print_score_report(scorer: QualityScorer, task_type: Optional[str] = None) -> None:
    """
    Print a formatted score report to console.

    Args:
        scorer: QualityScorer instance
        task_type: Optional task type filter
    """
    print("\n" + "=" * 80)
    print(" UNIWIZARD BACKEND QUALITY SCORES")
    print("=" * 80)
    if task_type:
        print(f"\n Task Type: {task_type.upper()}")
    else:
        print("\n Overall Performance (all task types)")
    print("-" * 80)
    scores = scorer.recommend_backend(task_type)
    all_scores = scorer.get_all_scores(task_type)
    # Header
    print(f"\n {'Rank':<6} {'Backend':<16} {'Score':<8} {'Success':<10} {'Latency':<12} {'Samples':<8}")
    print(" " + "-" * 72)
    # Rankings
    for rank, (backend, score) in enumerate(scores, 1):
        card = all_scores[backend]
        success_pct = (card.success_count / card.total_requests * 100) if card.total_requests > 0 else 0
        bar_len = int(score / 5)  # 20 chars = 100
        # Restore the bar glyphs: the originals were lost to mojibake,
        # leaving empty strings so the bar always rendered blank.
        bar = "█" * bar_len + "░" * (20 - bar_len)
        print(f" {rank:<6} {backend:<16} {score:>6.1f} {success_pct:>6.1f}% {card.avg_latency_ms:>7.1f}ms {card.total_requests:>6}")
        print(f" [{bar}]")
    # Per-backend breakdown
    print("\n" + "-" * 80)
    print(" DETAILED BREAKDOWN")
    print("-" * 80)
    for backend in BACKENDS:
        card = all_scores[backend]
        if card.total_requests == 0:
            print(f"\n {backend}: No data yet")
            continue
        print(f"\n {backend.upper()}:")
        print(f" Requests: {card.total_requests} | "
              f"Success: {card.success_count} | "
              f"Errors: {card.error_count} | "
              f"Refusals: {card.refusal_count} | "
              f"Timeouts: {card.timeout_count}")
        print(f" Avg Latency: {card.avg_latency_ms}ms | "
              f"TTFT: {card.avg_ttft_ms}ms | "
              f"P95: {card.p95_latency_ms}ms")
        print(f" Quality Score: {card.score}/100")
    # Recommendations
    print("\n" + "=" * 80)
    print(" RECOMMENDATIONS")
    print("=" * 80)
    recommendations = scorer.recommend_backend(task_type)
    top_3 = [b for b, s in recommendations[:3] if s > 0]
    if top_3:
        print(f"\n Best backends{f' for {task_type}' if task_type else ''}:")
        for i, backend in enumerate(top_3, 1):
            score = next(s for b, s in recommendations if b == backend)
            print(f" {i}. {backend} (score: {score})")
    else:
        print("\n Not enough data for recommendations yet.")
    print("\n" + "=" * 80)
def print_full_report(scorer: QualityScorer) -> None:
    """Print a complete report including per-task-type breakdowns."""
    print("\n" + "=" * 80)
    print(" UNIWIZARD BACKEND QUALITY SCORECARD")
    print("=" * 80)
    stats = scorer.get_stats()
    print(f"\n Database: {scorer.db_path}")
    print(f" Total Records: {stats['total_records']}")
    print(f" Date Range: {stats['oldest_record'] or 'N/A'} to {stats['newest_record'] or 'N/A'}")
    # Overall scores
    print_score_report(scorer)
    # Per-task breakdown
    print("\n" + "=" * 80)
    print(" PER-TASK SPECIALIZATION")
    print("=" * 80)
    for task in TaskType:
        # Separator glyph restored: mojibake had reduced it to an empty
        # string, printing a blank line instead of a rule.
        print(f"\n{'─' * 80}")
        scores = scorer.recommend_backend(task.value)
        print(f"\n {task.value.upper()}:")
        for rank, (backend, score) in enumerate(scores[:3], 1):
            if score > 0:
                print(f" {rank}. {backend}: {score}")
    print("\n" + "=" * 80)
# Convenience functions for CLI usage
def get_scorer(db_path: Optional[Path] = None) -> QualityScorer:
    """Build a QualityScorer, optionally backed by a custom database path."""
    return QualityScorer(db_path=db_path)
def record(
    backend: str,
    task_type: str,
    status: str,
    latency_ms: float,
    ttft_ms: float = 0.0,
    metadata: Optional[Dict] = None
) -> None:
    """Record one backend response via the default scorer.

    `status` is the raw status string and is coerced to ResponseStatus,
    so an unknown value raises ValueError.
    """
    get_scorer().record_response(
        backend=backend,
        task_type=task_type,
        status=ResponseStatus(status),
        latency_ms=latency_ms,
        ttft_ms=ttft_ms,
        metadata=metadata,
    )
def recommend(task_type: Optional[str] = None) -> List[Tuple[str, float]]:
    """Return ranked (backend, score) pairs from the default scorer."""
    return get_scorer().recommend_backend(task_type)
def report(task_type: Optional[str] = None) -> None:
    """Print the score report for the default scorer."""
    print_score_report(get_scorer(), task_type)
def full_report() -> None:
    """Print the complete scorecard for the default scorer."""
    print_full_report(get_scorer())
if __name__ == "__main__":
    # Demo mode - show empty report structure
    scorer = QualityScorer()
    # Seed demo data on first run so the report has something to show.
    stats = scorer.get_stats()
    if stats["total_records"] == 0:
        print("Generating demo data...")
        import random
        for _ in range(50):
            scorer.record_response(
                backend=random.choice(BACKENDS),
                task_type=random.choice([t.value for t in TaskType]),
                status=random.choices(
                    [ResponseStatus.SUCCESS, ResponseStatus.ERROR, ResponseStatus.REFUSAL, ResponseStatus.TIMEOUT],
                    weights=[0.85, 0.08, 0.05, 0.02]
                )[0],
                # Clamp at zero: gauss() can return negative samples, and a
                # negative latency would inflate the quality score.
                latency_ms=max(0.0, random.gauss(1500, 500)),
                ttft_ms=max(0.0, random.gauss(200, 100))
            )
    full_report()