Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0b72884750 |
146
agent/time_aware_routing.py
Normal file
146
agent/time_aware_routing.py
Normal file
@@ -0,0 +1,146 @@
|
||||
"""Time-aware model routing for cron jobs.
|
||||
|
||||
Routes cron tasks to more capable models during off-hours when the user
|
||||
is not present to correct errors. Reduces error rates during high-error
|
||||
time windows (e.g., 18:00 evening batches).
|
||||
|
||||
Usage:
|
||||
from agent.time_aware_routing import resolve_time_aware_model
|
||||
model = resolve_time_aware_model(base_model="mimo-v2-pro", is_cron=True)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, Optional
|
||||
|
||||
|
||||
# Error rate data from empirical audit (2026-04-12)
|
||||
# Higher error rates during these hours suggest routing to better models
|
||||
_HIGH_ERROR_HOURS = {
|
||||
18: 9.4, # 18:00 — 9.4% error rate (evening cron batches)
|
||||
19: 8.1,
|
||||
20: 7.5,
|
||||
21: 6.8,
|
||||
22: 6.2,
|
||||
23: 5.9,
|
||||
0: 5.5,
|
||||
1: 5.2,
|
||||
}
|
||||
|
||||
# Low error hours — default model is fine
|
||||
_LOW_ERROR_HOURS = set(range(6, 18)) # 06:00-17:59
|
||||
|
||||
# Default fallback models by time zone
|
||||
_DEFAULT_STRONG_MODEL = os.getenv("CRON_STRONG_MODEL", "xiaomi/mimo-v2-pro")
|
||||
_DEFAULT_CHEAP_MODEL = os.getenv("CRON_CHEAP_MODEL", "qwen2.5:7b")
|
||||
_ERROR_THRESHOLD = float(os.getenv("CRON_ERROR_THRESHOLD", "6.0")) # % error rate
|
||||
|
||||
|
||||
@dataclass
|
||||
class RoutingDecision:
|
||||
"""Result of time-aware routing."""
|
||||
model: str
|
||||
provider: str
|
||||
reason: str
|
||||
hour: int
|
||||
error_rate: float
|
||||
is_off_hours: bool
|
||||
|
||||
|
||||
def get_hour_error_rate(hour: int) -> float:
|
||||
"""Get expected error rate for a given hour (0-23)."""
|
||||
return _HIGH_ERROR_HOURS.get(hour, 4.0) # Default 4% for unlisted hours
|
||||
|
||||
|
||||
def is_off_hours(hour: int) -> bool:
|
||||
"""Check if hour is considered off-hours (higher error rates)."""
|
||||
return hour not in _LOW_ERROR_HOURS
|
||||
|
||||
|
||||
def resolve_time_aware_model(
|
||||
base_model: str = "",
|
||||
base_provider: str = "",
|
||||
is_cron: bool = False,
|
||||
hour: Optional[int] = None,
|
||||
) -> RoutingDecision:
|
||||
"""Resolve model based on time of day and task type.
|
||||
|
||||
During off-hours (evening/night), routes to stronger models for cron
|
||||
jobs to compensate for lack of human oversight.
|
||||
|
||||
Args:
|
||||
base_model: The model that would normally be used.
|
||||
base_provider: The provider for the base model.
|
||||
is_cron: Whether this is a cron job (vs interactive session).
|
||||
hour: Override hour (for testing). Defaults to current hour.
|
||||
|
||||
Returns:
|
||||
RoutingDecision with model, provider, and reasoning.
|
||||
"""
|
||||
if hour is None:
|
||||
hour = time.localtime().tm_hour
|
||||
|
||||
error_rate = get_hour_error_rate(hour)
|
||||
off_hours = is_off_hours(hour)
|
||||
|
||||
# Interactive sessions always use the base model (user can correct errors)
|
||||
if not is_cron:
|
||||
return RoutingDecision(
|
||||
model=base_model or _DEFAULT_CHEAP_MODEL,
|
||||
provider=base_provider,
|
||||
reason="Interactive session — user can correct errors",
|
||||
hour=hour,
|
||||
error_rate=error_rate,
|
||||
is_off_hours=off_hours,
|
||||
)
|
||||
|
||||
# Cron jobs during low-error hours: use base model
|
||||
if not off_hours and error_rate < _ERROR_THRESHOLD:
|
||||
return RoutingDecision(
|
||||
model=base_model or _DEFAULT_CHEAP_MODEL,
|
||||
provider=base_provider,
|
||||
reason=f"Low-error hours ({hour}:00, {error_rate}% expected)",
|
||||
hour=hour,
|
||||
error_rate=error_rate,
|
||||
is_off_hours=False,
|
||||
)
|
||||
|
||||
# Cron jobs during high-error hours: upgrade to stronger model
|
||||
if error_rate >= _ERROR_THRESHOLD:
|
||||
return RoutingDecision(
|
||||
model=_DEFAULT_STRONG_MODEL,
|
||||
provider="nous",
|
||||
reason=f"High-error hours ({hour}:00, {error_rate}% expected) — using stronger model",
|
||||
hour=hour,
|
||||
error_rate=error_rate,
|
||||
is_off_hours=True,
|
||||
)
|
||||
|
||||
# Off-hours but low error: use base model
|
||||
return RoutingDecision(
|
||||
model=base_model or _DEFAULT_CHEAP_MODEL,
|
||||
provider=base_provider,
|
||||
reason=f"Off-hours but low error ({hour}:00, {error_rate}%)",
|
||||
hour=hour,
|
||||
error_rate=error_rate,
|
||||
is_off_hours=off_hours,
|
||||
)
|
||||
|
||||
|
||||
def get_routing_report() -> str:
|
||||
"""Get a report of time-based routing decisions for the next 24 hours."""
|
||||
lines = ["Time-Aware Model Routing (24h forecast)", "=" * 40, ""]
|
||||
lines.append(f"Error threshold: {_ERROR_THRESHOLD}%")
|
||||
lines.append(f"Strong model: {_DEFAULT_STRONG_MODEL}")
|
||||
lines.append(f"Cheap model: {_DEFAULT_CHEAP_MODEL}")
|
||||
lines.append("")
|
||||
|
||||
for h in range(24):
|
||||
decision = resolve_time_aware_model(is_cron=True, hour=h)
|
||||
icon = "\U0001f7e2" if decision.model == _DEFAULT_CHEAP_MODEL else "\U0001f534"
|
||||
lines.append(f" {h:02d}:00 {icon} {decision.model:25s} ({decision.error_rate}% error)")
|
||||
|
||||
return "\n".join(lines)
|
||||
@@ -1,156 +0,0 @@
|
||||
"""Tool fixation detection — break repetitive tool calling loops.
|
||||
|
||||
Detects when the agent latches onto one tool and calls it repeatedly
|
||||
without making progress. Injects a nudge prompt to break the loop.
|
||||
|
||||
Usage:
|
||||
from agent.tool_fixation_detector import ToolFixationDetector
|
||||
detector = ToolFixationDetector()
|
||||
nudge = detector.record("execute_code")
|
||||
if nudge:
|
||||
# Inject nudge into conversation
|
||||
messages.append({"role": "system", "content": nudge})
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
|
||||
# Default thresholds
|
||||
_DEFAULT_THRESHOLD = int(os.getenv("TOOL_FIXATION_THRESHOLD", "5"))
|
||||
_DEFAULT_WINDOW = int(os.getenv("TOOL_FIXATION_WINDOW", "10"))
|
||||
|
||||
|
||||
@dataclass
|
||||
class FixationEvent:
|
||||
"""Record of a fixation detection."""
|
||||
tool_name: str
|
||||
streak_length: int
|
||||
threshold: int
|
||||
nudge_sent: bool = False
|
||||
|
||||
|
||||
class ToolFixationDetector:
|
||||
"""Detects and breaks tool fixation loops.
|
||||
|
||||
Tracks the sequence of tool calls and detects when the same tool
|
||||
is called N times consecutively. When detected, returns a nudge
|
||||
prompt to inject into the conversation.
|
||||
"""
|
||||
|
||||
def __init__(self, threshold: int = 0, window: int = 0):
|
||||
self.threshold = threshold or _DEFAULT_THRESHOLD
|
||||
self.window = window or _DEFAULT_WINDOW
|
||||
self._history: List[str] = []
|
||||
self._current_streak: str = ""
|
||||
self._streak_count: int = 0
|
||||
self._nudges_sent: int = 0
|
||||
self._events: List[FixationEvent] = []
|
||||
|
||||
@property
|
||||
def nudges_sent(self) -> int:
|
||||
return self._nudges_sent
|
||||
|
||||
@property
|
||||
def events(self) -> List[FixationEvent]:
|
||||
return list(self._events)
|
||||
|
||||
def record(self, tool_name: str) -> Optional[str]:
|
||||
"""Record a tool call and return nudge prompt if fixation detected.
|
||||
|
||||
Args:
|
||||
tool_name: Name of the tool that was called.
|
||||
|
||||
Returns:
|
||||
Nudge prompt string if fixation detected, None otherwise.
|
||||
"""
|
||||
self._history.append(tool_name)
|
||||
|
||||
# Trim history to window
|
||||
if len(self._history) > self.window:
|
||||
self._history = self._history[-self.window:]
|
||||
|
||||
# Update streak
|
||||
if tool_name == self._current_streak:
|
||||
self._streak_count += 1
|
||||
else:
|
||||
self._current_streak = tool_name
|
||||
self._streak_count = 1
|
||||
|
||||
# Check for fixation
|
||||
if self._streak_count >= self.threshold:
|
||||
event = FixationEvent(
|
||||
tool_name=tool_name,
|
||||
streak_length=self._streak_count,
|
||||
threshold=self.threshold,
|
||||
nudge_sent=True,
|
||||
)
|
||||
self._events.append(event)
|
||||
self._nudges_sent += 1
|
||||
|
||||
return self._build_nudge(tool_name, self._streak_count)
|
||||
|
||||
return None
|
||||
|
||||
def _build_nudge(self, tool_name: str, count: int) -> str:
|
||||
"""Build a nudge prompt to break the fixation loop."""
|
||||
return (
|
||||
f"[SYSTEM: You have called `{tool_name}` {count} times in a row "
|
||||
f"without switching tools. This suggests a fixation loop. "
|
||||
f"Consider:\n"
|
||||
f"1. Is the tool returning an error? Read the error carefully.\n"
|
||||
f"2. Is there a different tool that could help?\n"
|
||||
f"3. Should you ask the user for clarification?\n"
|
||||
f"4. Is the task actually complete?\n"
|
||||
f"Break the loop by trying a different approach.]"
|
||||
)
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset the detector state."""
|
||||
self._history.clear()
|
||||
self._current_streak = ""
|
||||
self._streak_count = 0
|
||||
|
||||
def get_streak_info(self) -> dict:
|
||||
"""Get current streak information."""
|
||||
return {
|
||||
"current_tool": self._current_streak,
|
||||
"streak_count": self._streak_count,
|
||||
"threshold": self.threshold,
|
||||
"at_threshold": self._streak_count >= self.threshold,
|
||||
"nudges_sent": self._nudges_sent,
|
||||
}
|
||||
|
||||
def format_report(self) -> str:
|
||||
"""Format fixation events as a report."""
|
||||
if not self._events:
|
||||
return "No tool fixation detected."
|
||||
|
||||
lines = [
|
||||
f"Tool Fixation Report ({len(self._events)} events)",
|
||||
"=" * 40,
|
||||
]
|
||||
for e in self._events:
|
||||
lines.append(f" {e.tool_name}: {e.streak_length} consecutive calls (threshold: {e.threshold})")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
# Singleton
|
||||
_detector: Optional[ToolFixationDetector] = None
|
||||
|
||||
|
||||
def get_fixation_detector() -> ToolFixationDetector:
|
||||
"""Get or create the singleton detector."""
|
||||
global _detector
|
||||
if _detector is None:
|
||||
_detector = ToolFixationDetector()
|
||||
return _detector
|
||||
|
||||
|
||||
def reset_fixation_detector() -> None:
|
||||
"""Reset the singleton."""
|
||||
global _detector
|
||||
_detector = None
|
||||
58
tests/test_time_aware_routing.py
Normal file
58
tests/test_time_aware_routing.py
Normal file
@@ -0,0 +1,58 @@
|
||||
"""Tests for time-aware model routing."""
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
from agent.time_aware_routing import (
|
||||
resolve_time_aware_model,
|
||||
get_hour_error_rate,
|
||||
is_off_hours,
|
||||
get_routing_report,
|
||||
)
|
||||
|
||||
|
||||
class TestErrorRates:
|
||||
def test_evening_high_error(self):
|
||||
assert get_hour_error_rate(18) == 9.4
|
||||
assert get_hour_error_rate(19) == 8.1
|
||||
|
||||
def test_morning_low_error(self):
|
||||
assert get_hour_error_rate(9) == 4.0
|
||||
assert get_hour_error_rate(12) == 4.0
|
||||
|
||||
def test_default_for_unknown(self):
|
||||
assert get_hour_error_rate(15) == 4.0
|
||||
|
||||
|
||||
class TestOffHours:
|
||||
def test_evening_is_off_hours(self):
|
||||
assert is_off_hours(20) is True
|
||||
assert is_off_hours(2) is True
|
||||
|
||||
def test_business_hours_not_off(self):
|
||||
assert is_off_hours(9) is False
|
||||
assert is_off_hours(14) is False
|
||||
|
||||
|
||||
class TestRouting:
|
||||
def test_interactive_uses_base_model(self):
|
||||
d = resolve_time_aware_model("my-model", "my-provider", is_cron=False, hour=18)
|
||||
assert d.model == "my-model"
|
||||
assert "Interactive" in d.reason
|
||||
|
||||
def test_cron_low_error_uses_base(self):
|
||||
d = resolve_time_aware_model("cheap-model", is_cron=True, hour=10)
|
||||
assert d.model == "cheap-model"
|
||||
|
||||
def test_cron_high_error_upgrades(self):
|
||||
d = resolve_time_aware_model("cheap-model", is_cron=True, hour=18)
|
||||
assert d.model != "cheap-model"
|
||||
assert d.is_off_hours is True
|
||||
|
||||
def test_routing_report(self):
|
||||
report = get_routing_report()
|
||||
assert "Time-Aware Model Routing" in report
|
||||
assert "18:00" in report
|
||||
@@ -1,76 +0,0 @@
|
||||
"""Tests for tool fixation detection."""
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
from agent.tool_fixation_detector import ToolFixationDetector, get_fixation_detector
|
||||
|
||||
|
||||
class TestFixationDetection:
|
||||
def test_no_fixation_below_threshold(self):
|
||||
d = ToolFixationDetector(threshold=5)
|
||||
for i in range(4):
|
||||
assert d.record("execute_code") is None
|
||||
|
||||
def test_fixation_at_threshold(self):
|
||||
d = ToolFixationDetector(threshold=3)
|
||||
d.record("execute_code")
|
||||
d.record("execute_code")
|
||||
nudge = d.record("execute_code")
|
||||
assert nudge is not None
|
||||
assert "execute_code" in nudge
|
||||
assert "3 times" in nudge
|
||||
|
||||
def test_fixation_above_threshold(self):
|
||||
d = ToolFixationDetector(threshold=3)
|
||||
d.record("execute_code")
|
||||
d.record("execute_code")
|
||||
d.record("execute_code") # threshold hit
|
||||
nudge = d.record("execute_code") # still nudging
|
||||
assert nudge is not None
|
||||
|
||||
def test_streak_resets_on_different_tool(self):
|
||||
d = ToolFixationDetector(threshold=3)
|
||||
d.record("execute_code")
|
||||
d.record("execute_code")
|
||||
d.record("terminal") # breaks streak
|
||||
assert d._streak_count == 1
|
||||
assert d._current_streak == "terminal"
|
||||
|
||||
def test_nudges_sent_counter(self):
|
||||
d = ToolFixationDetector(threshold=2)
|
||||
d.record("a")
|
||||
d.record("a") # nudge 1
|
||||
d.record("a") # nudge 2
|
||||
assert d.nudges_sent == 2
|
||||
|
||||
def test_events_recorded(self):
|
||||
d = ToolFixationDetector(threshold=2)
|
||||
d.record("x")
|
||||
d.record("x")
|
||||
assert len(d.events) == 1
|
||||
assert d.events[0].tool_name == "x"
|
||||
assert d.events[0].streak_length == 2
|
||||
|
||||
def test_report(self):
|
||||
d = ToolFixationDetector(threshold=2)
|
||||
d.record("x")
|
||||
d.record("x")
|
||||
report = d.format_report()
|
||||
assert "x" in report
|
||||
|
||||
def test_reset(self):
|
||||
d = ToolFixationDetector(threshold=2)
|
||||
d.record("x")
|
||||
d.record("x")
|
||||
d.reset()
|
||||
assert d._streak_count == 0
|
||||
assert d._current_streak == ""
|
||||
|
||||
def test_singleton(self):
|
||||
d1 = get_fixation_detector()
|
||||
d2 = get_fixation_detector()
|
||||
assert d1 is d2
|
||||
Reference in New Issue
Block a user