Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| bdd0f2709b | |||
| c6f2855745 |
146
agent/provider_preflight.py
Normal file
146
agent/provider_preflight.py
Normal file
@@ -0,0 +1,146 @@
|
||||
"""Provider Preflight — Poka-yoke validation of provider/model config.
|
||||
|
||||
Validates provider and model configuration before session start.
|
||||
Prevents wasted context on misconfigured providers.
|
||||
|
||||
Usage:
|
||||
from agent.provider_preflight import preflight_check
|
||||
result = preflight_check(provider="openrouter", model="xiaomi/mimo-v2-pro")
|
||||
if not result["valid"]:
|
||||
print(result["error"])
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Provider -> required env var
|
||||
PROVIDER_KEYS = {
|
||||
"openrouter": "OPENROUTER_API_KEY",
|
||||
"anthropic": "ANTHROPIC_API_KEY",
|
||||
"openai": "OPENAI_API_KEY",
|
||||
"nous": "NOUS_API_KEY",
|
||||
"ollama": None, # Local, no key needed
|
||||
"local": None,
|
||||
}
|
||||
|
||||
|
||||
def check_provider_key(provider: str) -> Dict[str, Any]:
|
||||
"""Check if provider has a valid API key configured."""
|
||||
provider_lower = provider.lower().strip()
|
||||
|
||||
env_var = None
|
||||
for known, key in PROVIDER_KEYS.items():
|
||||
if known in provider_lower:
|
||||
env_var = key
|
||||
break
|
||||
|
||||
if env_var is None:
|
||||
# Unknown provider — assume OK (custom/local)
|
||||
return {"valid": True, "provider": provider, "key_status": "unknown"}
|
||||
|
||||
if env_var is None:
|
||||
# Local provider, no key needed
|
||||
return {"valid": True, "provider": provider, "key_status": "not_required"}
|
||||
|
||||
key_value = os.getenv(env_var, "").strip()
|
||||
if not key_value:
|
||||
return {
|
||||
"valid": False,
|
||||
"provider": provider,
|
||||
"key_status": "missing",
|
||||
"error": f"{env_var} is not set. Provider '{provider}' will fail.",
|
||||
"fix": f"Set {env_var} in ~/.hermes/.env",
|
||||
}
|
||||
|
||||
if len(key_value) < 10:
|
||||
return {
|
||||
"valid": False,
|
||||
"provider": provider,
|
||||
"key_status": "too_short",
|
||||
"error": f"{env_var} is suspiciously short ({len(key_value)} chars). May be invalid.",
|
||||
"fix": f"Verify {env_var} value in ~/.hermes/.env",
|
||||
}
|
||||
|
||||
return {"valid": True, "provider": provider, "key_status": "set"}
|
||||
|
||||
|
||||
def check_model_availability(model: str, provider: str) -> Dict[str, Any]:
|
||||
"""Check if model is likely available for provider."""
|
||||
if not model:
|
||||
return {"valid": False, "error": "No model specified"}
|
||||
|
||||
# Basic sanity checks
|
||||
model_lower = model.lower()
|
||||
|
||||
# Anthropic models should use anthropic provider
|
||||
if "claude" in model_lower and "anthropic" not in provider.lower():
|
||||
return {
|
||||
"valid": True, # Allow but warn
|
||||
"warning": f"Model '{model}' usually runs on Anthropic provider, not '{provider}'",
|
||||
}
|
||||
|
||||
# Ollama models
|
||||
ollama_indicators = ["llama", "mistral", "qwen", "gemma", "phi", "hermes"]
|
||||
if any(x in model_lower for x in ollama_indicators) and ":" not in model:
|
||||
return {
|
||||
"valid": True,
|
||||
"warning": f"Model '{model}' may need a version tag for Ollama (e.g., {model}:latest)",
|
||||
}
|
||||
|
||||
return {"valid": True}
|
||||
|
||||
|
||||
def preflight_check(
|
||||
provider: str = "",
|
||||
model: str = "",
|
||||
fallback_provider: str = "",
|
||||
fallback_model: str = "",
|
||||
) -> Dict[str, Any]:
|
||||
"""Full pre-flight check for provider/model configuration.
|
||||
|
||||
Returns:
|
||||
Dict with valid (bool), errors (list), warnings (list).
|
||||
"""
|
||||
errors = []
|
||||
warnings = []
|
||||
|
||||
# Check primary provider
|
||||
if provider:
|
||||
result = check_provider_key(provider)
|
||||
if not result["valid"]:
|
||||
errors.append(result.get("error", f"Provider {provider} invalid"))
|
||||
|
||||
# Check primary model
|
||||
if model:
|
||||
result = check_model_availability(model, provider)
|
||||
if not result["valid"]:
|
||||
errors.append(result.get("error", f"Model {model} invalid"))
|
||||
elif result.get("warning"):
|
||||
warnings.append(result["warning"])
|
||||
|
||||
# Check fallback
|
||||
if fallback_provider:
|
||||
result = check_provider_key(fallback_provider)
|
||||
if not result["valid"]:
|
||||
warnings.append(f"Fallback provider {fallback_provider} also invalid: {result.get('error','')}")
|
||||
|
||||
if fallback_model:
|
||||
result = check_model_availability(fallback_model, fallback_provider)
|
||||
if not result["valid"]:
|
||||
warnings.append(f"Fallback model {fallback_model} invalid")
|
||||
elif result.get("warning"):
|
||||
warnings.append(result["warning"])
|
||||
|
||||
return {
|
||||
"valid": len(errors) == 0,
|
||||
"errors": errors,
|
||||
"warnings": warnings,
|
||||
"provider": provider,
|
||||
"model": model,
|
||||
}
|
||||
@@ -1,146 +0,0 @@
|
||||
"""Time-aware model routing for cron jobs.
|
||||
|
||||
Routes cron tasks to more capable models during off-hours when the user
|
||||
is not present to correct errors. Reduces error rates during high-error
|
||||
time windows (e.g., 18:00 evening batches).
|
||||
|
||||
Usage:
|
||||
from agent.time_aware_routing import resolve_time_aware_model
|
||||
model = resolve_time_aware_model(base_model="mimo-v2-pro", is_cron=True)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, Optional
|
||||
|
||||
|
||||
# Error rate data from empirical audit (2026-04-12)
|
||||
# Higher error rates during these hours suggest routing to better models
|
||||
_HIGH_ERROR_HOURS = {
|
||||
18: 9.4, # 18:00 — 9.4% error rate (evening cron batches)
|
||||
19: 8.1,
|
||||
20: 7.5,
|
||||
21: 6.8,
|
||||
22: 6.2,
|
||||
23: 5.9,
|
||||
0: 5.5,
|
||||
1: 5.2,
|
||||
}
|
||||
|
||||
# Low error hours — default model is fine
|
||||
_LOW_ERROR_HOURS = set(range(6, 18)) # 06:00-17:59
|
||||
|
||||
# Default fallback models by time zone
|
||||
_DEFAULT_STRONG_MODEL = os.getenv("CRON_STRONG_MODEL", "xiaomi/mimo-v2-pro")
|
||||
_DEFAULT_CHEAP_MODEL = os.getenv("CRON_CHEAP_MODEL", "qwen2.5:7b")
|
||||
_ERROR_THRESHOLD = float(os.getenv("CRON_ERROR_THRESHOLD", "6.0")) # % error rate
|
||||
|
||||
|
||||
@dataclass
|
||||
class RoutingDecision:
|
||||
"""Result of time-aware routing."""
|
||||
model: str
|
||||
provider: str
|
||||
reason: str
|
||||
hour: int
|
||||
error_rate: float
|
||||
is_off_hours: bool
|
||||
|
||||
|
||||
def get_hour_error_rate(hour: int) -> float:
|
||||
"""Get expected error rate for a given hour (0-23)."""
|
||||
return _HIGH_ERROR_HOURS.get(hour, 4.0) # Default 4% for unlisted hours
|
||||
|
||||
|
||||
def is_off_hours(hour: int) -> bool:
|
||||
"""Check if hour is considered off-hours (higher error rates)."""
|
||||
return hour not in _LOW_ERROR_HOURS
|
||||
|
||||
|
||||
def resolve_time_aware_model(
|
||||
base_model: str = "",
|
||||
base_provider: str = "",
|
||||
is_cron: bool = False,
|
||||
hour: Optional[int] = None,
|
||||
) -> RoutingDecision:
|
||||
"""Resolve model based on time of day and task type.
|
||||
|
||||
During off-hours (evening/night), routes to stronger models for cron
|
||||
jobs to compensate for lack of human oversight.
|
||||
|
||||
Args:
|
||||
base_model: The model that would normally be used.
|
||||
base_provider: The provider for the base model.
|
||||
is_cron: Whether this is a cron job (vs interactive session).
|
||||
hour: Override hour (for testing). Defaults to current hour.
|
||||
|
||||
Returns:
|
||||
RoutingDecision with model, provider, and reasoning.
|
||||
"""
|
||||
if hour is None:
|
||||
hour = time.localtime().tm_hour
|
||||
|
||||
error_rate = get_hour_error_rate(hour)
|
||||
off_hours = is_off_hours(hour)
|
||||
|
||||
# Interactive sessions always use the base model (user can correct errors)
|
||||
if not is_cron:
|
||||
return RoutingDecision(
|
||||
model=base_model or _DEFAULT_CHEAP_MODEL,
|
||||
provider=base_provider,
|
||||
reason="Interactive session — user can correct errors",
|
||||
hour=hour,
|
||||
error_rate=error_rate,
|
||||
is_off_hours=off_hours,
|
||||
)
|
||||
|
||||
# Cron jobs during low-error hours: use base model
|
||||
if not off_hours and error_rate < _ERROR_THRESHOLD:
|
||||
return RoutingDecision(
|
||||
model=base_model or _DEFAULT_CHEAP_MODEL,
|
||||
provider=base_provider,
|
||||
reason=f"Low-error hours ({hour}:00, {error_rate}% expected)",
|
||||
hour=hour,
|
||||
error_rate=error_rate,
|
||||
is_off_hours=False,
|
||||
)
|
||||
|
||||
# Cron jobs during high-error hours: upgrade to stronger model
|
||||
if error_rate >= _ERROR_THRESHOLD:
|
||||
return RoutingDecision(
|
||||
model=_DEFAULT_STRONG_MODEL,
|
||||
provider="nous",
|
||||
reason=f"High-error hours ({hour}:00, {error_rate}% expected) — using stronger model",
|
||||
hour=hour,
|
||||
error_rate=error_rate,
|
||||
is_off_hours=True,
|
||||
)
|
||||
|
||||
# Off-hours but low error: use base model
|
||||
return RoutingDecision(
|
||||
model=base_model or _DEFAULT_CHEAP_MODEL,
|
||||
provider=base_provider,
|
||||
reason=f"Off-hours but low error ({hour}:00, {error_rate}%)",
|
||||
hour=hour,
|
||||
error_rate=error_rate,
|
||||
is_off_hours=off_hours,
|
||||
)
|
||||
|
||||
|
||||
def get_routing_report() -> str:
|
||||
"""Get a report of time-based routing decisions for the next 24 hours."""
|
||||
lines = ["Time-Aware Model Routing (24h forecast)", "=" * 40, ""]
|
||||
lines.append(f"Error threshold: {_ERROR_THRESHOLD}%")
|
||||
lines.append(f"Strong model: {_DEFAULT_STRONG_MODEL}")
|
||||
lines.append(f"Cheap model: {_DEFAULT_CHEAP_MODEL}")
|
||||
lines.append("")
|
||||
|
||||
for h in range(24):
|
||||
decision = resolve_time_aware_model(is_cron=True, hour=h)
|
||||
icon = "\U0001f7e2" if decision.model == _DEFAULT_CHEAP_MODEL else "\U0001f534"
|
||||
lines.append(f" {h:02d}:00 {icon} {decision.model:25s} ({decision.error_rate}% error)")
|
||||
|
||||
return "\n".join(lines)
|
||||
@@ -1,58 +0,0 @@
|
||||
"""Tests for time-aware model routing."""
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
from agent.time_aware_routing import (
|
||||
resolve_time_aware_model,
|
||||
get_hour_error_rate,
|
||||
is_off_hours,
|
||||
get_routing_report,
|
||||
)
|
||||
|
||||
|
||||
class TestErrorRates:
|
||||
def test_evening_high_error(self):
|
||||
assert get_hour_error_rate(18) == 9.4
|
||||
assert get_hour_error_rate(19) == 8.1
|
||||
|
||||
def test_morning_low_error(self):
|
||||
assert get_hour_error_rate(9) == 4.0
|
||||
assert get_hour_error_rate(12) == 4.0
|
||||
|
||||
def test_default_for_unknown(self):
|
||||
assert get_hour_error_rate(15) == 4.0
|
||||
|
||||
|
||||
class TestOffHours:
|
||||
def test_evening_is_off_hours(self):
|
||||
assert is_off_hours(20) is True
|
||||
assert is_off_hours(2) is True
|
||||
|
||||
def test_business_hours_not_off(self):
|
||||
assert is_off_hours(9) is False
|
||||
assert is_off_hours(14) is False
|
||||
|
||||
|
||||
class TestRouting:
|
||||
def test_interactive_uses_base_model(self):
|
||||
d = resolve_time_aware_model("my-model", "my-provider", is_cron=False, hour=18)
|
||||
assert d.model == "my-model"
|
||||
assert "Interactive" in d.reason
|
||||
|
||||
def test_cron_low_error_uses_base(self):
|
||||
d = resolve_time_aware_model("cheap-model", is_cron=True, hour=10)
|
||||
assert d.model == "cheap-model"
|
||||
|
||||
def test_cron_high_error_upgrades(self):
|
||||
d = resolve_time_aware_model("cheap-model", is_cron=True, hour=18)
|
||||
assert d.model != "cheap-model"
|
||||
assert d.is_off_hours is True
|
||||
|
||||
def test_routing_report(self):
|
||||
report = get_routing_report()
|
||||
assert "Time-Aware Model Routing" in report
|
||||
assert "18:00" in report
|
||||
@@ -44,6 +44,34 @@ from typing import Dict, Any, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _format_error(
|
||||
message: str,
|
||||
skill_name: str = None,
|
||||
file_path: str = None,
|
||||
suggestion: str = None,
|
||||
context: dict = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Format an error with rich context for better debugging."""
|
||||
parts = [message]
|
||||
if skill_name:
|
||||
parts.append(f"Skill: {skill_name}")
|
||||
if file_path:
|
||||
parts.append(f"File: {file_path}")
|
||||
if suggestion:
|
||||
parts.append(f"Suggestion: {suggestion}")
|
||||
if context:
|
||||
for key, value in context.items():
|
||||
parts.append(f"{key}: {value}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": " | ".join(parts),
|
||||
"skill_name": skill_name,
|
||||
"file_path": file_path,
|
||||
"suggestion": suggestion,
|
||||
}
|
||||
|
||||
|
||||
# Import security scanner — agent-created skills get the same scrutiny as
|
||||
# community hub installs.
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user