From 08d337e03dc9e3074975b24f66b6041a8e3d5fea Mon Sep 17 00:00:00 2001 From: "Claude (Opus 4.6)" Date: Mon, 23 Mar 2026 19:45:56 +0000 Subject: [PATCH] [claude] Implement three-tier metabolic LLM router (#966) (#1160) Co-authored-by: Claude (Opus 4.6) Co-committed-by: Claude (Opus 4.6) --- src/infrastructure/router/__init__.py | 15 + src/infrastructure/router/metabolic.py | 381 +++++++++++++++++ tests/infrastructure/test_metabolic_router.py | 386 ++++++++++++++++++ 3 files changed, 782 insertions(+) create mode 100644 src/infrastructure/router/metabolic.py create mode 100644 tests/infrastructure/test_metabolic_router.py diff --git a/src/infrastructure/router/__init__.py b/src/infrastructure/router/__init__.py index dfe39c12..f7f7ac25 100644 --- a/src/infrastructure/router/__init__.py +++ b/src/infrastructure/router/__init__.py @@ -3,6 +3,14 @@ from .api import router from .cascade import CascadeRouter, Provider, ProviderStatus, get_router from .history import HealthHistoryStore, get_history_store +from .metabolic import ( + DEFAULT_TIER_MODELS, + MetabolicRouter, + ModelTier, + build_prompt, + classify_complexity, + get_metabolic_router, +) __all__ = [ "CascadeRouter", @@ -12,4 +20,11 @@ __all__ = [ "router", "HealthHistoryStore", "get_history_store", + # Metabolic router + "MetabolicRouter", + "ModelTier", + "DEFAULT_TIER_MODELS", + "classify_complexity", + "build_prompt", + "get_metabolic_router", ] diff --git a/src/infrastructure/router/metabolic.py b/src/infrastructure/router/metabolic.py new file mode 100644 index 00000000..7e3fceb3 --- /dev/null +++ b/src/infrastructure/router/metabolic.py @@ -0,0 +1,381 @@ +"""Three-tier metabolic LLM router. + +Routes queries to the cheapest-sufficient model tier using MLX for all +inference on Apple Silicon GPU: + + T1 — Routine (Qwen3-8B Q6_K, ~45-55 tok/s): Simple navigation, basic choices. + T2 — Medium (Qwen3-14B Q5_K_M, ~20-28 tok/s): Dialogue, inventory management. 
+ T3 — Complex (Qwen3-32B Q4_K_M, ~8-12 tok/s): Quest planning, stuck recovery. + +Memory budget: + - T1+T2 always loaded (~8.5 GB combined) + - T3 loaded on demand (+20 GB) — game pauses during inference + +Design notes: + - 70% of game ticks never reach the LLM (handled upstream by behavior trees) + - T3 pauses the game world before inference and unpauses after (graceful if no world) + - All inference via vllm-mlx / Ollama — local-first, no cloud for game ticks + +References: + - Issue #966 — Three-Tier Metabolic LLM Router + - Issue #1063 — Best Local Uncensored Agent Model for M3 Max 36GB + - Issue #1075 — Claude Quota Monitor + Metabolic Protocol +""" + +import asyncio +import logging +from enum import StrEnum +from typing import Any + +logger = logging.getLogger(__name__) + + +class ModelTier(StrEnum): + """Three metabolic model tiers ordered by cost and capability. + + Tier selection is driven by classify_complexity(). The cheapest + sufficient tier is always chosen — T1 handles routine tasks, T2 + handles dialogue and management, T3 handles planning and recovery. 
+ """ + + T1_ROUTINE = "t1_routine" # Fast, cheap — Qwen3-8B, always loaded + T2_MEDIUM = "t2_medium" # Balanced — Qwen3-14B, always loaded + T3_COMPLEX = "t3_complex" # Deep — Qwen3-32B, loaded on demand, pauses game + + +# ── Classification vocabulary ──────────────────────────────────────────────── + +# T1: single-action navigation and binary-choice words +_T1_KEYWORDS = frozenset( + { + "go", "move", "walk", "run", "north", "south", "east", "west", + "up", "down", "left", "right", "yes", "no", "ok", "okay", + "open", "close", "take", "drop", "look", "pick", "use", + "wait", "rest", "save", "attack", "flee", "jump", "crouch", + } +) + +# T3: planning, optimisation, or recovery signals +_T3_KEYWORDS = frozenset( + { + "plan", "strategy", "optimize", "optimise", "quest", "stuck", + "recover", "multi-step", "long-term", "negotiate", "persuade", + "faction", "reputation", "best", "optimal", "recommend", + "analyze", "analyse", "evaluate", "decide", "complex", "how do i", + "what should i do", "help me figure", "what is the best", + } +) + + +def classify_complexity(task: str, state: dict) -> ModelTier: + """Classify a task to the cheapest-sufficient model tier. + + Classification priority (highest wins): + 1. T3 — any T3 keyword, stuck indicator, or ``state["require_t3"] = True`` + 2. T1 — short task with only T1 keywords and no active context + 3. T2 — everything else (safe default) + + Args: + task: Natural-language task description or player input. + state: Current game state dict. Recognised keys: + ``stuck`` (bool), ``require_t3`` (bool), + ``active_quests`` (list), ``dialogue_active`` (bool). + + Returns: + ModelTier appropriate for the task. 
+ """ + task_lower = task.lower() + words = set(task_lower.split()) + + # ── T3 signals ────────────────────────────────────────────────────────── + t3_keyword_hit = bool(words & _T3_KEYWORDS) + # Check multi-word T3 phrases + t3_phrase_hit = any(phrase in task_lower for phrase in _T3_KEYWORDS if " " in phrase) + is_stuck = bool(state.get("stuck", False)) + explicit_t3 = bool(state.get("require_t3", False)) + + if t3_keyword_hit or t3_phrase_hit or is_stuck or explicit_t3: + logger.debug( + "classify_complexity → T3 (keywords=%s stuck=%s explicit=%s)", + t3_keyword_hit or t3_phrase_hit, + is_stuck, + explicit_t3, + ) + return ModelTier.T3_COMPLEX + + # ── T1 signals ────────────────────────────────────────────────────────── + t1_keyword_hit = bool(words & _T1_KEYWORDS) + task_short = len(task.split()) <= 6 + no_active_context = ( + not state.get("active_quests") + and not state.get("dialogue_active") + and not state.get("combat_active") + ) + + if t1_keyword_hit and task_short and no_active_context: + logger.debug("classify_complexity → T1 (keywords=%s short=%s)", t1_keyword_hit, task_short) + return ModelTier.T1_ROUTINE + + # ── Default: T2 ───────────────────────────────────────────────────────── + logger.debug("classify_complexity → T2 (default)") + return ModelTier.T2_MEDIUM + + +def build_prompt( + state: dict, + ui_state: dict, + text: str, + visual_context: str | None = None, +) -> list[dict]: + """Build an OpenAI-compatible messages list from game context. + + Assembles a system message from structured game state and a user + message from the player's text input. This format is accepted by + CascadeRouter.complete() directly. + + Args: + state: Current game state dict. Common keys: + ``location`` (str), ``health`` (int/float), + ``inventory`` (list), ``active_quests`` (list), + ``stuck`` (bool). + ui_state: Current UI state dict. Common keys: + ``dialogue_active`` (bool), ``dialogue_npc`` (str), + ``menu_open`` (str), ``combat_active`` (bool). 
+ text: Player text or task description (becomes user message). + visual_context: Optional free-text description of the current screen + or scene — from a vision model or rule-based extractor. + + Returns: + List of message dicts: [{"role": "system", ...}, {"role": "user", ...}] + """ + context_lines: list[str] = [] + + location = state.get("location", "unknown") + context_lines.append(f"Location: {location}") + + health = state.get("health") + if health is not None: + context_lines.append(f"Health: {health}") + + inventory = state.get("inventory", []) + if inventory: + items = [i if isinstance(i, str) else i.get("name", str(i)) for i in inventory[:10]] + context_lines.append(f"Inventory: {', '.join(items)}") + + active_quests = state.get("active_quests", []) + if active_quests: + names = [ + q if isinstance(q, str) else q.get("name", str(q)) for q in active_quests[:5] + ] + context_lines.append(f"Active quests: {', '.join(names)}") + + if state.get("stuck"): + context_lines.append("Status: STUCK — need recovery strategy") + + if ui_state.get("dialogue_active"): + npc = ui_state.get("dialogue_npc", "NPC") + context_lines.append(f"In dialogue with: {npc}") + + if ui_state.get("menu_open"): + context_lines.append(f"Menu open: {ui_state['menu_open']}") + + if ui_state.get("combat_active"): + context_lines.append("Status: IN COMBAT") + + if visual_context: + context_lines.append(f"Scene: {visual_context}") + + system_content = ( + "You are Timmy, an AI game agent. " + "Respond with valid game commands only.\n\n" + + "\n".join(context_lines) + ) + + return [ + {"role": "system", "content": system_content}, + {"role": "user", "content": text}, + ] + + +# ── Default model assignments ──────────────────────────────────────────────── +# Overridable per deployment via MetabolicRouter(tier_models={...}). 
+# Model benchmarks (M3 Max 36 GB, issue #1063): +# Qwen3-8B Q6_K — 0.933 F1 tool calling, ~45-55 tok/s (~6 GB) +# Qwen3-14B Q5_K_M — 0.971 F1 tool calling, ~20-28 tok/s (~9.5 GB) +# Qwen3-32B Q4_K_M — highest quality, ~8-12 tok/s (~20 GB, on demand) +DEFAULT_TIER_MODELS: dict[ModelTier, str] = { + ModelTier.T1_ROUTINE: "qwen3:8b", + ModelTier.T2_MEDIUM: "qwen3:14b", + ModelTier.T3_COMPLEX: "qwen3:30b", # Closest Ollama tag to 32B Q4 +} + + +class MetabolicRouter: + """Routes LLM requests to the cheapest-sufficient model tier. + + Wraps CascadeRouter with: + - Complexity classification via classify_complexity() + - Prompt assembly via build_prompt() + - T3 world-pause / world-unpause (graceful if no world adapter) + + Usage:: + + router = MetabolicRouter() + + # Simple route call — classification + prompt + inference in one step + result = await router.route( + task="Go north", + state={"location": "Balmora"}, + ui_state={}, + ) + print(result["content"], result["tier"]) + + # Pre-classify if you need the tier for telemetry + tier = router.classify("Plan the best path to Vivec", game_state) + + # Wire in world adapter for T3 pause/unpause + router.set_world(world_adapter) + """ + + def __init__( + self, + cascade: Any | None = None, + tier_models: dict[ModelTier, str] | None = None, + ) -> None: + """Initialise the metabolic router. + + Args: + cascade: CascadeRouter instance to use. If None, the + singleton returned by get_router() is used lazily. + tier_models: Override default model names per tier. + """ + self._cascade = cascade + self._tier_models: dict[ModelTier, str] = dict(DEFAULT_TIER_MODELS) + if tier_models: + self._tier_models.update(tier_models) + self._world: Any | None = None + + def set_world(self, world: Any) -> None: + """Wire in a world adapter for T3 pause / unpause support. + + The adapter only needs to implement ``act(CommandInput)`` — the full + WorldInterface contract is not required. 
A missing or broken world + adapter degrades gracefully (logs a warning, inference continues). + + Args: + world: Any object with an ``act(CommandInput)`` method. + """ + self._world = world + + def _get_cascade(self) -> Any: + """Return the CascadeRouter, creating the singleton if needed.""" + if self._cascade is None: + from infrastructure.router.cascade import get_router + + self._cascade = get_router() + return self._cascade + + def classify(self, task: str, state: dict) -> ModelTier: + """Classify task complexity. Delegates to classify_complexity().""" + return classify_complexity(task, state) + + async def _pause_world(self) -> None: + """Pause the game world before T3 inference (graceful degradation).""" + if self._world is None: + return + try: + from infrastructure.world.types import CommandInput + + await asyncio.to_thread(self._world.act, CommandInput(action="pause")) + logger.debug("MetabolicRouter: world paused for T3 inference") + except Exception as exc: + logger.warning("world.pause() failed — continuing without pause: %s", exc) + + async def _unpause_world(self) -> None: + """Unpause the game world after T3 inference (always called, even on error).""" + if self._world is None: + return + try: + from infrastructure.world.types import CommandInput + + await asyncio.to_thread(self._world.act, CommandInput(action="unpause")) + logger.debug("MetabolicRouter: world unpaused after T3 inference") + except Exception as exc: + logger.warning("world.unpause() failed — game may remain paused: %s", exc) + + async def route( + self, + task: str, + state: dict, + ui_state: dict | None = None, + visual_context: str | None = None, + temperature: float = 0.3, + max_tokens: int | None = None, + ) -> dict: + """Route a task to the appropriate model tier and return the LLM response. + + Selects the tier via classify_complexity(), assembles the prompt via + build_prompt(), and dispatches to CascadeRouter. 
For T3, the game + world is paused before inference and unpaused after (in a finally block). + + Args: + task: Natural-language task description or player input. + state: Current game state dict. + ui_state: Current UI state dict (optional, defaults to {}). + visual_context: Optional screen/scene description from vision model. + temperature: Sampling temperature (default 0.3 for game commands). + max_tokens: Maximum tokens to generate. + + Returns: + Dict with keys: ``content``, ``provider``, ``model``, ``tier``, + ``latency_ms``, plus any extra keys from CascadeRouter. + + Raises: + RuntimeError: If all providers fail (propagated from CascadeRouter). + """ + ui_state = ui_state or {} + tier = self.classify(task, state) + model = self._tier_models[tier] + messages = build_prompt(state, ui_state, task, visual_context) + cascade = self._get_cascade() + + logger.info( + "MetabolicRouter: tier=%s model=%s task=%r", + tier, + model, + task[:80], + ) + + if tier == ModelTier.T3_COMPLEX: + await self._pause_world() + try: + result = await cascade.complete( + messages=messages, + model=model, + temperature=temperature, + max_tokens=max_tokens, + ) + finally: + await self._unpause_world() + else: + result = await cascade.complete( + messages=messages, + model=model, + temperature=temperature, + max_tokens=max_tokens, + ) + + result["tier"] = tier + return result + + +# ── Module-level singleton ──────────────────────────────────────────────────── +_metabolic_router: MetabolicRouter | None = None + + +def get_metabolic_router() -> MetabolicRouter: + """Get or create the MetabolicRouter singleton.""" + global _metabolic_router + if _metabolic_router is None: + _metabolic_router = MetabolicRouter() + return _metabolic_router diff --git a/tests/infrastructure/test_metabolic_router.py b/tests/infrastructure/test_metabolic_router.py new file mode 100644 index 00000000..21ccf689 --- /dev/null +++ b/tests/infrastructure/test_metabolic_router.py @@ -0,0 +1,386 @@ +"""Tests for the 
"""Tests for the three-tier metabolic LLM router (issue #966)."""

import asyncio
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

# Mark every test in this module as a unit test for the suite's marker filters.
pytestmark = pytest.mark.unit

from infrastructure.router.metabolic import (
    DEFAULT_TIER_MODELS,
    MetabolicRouter,
    ModelTier,
    build_prompt,
    classify_complexity,
    get_metabolic_router,
)


# ── classify_complexity ──────────────────────────────────────────────────────


class TestClassifyComplexity:
    """Verify tier classification for representative task / state pairs."""

    # ── T1: Routine ─────────────────────────────────────────────────────────

    def test_simple_navigation_is_t1(self):
        assert classify_complexity("go north", {}) == ModelTier.T1_ROUTINE

    def test_single_action_is_t1(self):
        assert classify_complexity("open door", {}) == ModelTier.T1_ROUTINE

    def test_t1_with_extra_words_stays_t1(self):
        # 5 words (within the ≤6-word limit), T1 keywords, no active context
        assert classify_complexity("go south and take it", {}) == ModelTier.T1_ROUTINE

    def test_t1_long_task_upgrades_to_t2(self):
        # More than 6 words → not T1 even with nav words
        assert (
            classify_complexity("go north and then move east and pick up the sword", {})
            != ModelTier.T1_ROUTINE
        )

    def test_active_quest_upgrades_t1_to_t2(self):
        state = {"active_quests": ["Rescue the Mage"]}
        assert classify_complexity("go north", state) == ModelTier.T2_MEDIUM

    def test_dialogue_active_upgrades_t1_to_t2(self):
        state = {"dialogue_active": True}
        assert classify_complexity("yes", state) == ModelTier.T2_MEDIUM

    def test_combat_active_upgrades_t1_to_t2(self):
        state = {"combat_active": True}
        assert classify_complexity("attack", state) == ModelTier.T2_MEDIUM

    # ── T2: Medium ──────────────────────────────────────────────────────────

    def test_default_is_t2(self):
        assert classify_complexity("what do I have in my inventory", {}) == ModelTier.T2_MEDIUM

    def test_dialogue_response_is_t2(self):
        state = {"dialogue_active": True, "dialogue_npc": "Caius Cosades"}
        result = classify_complexity("I'm looking for Caius Cosades", state)
        assert result == ModelTier.T2_MEDIUM

    # ── T3: Complex ─────────────────────────────────────────────────────────

    def test_quest_planning_is_t3(self):
        assert classify_complexity("plan my quest route", {}) == ModelTier.T3_COMPLEX

    def test_strategy_keyword_is_t3(self):
        assert classify_complexity("what is the best strategy", {}) == ModelTier.T3_COMPLEX

    def test_stuck_keyword_is_t3(self):
        assert classify_complexity("I am stuck", {}) == ModelTier.T3_COMPLEX

    def test_stuck_state_is_t3(self):
        # The state flag alone must force T3, independent of the task text.
        assert classify_complexity("help me", {"stuck": True}) == ModelTier.T3_COMPLEX

    def test_require_t3_flag_forces_t3(self):
        state = {"require_t3": True}
        assert classify_complexity("go north", state) == ModelTier.T3_COMPLEX

    def test_optimize_keyword_is_t3(self):
        assert classify_complexity("optimize my skill build", {}) == ModelTier.T3_COMPLEX

    def test_multi_word_t3_phrase(self):
        # Phrases with spaces are matched as substrings, not word-set members.
        assert classify_complexity("how do i get past the guards", {}) == ModelTier.T3_COMPLEX

    def test_case_insensitive(self):
        assert classify_complexity("PLAN my route", {}) == ModelTier.T3_COMPLEX


# ── build_prompt ─────────────────────────────────────────────────────────────


class TestBuildPrompt:
    """Verify prompt structure and content assembly."""

    def test_returns_two_messages(self):
        msgs = build_prompt({}, {}, "go north")
        assert len(msgs) == 2
        assert msgs[0]["role"] == "system"
        assert msgs[1]["role"] == "user"

    def test_user_message_contains_task(self):
        msgs = build_prompt({}, {}, "pick up the sword")
        assert msgs[1]["content"] == "pick up the sword"

    def test_location_in_system(self):
        msgs = build_prompt({"location": "Balmora"}, {}, "look around")
        assert "Balmora" in msgs[0]["content"]

    def test_health_in_system(self):
        msgs = build_prompt({"health": 42}, {}, "rest")
        assert "42" in msgs[0]["content"]

    def test_inventory_in_system(self):
        msgs = build_prompt({"inventory": ["iron sword", "bread"]}, {}, "use item")
        assert "iron sword" in msgs[0]["content"]

    def test_inventory_truncated_to_10(self):
        inventory = [f"item{i}" for i in range(20)]
        msgs = build_prompt({"inventory": inventory}, {}, "check")
        # Only first 10 should appear in the system message
        assert "item10" not in msgs[0]["content"]

    def test_active_quests_in_system(self):
        msgs = build_prompt({"active_quests": ["Morrowind Main Quest"]}, {}, "help")
        assert "Morrowind Main Quest" in msgs[0]["content"]

    def test_stuck_indicator_in_system(self):
        msgs = build_prompt({"stuck": True}, {}, "what now")
        assert "STUCK" in msgs[0]["content"]

    def test_dialogue_npc_in_system(self):
        msgs = build_prompt({}, {"dialogue_active": True, "dialogue_npc": "Vivec"}, "hello")
        assert "Vivec" in msgs[0]["content"]

    def test_menu_open_in_system(self):
        msgs = build_prompt({}, {"menu_open": "inventory"}, "check items")
        assert "inventory" in msgs[0]["content"]

    def test_combat_active_in_system(self):
        msgs = build_prompt({}, {"combat_active": True}, "attack")
        assert "COMBAT" in msgs[0]["content"]

    def test_visual_context_in_system(self):
        msgs = build_prompt({}, {}, "where am I", visual_context="A dark dungeon corridor")
        assert "dungeon corridor" in msgs[0]["content"]

    def test_missing_optional_fields_omitted(self):
        msgs = build_prompt({}, {}, "move forward")
        system = msgs[0]["content"]
        assert "Health:" not in system
        assert "Inventory:" not in system
        assert "Active quests:" not in system

    def test_inventory_dict_items(self):
        # Inventory entries may be dicts with a "name" key, not just strings.
        inventory = [{"name": "silver dagger"}, {"name": "potion"}]
        msgs = build_prompt({"inventory": inventory}, {}, "use")
        assert "silver dagger" in msgs[0]["content"]

    def test_quest_dict_items(self):
        quests = [{"name": "The Warlord"}, {"name": "Lost in Translation"}]
        msgs = build_prompt({"active_quests": quests}, {}, "help")
        assert "The Warlord" in msgs[0]["content"]


# ── MetabolicRouter ──────────────────────────────────────────────────────────


@pytest.mark.asyncio
class TestMetabolicRouter:
    """Test MetabolicRouter routing, tier labelling, and T3 world-pause logic."""

    def _make_router(self, mock_cascade=None):
        """Create a MetabolicRouter with a mocked CascadeRouter."""
        if mock_cascade is None:
            mock_cascade = MagicMock()
            mock_cascade.complete = AsyncMock(
                return_value={
                    "content": "Move north confirmed.",
                    "provider": "ollama-local",
                    "model": "qwen3:8b",
                    "latency_ms": 120.0,
                }
            )
        return MetabolicRouter(cascade=mock_cascade)

    async def test_route_returns_tier_in_result(self):
        router = self._make_router()
        result = await router.route("go north", state={})
        assert "tier" in result
        assert result["tier"] == ModelTier.T1_ROUTINE

    async def test_t1_uses_t1_model(self):
        mock_cascade = MagicMock()
        mock_cascade.complete = AsyncMock(
            return_value={"content": "ok", "provider": "ollama-local", "model": "qwen3:8b", "latency_ms": 100}
        )
        router = MetabolicRouter(cascade=mock_cascade)
        await router.route("go north", state={})
        call_kwargs = mock_cascade.complete.call_args
        assert call_kwargs.kwargs["model"] == DEFAULT_TIER_MODELS[ModelTier.T1_ROUTINE]

    async def test_t2_uses_t2_model(self):
        mock_cascade = MagicMock()
        mock_cascade.complete = AsyncMock(
            return_value={"content": "ok", "provider": "ollama-local", "model": "qwen3:14b", "latency_ms": 300}
        )
        router = MetabolicRouter(cascade=mock_cascade)
        await router.route("what should I say to the innkeeper", state={})
        call_kwargs = mock_cascade.complete.call_args
        assert call_kwargs.kwargs["model"] == DEFAULT_TIER_MODELS[ModelTier.T2_MEDIUM]

    async def test_t3_uses_t3_model(self):
        mock_cascade = MagicMock()
        mock_cascade.complete = AsyncMock(
            return_value={"content": "ok", "provider": "ollama-local", "model": "qwen3:30b", "latency_ms": 2000}
        )
        router = MetabolicRouter(cascade=mock_cascade)
        await router.route("plan the optimal quest route", state={})
        call_kwargs = mock_cascade.complete.call_args
        assert call_kwargs.kwargs["model"] == DEFAULT_TIER_MODELS[ModelTier.T3_COMPLEX]

    async def test_custom_tier_models_respected(self):
        mock_cascade = MagicMock()
        mock_cascade.complete = AsyncMock(
            return_value={"content": "ok", "provider": "test", "model": "custom-8b", "latency_ms": 100}
        )
        custom = {ModelTier.T1_ROUTINE: "custom-8b"}
        router = MetabolicRouter(cascade=mock_cascade, tier_models=custom)
        await router.route("go north", state={})
        call_kwargs = mock_cascade.complete.call_args
        assert call_kwargs.kwargs["model"] == "custom-8b"

    async def test_t3_pauses_world_before_inference(self):
        mock_cascade = MagicMock()
        mock_cascade.complete = AsyncMock(
            return_value={"content": "ok", "provider": "ollama", "model": "qwen3:30b", "latency_ms": 1500}
        )
        router = MetabolicRouter(cascade=mock_cascade)

        # Record pause/unpause CommandInputs passed to the world adapter.
        pause_calls = []
        unpause_calls = []

        mock_world = MagicMock()

        def track_act(cmd):
            if cmd.action == "pause":
                pause_calls.append(cmd)
            elif cmd.action == "unpause":
                unpause_calls.append(cmd)

        mock_world.act = track_act
        router.set_world(mock_world)

        await router.route("plan the quest", state={})

        assert len(pause_calls) == 1, "world.pause() should be called once for T3"
        assert len(unpause_calls) == 1, "world.unpause() should be called once for T3"

    async def test_t3_unpauses_world_even_on_llm_error(self):
        """world.unpause() must be called even when the LLM raises."""
        mock_cascade = MagicMock()
        mock_cascade.complete = AsyncMock(side_effect=RuntimeError("LLM failed"))
        router = MetabolicRouter(cascade=mock_cascade)

        unpause_calls = []
        mock_world = MagicMock()
        mock_world.act = lambda cmd: unpause_calls.append(cmd) if cmd.action == "unpause" else None
        router.set_world(mock_world)

        with pytest.raises(RuntimeError, match="LLM failed"):
            await router.route("plan the quest", state={})

        assert len(unpause_calls) == 1, "world.unpause() must run even when LLM errors"

    async def test_t1_does_not_pause_world(self):
        mock_cascade = MagicMock()
        mock_cascade.complete = AsyncMock(
            return_value={"content": "ok", "provider": "ollama", "model": "qwen3:8b", "latency_ms": 120}
        )
        router = MetabolicRouter(cascade=mock_cascade)

        pause_calls = []
        mock_world = MagicMock()
        mock_world.act = lambda cmd: pause_calls.append(cmd)
        router.set_world(mock_world)

        await router.route("go north", state={})

        assert len(pause_calls) == 0, "world.pause() must NOT be called for T1"

    async def test_t2_does_not_pause_world(self):
        mock_cascade = MagicMock()
        mock_cascade.complete = AsyncMock(
            return_value={"content": "ok", "provider": "ollama", "model": "qwen3:14b", "latency_ms": 350}
        )
        router = MetabolicRouter(cascade=mock_cascade)

        pause_calls = []
        mock_world = MagicMock()
        mock_world.act = lambda cmd: pause_calls.append(cmd)
        router.set_world(mock_world)

        await router.route("talk to the merchant", state={})

        assert len(pause_calls) == 0, "world.pause() must NOT be called for T2"

    async def test_broken_world_adapter_degrades_gracefully(self):
        """If world.act() raises, inference must still complete."""
        mock_cascade = MagicMock()
        mock_cascade.complete = AsyncMock(
            return_value={"content": "done", "provider": "ollama", "model": "qwen3:30b", "latency_ms": 2000}
        )
        router = MetabolicRouter(cascade=mock_cascade)

        mock_world = MagicMock()
        mock_world.act = MagicMock(side_effect=RuntimeError("world broken"))
        router.set_world(mock_world)

        # Should not raise — degradation only logs a warning
        result = await router.route("plan the quest", state={})
        assert result["content"] == "done"

    async def test_no_world_adapter_t3_still_works(self):
        mock_cascade = MagicMock()
        mock_cascade.complete = AsyncMock(
            return_value={"content": "plan done", "provider": "ollama", "model": "qwen3:30b", "latency_ms": 2000}
        )
        router = MetabolicRouter(cascade=mock_cascade)
        # No set_world() called

        result = await router.route("plan the quest route", state={})
        assert result["content"] == "plan done"
        assert result["tier"] == ModelTier.T3_COMPLEX

    async def test_classify_delegates_to_module_function(self):
        router = MetabolicRouter(cascade=MagicMock())
        assert router.classify("go north", {}) == classify_complexity("go north", {})
        assert router.classify("plan the quest", {}) == classify_complexity("plan the quest", {})

    async def test_ui_state_defaults_to_empty_dict(self):
        """Calling route without ui_state should not raise."""
        mock_cascade = MagicMock()
        mock_cascade.complete = AsyncMock(
            return_value={"content": "ok", "provider": "ollama", "model": "qwen3:8b", "latency_ms": 100}
        )
        router = MetabolicRouter(cascade=mock_cascade)
        # No ui_state argument
        result = await router.route("go north", state={})
        assert result["content"] == "ok"

    async def test_temperature_and_max_tokens_forwarded(self):
        mock_cascade = MagicMock()
        mock_cascade.complete = AsyncMock(
            return_value={"content": "ok", "provider": "ollama", "model": "qwen3:14b", "latency_ms": 200}
        )
        router = MetabolicRouter(cascade=mock_cascade)
        await router.route("describe the scene", state={}, temperature=0.1, max_tokens=50)
        call_kwargs = mock_cascade.complete.call_args.kwargs
        assert call_kwargs["temperature"] == 0.1
        assert call_kwargs["max_tokens"] == 50


class TestGetMetabolicRouter:
    """Test module-level singleton."""

    def test_returns_metabolic_router_instance(self):
        import infrastructure.router.metabolic as m_module

        # Reset singleton for clean test
        m_module._metabolic_router = None
        router = get_metabolic_router()
        assert isinstance(router, MetabolicRouter)

    def test_singleton_returns_same_instance(self):
        import infrastructure.router.metabolic as m_module

        m_module._metabolic_router = None
        r1 = get_metabolic_router()
        r2 = get_metabolic_router()
        assert r1 is r2