1
0
This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Timmy-time-dashboard/src/infrastructure/models/router.py
2026-03-24 01:53:25 +00:00

428 lines
17 KiB
Python

"""Three-tier model router — Local 8B / Local 70B / Cloud API Cascade.
Selects the cheapest-sufficient LLM for each request using a heuristic
task-complexity classifier. Tier 3 (Cloud API) is only used when Tier 2
fails or the budget guard allows it.
Tiers
-----
Tier 1 — LOCAL_FAST (Llama 3.1 8B / Hermes 3 8B via Ollama, free, ~0.3-1 s)
Navigation, basic interactions, simple decisions.
Tier 2 — LOCAL_HEAVY (Hermes 3/4 70B via Ollama, free, ~5-10 s for 200 tok)
Quest planning, dialogue strategy, complex reasoning.
Tier 3 — CLOUD_API (Claude / GPT-4o, paid ~$5-15/hr heavy use)
Recovery from Tier 2 failures, novel situations, multi-step planning.
Routing logic
-------------
1. Classify the task using keyword / length / context heuristics (no LLM call).
2. Route to the appropriate tier.
3. On Tier-1 low-quality response → auto-escalate to Tier 2.
4. On Tier-2 failure or explicit ``require_cloud=True`` → Tier 3 (if budget allows).
5. Log tier used, model, latency, estimated cost for every request.
References:
- Issue #882 — Model Tiering Router: Local 8B / Hermes 70B / Cloud API Cascade
"""
import asyncio
import logging
import re
import time
from enum import StrEnum
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
# ── Tier definitions ──────────────────────────────────────────────────────────
class TierLabel(StrEnum):
"""Three cost-sorted model tiers."""
LOCAL_FAST = "local_fast" # 8B local, always hot, free
LOCAL_HEAVY = "local_heavy" # 70B local, free but slower
CLOUD_API = "cloud_api" # Paid cloud backend (Claude / GPT-4o)
# ── Default model assignments (overridable via Settings) ──────────────────────
_DEFAULT_TIER_MODELS: dict[TierLabel, str] = {
TierLabel.LOCAL_FAST: "llama3.1:8b",
TierLabel.LOCAL_HEAVY: "hermes3:70b",
TierLabel.CLOUD_API: "claude-haiku-4-5",
}
# ── Classification vocabulary ─────────────────────────────────────────────────
# Patterns that indicate a Tier-1 (simple) task
_T1_WORDS: frozenset[str] = frozenset(
{
"go", "move", "walk", "run",
"north", "south", "east", "west", "up", "down", "left", "right",
"yes", "no", "ok", "okay",
"open", "close", "take", "drop", "look",
"pick", "use", "wait", "rest", "save",
"attack", "flee", "jump", "crouch",
"status", "ping", "list", "show", "get", "check",
}
)
# Patterns that indicate a Tier-2 or Tier-3 task
_T2_PHRASES: tuple[str, ...] = (
"plan", "strategy", "optimize", "optimise",
"quest", "stuck", "recover",
"negotiate", "persuade", "faction", "reputation",
"analyze", "analyse", "evaluate", "decide",
"complex", "multi-step", "long-term",
"how do i", "what should i do", "help me figure",
"what is the best", "recommend", "best way",
"explain", "describe in detail", "walk me through",
"compare", "design", "implement", "refactor",
"debug", "diagnose", "root cause",
)
# Low-quality response detection patterns
_LOW_QUALITY_PATTERNS: tuple[re.Pattern, ...] = (
re.compile(r"i\s+don'?t\s+know", re.IGNORECASE),
re.compile(r"i'm\s+not\s+sure", re.IGNORECASE),
re.compile(r"i\s+cannot\s+(help|assist|answer)", re.IGNORECASE),
re.compile(r"i\s+apologize", re.IGNORECASE),
re.compile(r"as an ai", re.IGNORECASE),
re.compile(r"i\s+don'?t\s+have\s+(enough|sufficient)\s+information", re.IGNORECASE),
)
# Response is definitely low-quality if shorter than this many characters
_LOW_QUALITY_MIN_CHARS = 20
# Response is suspicious if shorter than this many chars for a complex task
_ESCALATION_MIN_CHARS = 60
def classify_tier(task: str, context: dict | None = None) -> TierLabel:
"""Classify a task to the cheapest-sufficient model tier.
Classification priority (highest wins):
1. ``context["require_cloud"] = True`` → CLOUD_API
2. Any Tier-2 phrase or stuck/recovery signal → LOCAL_HEAVY
3. Short task with only Tier-1 words, no active context → LOCAL_FAST
4. Default → LOCAL_HEAVY (safe fallback for unknown tasks)
Args:
task: Natural-language task or user input.
context: Optional context dict. Recognised keys:
``require_cloud`` (bool), ``stuck`` (bool),
``require_t2`` (bool), ``active_quests`` (list),
``dialogue_active`` (bool), ``combat_active`` (bool).
Returns:
The cheapest ``TierLabel`` sufficient for the task.
"""
ctx = context or {}
task_lower = task.lower()
words = set(task_lower.split())
# ── Explicit cloud override ──────────────────────────────────────────────
if ctx.get("require_cloud"):
logger.debug("classify_tier → CLOUD_API (explicit require_cloud)")
return TierLabel.CLOUD_API
# ── Tier-2 / complexity signals ──────────────────────────────────────────
t2_phrase_hit = any(phrase in task_lower for phrase in _T2_PHRASES)
t2_word_hit = bool(words & {"plan", "strategy", "optimize", "optimise", "quest",
"stuck", "recover", "analyze", "analyse", "evaluate"})
is_stuck = bool(ctx.get("stuck"))
require_t2 = bool(ctx.get("require_t2"))
long_input = len(task) > 300 # long tasks warrant more capable model
deep_context = (
len(ctx.get("active_quests", [])) >= 3
or ctx.get("dialogue_active")
)
if t2_phrase_hit or t2_word_hit or is_stuck or require_t2 or long_input or deep_context:
logger.debug(
"classify_tier → LOCAL_HEAVY (phrase=%s word=%s stuck=%s explicit=%s long=%s ctx=%s)",
t2_phrase_hit, t2_word_hit, is_stuck, require_t2, long_input, deep_context,
)
return TierLabel.LOCAL_HEAVY
# ── Tier-1 signals ───────────────────────────────────────────────────────
t1_word_hit = bool(words & _T1_WORDS)
task_short = len(task.split()) <= 8
no_active_context = (
not ctx.get("active_quests")
and not ctx.get("dialogue_active")
and not ctx.get("combat_active")
)
if t1_word_hit and task_short and no_active_context:
logger.debug(
"classify_tier → LOCAL_FAST (words=%s short=%s)", t1_word_hit, task_short
)
return TierLabel.LOCAL_FAST
# ── Default: LOCAL_HEAVY (safe for anything unclassified) ────────────────
logger.debug("classify_tier → LOCAL_HEAVY (default)")
return TierLabel.LOCAL_HEAVY
def _is_low_quality(content: str, tier: TierLabel) -> bool:
"""Return True if the response looks like it should be escalated.
Used for automatic Tier-1 → Tier-2 escalation.
Args:
content: LLM response text.
tier: The tier that produced the response.
Returns:
True if the response is likely too low-quality to be useful.
"""
if not content or not content.strip():
return True
stripped = content.strip()
# Too short to be useful
if len(stripped) < _LOW_QUALITY_MIN_CHARS:
return True
# Insufficient for a supposedly complex-enough task
if tier == TierLabel.LOCAL_FAST and len(stripped) < _ESCALATION_MIN_CHARS:
return True
# Matches known "I can't help" patterns
for pattern in _LOW_QUALITY_PATTERNS:
if pattern.search(stripped):
return True
return False
class TieredModelRouter:
"""Routes LLM requests across the Local 8B / Local 70B / Cloud API tiers.
Wraps CascadeRouter with:
- Heuristic tier classification via ``classify_tier()``
- Automatic Tier-1 → Tier-2 escalation on low-quality responses
- Cloud-tier budget guard via ``BudgetTracker``
- Per-request logging: tier, model, latency, estimated cost
Usage::
router = TieredModelRouter()
result = await router.route(
task="Walk to the next room",
context={},
)
print(result["content"], result["tier"]) # "Move north.", "local_fast"
# Force heavy tier
result = await router.route(
task="Plan the optimal path to become Hortator",
context={"require_t2": True},
)
"""
def __init__(
self,
cascade: Any | None = None,
budget_tracker: Any | None = None,
tier_models: dict[TierLabel, str] | None = None,
auto_escalate: bool = True,
) -> None:
"""Initialise the tiered router.
Args:
cascade: CascadeRouter instance. If ``None``, the
singleton from ``get_router()`` is used lazily.
budget_tracker: BudgetTracker instance. If ``None``, the
singleton from ``get_budget_tracker()`` is used.
tier_models: Override default model names per tier.
auto_escalate: When ``True``, low-quality Tier-1 responses
automatically retry on Tier-2.
"""
self._cascade = cascade
self._budget = budget_tracker
self._tier_models: dict[TierLabel, str] = dict(_DEFAULT_TIER_MODELS)
self._auto_escalate = auto_escalate
# Apply settings-level overrides (can still be overridden per-instance)
if settings.tier_local_fast_model:
self._tier_models[TierLabel.LOCAL_FAST] = settings.tier_local_fast_model
if settings.tier_local_heavy_model:
self._tier_models[TierLabel.LOCAL_HEAVY] = settings.tier_local_heavy_model
if settings.tier_cloud_model:
self._tier_models[TierLabel.CLOUD_API] = settings.tier_cloud_model
if tier_models:
self._tier_models.update(tier_models)
# ── Lazy singletons ──────────────────────────────────────────────────────
def _get_cascade(self) -> Any:
if self._cascade is None:
from infrastructure.router.cascade import get_router
self._cascade = get_router()
return self._cascade
def _get_budget(self) -> Any:
if self._budget is None:
from infrastructure.models.budget import get_budget_tracker
self._budget = get_budget_tracker()
return self._budget
# ── Public interface ─────────────────────────────────────────────────────
def classify(self, task: str, context: dict | None = None) -> TierLabel:
"""Classify a task without routing. Useful for telemetry."""
return classify_tier(task, context)
async def route(
self,
task: str,
context: dict | None = None,
messages: list[dict] | None = None,
temperature: float = 0.3,
max_tokens: int | None = None,
) -> dict:
"""Route a task to the appropriate model tier.
Builds a minimal messages list if ``messages`` is not provided.
The result always includes a ``tier`` key indicating which tier
ultimately handled the request.
Args:
task: Natural-language task description.
context: Task context dict (see ``classify_tier()``).
messages: Pre-built OpenAI-compatible messages list. If
provided, ``task`` is only used for classification.
temperature: Sampling temperature (default 0.3).
max_tokens: Maximum tokens to generate.
Returns:
Dict with at minimum: ``content``, ``provider``, ``model``,
``tier``, ``latency_ms``. May include ``cost_usd`` when a
cloud request is recorded.
Raises:
RuntimeError: If all available tiers are exhausted.
"""
ctx = context or {}
tier = self.classify(task, ctx)
msgs = messages or [{"role": "user", "content": task}]
# ── Tier 1 attempt ───────────────────────────────────────────────────
if tier == TierLabel.LOCAL_FAST:
result = await self._complete_tier(
TierLabel.LOCAL_FAST, msgs, temperature, max_tokens
)
if self._auto_escalate and _is_low_quality(result.get("content", ""), TierLabel.LOCAL_FAST):
logger.info(
"TieredModelRouter: Tier-1 response low quality, escalating to Tier-2 "
"(task=%r content_len=%d)",
task[:80],
len(result.get("content", "")),
)
tier = TierLabel.LOCAL_HEAVY
result = await self._complete_tier(
TierLabel.LOCAL_HEAVY, msgs, temperature, max_tokens
)
return result
# ── Tier 2 attempt ───────────────────────────────────────────────────
if tier == TierLabel.LOCAL_HEAVY:
try:
return await self._complete_tier(
TierLabel.LOCAL_HEAVY, msgs, temperature, max_tokens
)
except Exception as exc:
logger.warning(
"TieredModelRouter: Tier-2 failed (%s) — escalating to cloud", exc
)
tier = TierLabel.CLOUD_API
# ── Tier 3 (Cloud) ───────────────────────────────────────────────────
budget = self._get_budget()
if not budget.cloud_allowed():
raise RuntimeError(
"Cloud API tier requested but budget limit reached — "
"increase tier_cloud_daily_budget_usd or tier_cloud_monthly_budget_usd"
)
result = await self._complete_tier(
TierLabel.CLOUD_API, msgs, temperature, max_tokens
)
# Record cloud spend if token info is available
usage = result.get("usage", {})
if usage:
cost = budget.record_spend(
provider=result.get("provider", "unknown"),
model=result.get("model", self._tier_models[TierLabel.CLOUD_API]),
tokens_in=usage.get("prompt_tokens", 0),
tokens_out=usage.get("completion_tokens", 0),
tier=TierLabel.CLOUD_API,
)
result["cost_usd"] = cost
return result
# ── Internal helpers ─────────────────────────────────────────────────────
async def _complete_tier(
self,
tier: TierLabel,
messages: list[dict],
temperature: float,
max_tokens: int | None,
) -> dict:
"""Dispatch a single inference request for the given tier."""
model = self._tier_models[tier]
cascade = self._get_cascade()
start = time.monotonic()
logger.info(
"TieredModelRouter: tier=%s model=%s messages=%d",
tier,
model,
len(messages),
)
result = await cascade.complete(
messages=messages,
model=model,
temperature=temperature,
max_tokens=max_tokens,
)
elapsed_ms = (time.monotonic() - start) * 1000
result["tier"] = tier
result.setdefault("latency_ms", elapsed_ms)
logger.info(
"TieredModelRouter: done tier=%s model=%s latency_ms=%.0f",
tier,
result.get("model", model),
elapsed_ms,
)
return result
# ── Module-level singleton ────────────────────────────────────────────────────
_tiered_router: TieredModelRouter | None = None
def get_tiered_router() -> TieredModelRouter:
"""Get or create the module-level TieredModelRouter singleton."""
global _tiered_router
if _tiered_router is None:
_tiered_router = TieredModelRouter()
return _tiered_router