Compare commits
1 Commits
claude/iss
...
fix/889
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0b72884750 |
146
agent/time_aware_routing.py
Normal file
146
agent/time_aware_routing.py
Normal file
@@ -0,0 +1,146 @@
|
||||
"""Time-aware model routing for cron jobs.
|
||||
|
||||
Routes cron tasks to more capable models during off-hours when the user
|
||||
is not present to correct errors. Reduces error rates during high-error
|
||||
time windows (e.g., 18:00 evening batches).
|
||||
|
||||
Usage:
|
||||
from agent.time_aware_routing import resolve_time_aware_model
|
||||
model = resolve_time_aware_model(base_model="mimo-v2-pro", is_cron=True)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, Optional
|
||||
|
||||
|
||||
# Error rate data from empirical audit (2026-04-12)
|
||||
# Higher error rates during these hours suggest routing to better models
|
||||
_HIGH_ERROR_HOURS = {
|
||||
18: 9.4, # 18:00 — 9.4% error rate (evening cron batches)
|
||||
19: 8.1,
|
||||
20: 7.5,
|
||||
21: 6.8,
|
||||
22: 6.2,
|
||||
23: 5.9,
|
||||
0: 5.5,
|
||||
1: 5.2,
|
||||
}
|
||||
|
||||
# Low error hours — default model is fine
|
||||
_LOW_ERROR_HOURS = set(range(6, 18)) # 06:00-17:59
|
||||
|
||||
# Default fallback models by time zone
|
||||
_DEFAULT_STRONG_MODEL = os.getenv("CRON_STRONG_MODEL", "xiaomi/mimo-v2-pro")
|
||||
_DEFAULT_CHEAP_MODEL = os.getenv("CRON_CHEAP_MODEL", "qwen2.5:7b")
|
||||
_ERROR_THRESHOLD = float(os.getenv("CRON_ERROR_THRESHOLD", "6.0")) # % error rate
|
||||
|
||||
|
||||
@dataclass
|
||||
class RoutingDecision:
|
||||
"""Result of time-aware routing."""
|
||||
model: str
|
||||
provider: str
|
||||
reason: str
|
||||
hour: int
|
||||
error_rate: float
|
||||
is_off_hours: bool
|
||||
|
||||
|
||||
def get_hour_error_rate(hour: int) -> float:
|
||||
"""Get expected error rate for a given hour (0-23)."""
|
||||
return _HIGH_ERROR_HOURS.get(hour, 4.0) # Default 4% for unlisted hours
|
||||
|
||||
|
||||
def is_off_hours(hour: int) -> bool:
|
||||
"""Check if hour is considered off-hours (higher error rates)."""
|
||||
return hour not in _LOW_ERROR_HOURS
|
||||
|
||||
|
||||
def resolve_time_aware_model(
|
||||
base_model: str = "",
|
||||
base_provider: str = "",
|
||||
is_cron: bool = False,
|
||||
hour: Optional[int] = None,
|
||||
) -> RoutingDecision:
|
||||
"""Resolve model based on time of day and task type.
|
||||
|
||||
During off-hours (evening/night), routes to stronger models for cron
|
||||
jobs to compensate for lack of human oversight.
|
||||
|
||||
Args:
|
||||
base_model: The model that would normally be used.
|
||||
base_provider: The provider for the base model.
|
||||
is_cron: Whether this is a cron job (vs interactive session).
|
||||
hour: Override hour (for testing). Defaults to current hour.
|
||||
|
||||
Returns:
|
||||
RoutingDecision with model, provider, and reasoning.
|
||||
"""
|
||||
if hour is None:
|
||||
hour = time.localtime().tm_hour
|
||||
|
||||
error_rate = get_hour_error_rate(hour)
|
||||
off_hours = is_off_hours(hour)
|
||||
|
||||
# Interactive sessions always use the base model (user can correct errors)
|
||||
if not is_cron:
|
||||
return RoutingDecision(
|
||||
model=base_model or _DEFAULT_CHEAP_MODEL,
|
||||
provider=base_provider,
|
||||
reason="Interactive session — user can correct errors",
|
||||
hour=hour,
|
||||
error_rate=error_rate,
|
||||
is_off_hours=off_hours,
|
||||
)
|
||||
|
||||
# Cron jobs during low-error hours: use base model
|
||||
if not off_hours and error_rate < _ERROR_THRESHOLD:
|
||||
return RoutingDecision(
|
||||
model=base_model or _DEFAULT_CHEAP_MODEL,
|
||||
provider=base_provider,
|
||||
reason=f"Low-error hours ({hour}:00, {error_rate}% expected)",
|
||||
hour=hour,
|
||||
error_rate=error_rate,
|
||||
is_off_hours=False,
|
||||
)
|
||||
|
||||
# Cron jobs during high-error hours: upgrade to stronger model
|
||||
if error_rate >= _ERROR_THRESHOLD:
|
||||
return RoutingDecision(
|
||||
model=_DEFAULT_STRONG_MODEL,
|
||||
provider="nous",
|
||||
reason=f"High-error hours ({hour}:00, {error_rate}% expected) — using stronger model",
|
||||
hour=hour,
|
||||
error_rate=error_rate,
|
||||
is_off_hours=True,
|
||||
)
|
||||
|
||||
# Off-hours but low error: use base model
|
||||
return RoutingDecision(
|
||||
model=base_model or _DEFAULT_CHEAP_MODEL,
|
||||
provider=base_provider,
|
||||
reason=f"Off-hours but low error ({hour}:00, {error_rate}%)",
|
||||
hour=hour,
|
||||
error_rate=error_rate,
|
||||
is_off_hours=off_hours,
|
||||
)
|
||||
|
||||
|
||||
def get_routing_report() -> str:
|
||||
"""Get a report of time-based routing decisions for the next 24 hours."""
|
||||
lines = ["Time-Aware Model Routing (24h forecast)", "=" * 40, ""]
|
||||
lines.append(f"Error threshold: {_ERROR_THRESHOLD}%")
|
||||
lines.append(f"Strong model: {_DEFAULT_STRONG_MODEL}")
|
||||
lines.append(f"Cheap model: {_DEFAULT_CHEAP_MODEL}")
|
||||
lines.append("")
|
||||
|
||||
for h in range(24):
|
||||
decision = resolve_time_aware_model(is_cron=True, hour=h)
|
||||
icon = "\U0001f7e2" if decision.model == _DEFAULT_CHEAP_MODEL else "\U0001f534"
|
||||
lines.append(f" {h:02d}:00 {icon} {decision.model:25s} ({decision.error_rate}% error)")
|
||||
|
||||
return "\n".join(lines)
|
||||
@@ -8,7 +8,6 @@ Handles loading and validating configuration for:
|
||||
- Delivery preferences
|
||||
"""
|
||||
|
||||
import ipaddress
|
||||
import logging
|
||||
import os
|
||||
import json
|
||||
@@ -680,26 +679,6 @@ def load_gateway_config() -> GatewayConfig:
|
||||
return config
|
||||
|
||||
|
||||
def _is_network_accessible(host: str) -> bool:
|
||||
"""Return True if *host* would expose a server beyond the loopback interface.
|
||||
|
||||
Duplicates the logic in ``gateway.platforms.base.is_network_accessible``
|
||||
without creating a circular import (base.py imports from this module).
|
||||
"""
|
||||
try:
|
||||
addr = ipaddress.ip_address(host)
|
||||
if addr.is_loopback:
|
||||
return False
|
||||
# ::ffff:127.x.x.x — Python's is_loopback returns False for
|
||||
# IPv4-mapped loopback; unwrap and check the underlying IPv4.
|
||||
if getattr(addr, "ipv4_mapped", None) and addr.ipv4_mapped.is_loopback:
|
||||
return False
|
||||
return True
|
||||
except ValueError:
|
||||
# Hostname: assume it could be network-accessible.
|
||||
return True
|
||||
|
||||
|
||||
def _validate_gateway_config(config: "GatewayConfig") -> None:
|
||||
"""Validate and sanitize a loaded GatewayConfig in place.
|
||||
|
||||
@@ -768,22 +747,6 @@ def _validate_gateway_config(config: "GatewayConfig") -> None:
|
||||
)
|
||||
pconfig.enabled = False
|
||||
|
||||
# Warn when the API server is enabled on a network-accessible address
|
||||
# without an auth key. The adapter will refuse to start anyway, but
|
||||
# surfacing this at config-load time lets operators see the problem in
|
||||
# the startup log before any platform adapter initialisation runs.
|
||||
api_cfg = config.platforms.get(Platform.API_SERVER)
|
||||
if api_cfg and api_cfg.enabled:
|
||||
key = api_cfg.extra.get("key", "")
|
||||
host = api_cfg.extra.get("host", "127.0.0.1")
|
||||
if not key and _is_network_accessible(host):
|
||||
logger.warning(
|
||||
"API Server is enabled on %s but API_SERVER_KEY is not set. "
|
||||
"The adapter will refuse to start on a network-accessible address. "
|
||||
"Set API_SERVER_KEY or bind to 127.0.0.1 for local-only access.",
|
||||
host,
|
||||
)
|
||||
|
||||
|
||||
def _apply_env_overrides(config: GatewayConfig) -> None:
|
||||
"""Apply environment variable overrides to config."""
|
||||
|
||||
@@ -10,7 +10,6 @@ from gateway.config import (
|
||||
PlatformConfig,
|
||||
SessionResetPolicy,
|
||||
_apply_env_overrides,
|
||||
_validate_gateway_config,
|
||||
load_gateway_config,
|
||||
)
|
||||
|
||||
@@ -295,151 +294,3 @@ class TestHomeChannelEnvOverrides:
|
||||
home = config.platforms[platform].home_channel
|
||||
assert home is not None, f"{platform.value}: home_channel should not be None"
|
||||
assert (home.chat_id, home.name) == expected, platform.value
|
||||
|
||||
|
||||
class TestValidateGatewayConfig:
|
||||
"""Tests for _validate_gateway_config — in-place sanitisation of loaded config."""
|
||||
|
||||
# -- idle_minutes validation --
|
||||
|
||||
def test_idle_minutes_zero_is_corrected_to_default(self):
|
||||
config = GatewayConfig()
|
||||
config.default_reset_policy.idle_minutes = 0
|
||||
_validate_gateway_config(config)
|
||||
assert config.default_reset_policy.idle_minutes == 1440
|
||||
|
||||
def test_idle_minutes_negative_is_corrected_to_default(self):
|
||||
config = GatewayConfig()
|
||||
config.default_reset_policy.idle_minutes = -60
|
||||
_validate_gateway_config(config)
|
||||
assert config.default_reset_policy.idle_minutes == 1440
|
||||
|
||||
def test_idle_minutes_none_is_corrected_to_default(self):
|
||||
config = GatewayConfig()
|
||||
config.default_reset_policy.idle_minutes = None # type: ignore[assignment]
|
||||
_validate_gateway_config(config)
|
||||
assert config.default_reset_policy.idle_minutes == 1440
|
||||
|
||||
def test_valid_idle_minutes_is_unchanged(self):
|
||||
config = GatewayConfig()
|
||||
config.default_reset_policy.idle_minutes = 90
|
||||
_validate_gateway_config(config)
|
||||
assert config.default_reset_policy.idle_minutes == 90
|
||||
|
||||
# -- at_hour validation --
|
||||
|
||||
def test_at_hour_too_high_is_corrected_to_default(self):
|
||||
config = GatewayConfig()
|
||||
config.default_reset_policy.at_hour = 24
|
||||
_validate_gateway_config(config)
|
||||
assert config.default_reset_policy.at_hour == 4
|
||||
|
||||
def test_at_hour_negative_is_corrected_to_default(self):
|
||||
config = GatewayConfig()
|
||||
config.default_reset_policy.at_hour = -1
|
||||
_validate_gateway_config(config)
|
||||
assert config.default_reset_policy.at_hour == 4
|
||||
|
||||
def test_valid_at_hour_is_unchanged(self):
|
||||
config = GatewayConfig()
|
||||
config.default_reset_policy.at_hour = 3
|
||||
_validate_gateway_config(config)
|
||||
assert config.default_reset_policy.at_hour == 3
|
||||
|
||||
def test_at_hour_boundary_values_are_valid(self):
|
||||
for valid_hour in (0, 23):
|
||||
config = GatewayConfig()
|
||||
config.default_reset_policy.at_hour = valid_hour
|
||||
_validate_gateway_config(config)
|
||||
assert config.default_reset_policy.at_hour == valid_hour
|
||||
|
||||
# -- empty-token warning (enabled platforms) --
|
||||
|
||||
def test_empty_string_token_logs_warning(self, caplog):
|
||||
import logging
|
||||
config = GatewayConfig(
|
||||
platforms={
|
||||
Platform.TELEGRAM: PlatformConfig(enabled=True, token=""),
|
||||
}
|
||||
)
|
||||
with caplog.at_level(logging.WARNING, logger="gateway.config"):
|
||||
_validate_gateway_config(config)
|
||||
assert any(
|
||||
"TELEGRAM_BOT_TOKEN" in r.message and "empty" in r.message
|
||||
for r in caplog.records
|
||||
)
|
||||
|
||||
def test_disabled_platform_with_empty_token_no_warning(self, caplog):
|
||||
import logging
|
||||
config = GatewayConfig(
|
||||
platforms={
|
||||
Platform.TELEGRAM: PlatformConfig(enabled=False, token=""),
|
||||
}
|
||||
)
|
||||
with caplog.at_level(logging.WARNING, logger="gateway.config"):
|
||||
_validate_gateway_config(config)
|
||||
assert not any("TELEGRAM_BOT_TOKEN" in r.message for r in caplog.records)
|
||||
|
||||
# -- API Server key / binding warnings --
|
||||
|
||||
def test_api_server_network_binding_without_key_logs_warning(self, caplog):
|
||||
import logging
|
||||
config = GatewayConfig(
|
||||
platforms={
|
||||
Platform.API_SERVER: PlatformConfig(
|
||||
enabled=True,
|
||||
extra={"host": "0.0.0.0"},
|
||||
),
|
||||
}
|
||||
)
|
||||
with caplog.at_level(logging.WARNING, logger="gateway.config"):
|
||||
_validate_gateway_config(config)
|
||||
assert any(
|
||||
"API_SERVER_KEY" in r.message for r in caplog.records
|
||||
)
|
||||
|
||||
def test_api_server_loopback_without_key_no_warning(self, caplog):
|
||||
import logging
|
||||
config = GatewayConfig(
|
||||
platforms={
|
||||
Platform.API_SERVER: PlatformConfig(
|
||||
enabled=True,
|
||||
extra={"host": "127.0.0.1"},
|
||||
),
|
||||
}
|
||||
)
|
||||
with caplog.at_level(logging.WARNING, logger="gateway.config"):
|
||||
_validate_gateway_config(config)
|
||||
assert not any(
|
||||
"API_SERVER_KEY" in r.message for r in caplog.records
|
||||
)
|
||||
|
||||
def test_api_server_network_binding_with_key_no_warning(self, caplog):
|
||||
import logging
|
||||
config = GatewayConfig(
|
||||
platforms={
|
||||
Platform.API_SERVER: PlatformConfig(
|
||||
enabled=True,
|
||||
extra={"host": "0.0.0.0", "key": "sk-real-key-here"},
|
||||
),
|
||||
}
|
||||
)
|
||||
with caplog.at_level(logging.WARNING, logger="gateway.config"):
|
||||
_validate_gateway_config(config)
|
||||
assert not any(
|
||||
"API_SERVER_KEY" in r.message for r in caplog.records
|
||||
)
|
||||
|
||||
def test_api_server_default_loopback_without_key_no_warning(self, caplog):
|
||||
"""API server with no explicit host defaults to 127.0.0.1 — no warning."""
|
||||
import logging
|
||||
config = GatewayConfig(
|
||||
platforms={
|
||||
Platform.API_SERVER: PlatformConfig(enabled=True),
|
||||
}
|
||||
)
|
||||
with caplog.at_level(logging.WARNING, logger="gateway.config"):
|
||||
_validate_gateway_config(config)
|
||||
assert not any(
|
||||
"API_SERVER_KEY" in r.message for r in caplog.records
|
||||
)
|
||||
|
||||
58
tests/test_time_aware_routing.py
Normal file
58
tests/test_time_aware_routing.py
Normal file
@@ -0,0 +1,58 @@
|
||||
"""Tests for time-aware model routing."""
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
from agent.time_aware_routing import (
|
||||
resolve_time_aware_model,
|
||||
get_hour_error_rate,
|
||||
is_off_hours,
|
||||
get_routing_report,
|
||||
)
|
||||
|
||||
|
||||
class TestErrorRates:
|
||||
def test_evening_high_error(self):
|
||||
assert get_hour_error_rate(18) == 9.4
|
||||
assert get_hour_error_rate(19) == 8.1
|
||||
|
||||
def test_morning_low_error(self):
|
||||
assert get_hour_error_rate(9) == 4.0
|
||||
assert get_hour_error_rate(12) == 4.0
|
||||
|
||||
def test_default_for_unknown(self):
|
||||
assert get_hour_error_rate(15) == 4.0
|
||||
|
||||
|
||||
class TestOffHours:
|
||||
def test_evening_is_off_hours(self):
|
||||
assert is_off_hours(20) is True
|
||||
assert is_off_hours(2) is True
|
||||
|
||||
def test_business_hours_not_off(self):
|
||||
assert is_off_hours(9) is False
|
||||
assert is_off_hours(14) is False
|
||||
|
||||
|
||||
class TestRouting:
|
||||
def test_interactive_uses_base_model(self):
|
||||
d = resolve_time_aware_model("my-model", "my-provider", is_cron=False, hour=18)
|
||||
assert d.model == "my-model"
|
||||
assert "Interactive" in d.reason
|
||||
|
||||
def test_cron_low_error_uses_base(self):
|
||||
d = resolve_time_aware_model("cheap-model", is_cron=True, hour=10)
|
||||
assert d.model == "cheap-model"
|
||||
|
||||
def test_cron_high_error_upgrades(self):
|
||||
d = resolve_time_aware_model("cheap-model", is_cron=True, hour=18)
|
||||
assert d.model != "cheap-model"
|
||||
assert d.is_off_hours is True
|
||||
|
||||
def test_routing_report(self):
|
||||
report = get_routing_report()
|
||||
assert "Time-Aware Model Routing" in report
|
||||
assert "18:00" in report
|
||||
Reference in New Issue
Block a user