diff --git a/src/bannerlord/__init__.py b/src/bannerlord/__init__.py new file mode 100644 index 00000000..390d40de --- /dev/null +++ b/src/bannerlord/__init__.py @@ -0,0 +1,11 @@ +"""Bannerlord campaign agent — M2: Basic Campaign Actions. + +Provides GABS integration (TCP JSON-RPC, port 4825) and the observe → +decide → act loop for autonomous campaign play: move, trade, recruit, +and engage bandits. + +Key GABS tools: party/move_to_settlement, inventory/buy_item, +party/recruit_all, party/engage_party. + +Done-condition: party grows from 20 → 100 troops, gold reaches 10 000 denars. +""" diff --git a/src/bannerlord/campaign_actions.py b/src/bannerlord/campaign_actions.py new file mode 100644 index 00000000..453a39cb --- /dev/null +++ b/src/bannerlord/campaign_actions.py @@ -0,0 +1,200 @@ +"""Bannerlord M2 campaign action primitives. + +Wraps the four key GABS tools for the M2 milestone: + - party/move_to_settlement → move the party to a named settlement + - inventory/buy_item → purchase trade goods + - party/recruit_all → hire all available recruits + - party/engage_party → engage a nearby bandit party + +All functions are async and return an ``ActionResult`` that is compatible +with the ``WorldInterface`` contract. + +Error handling follows Pattern 3 (Feature Disable): if GABS rejects an +action, log a warning and return a FAILURE result — never raise. +""" + +from __future__ import annotations + +import logging +from enum import StrEnum +from typing import TYPE_CHECKING + +from infrastructure.world.types import ActionResult, ActionStatus + +if TYPE_CHECKING: + from bannerlord.gabs_client import GabsClient + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# GABS method names — canonical reference +# --------------------------------------------------------------------------- + + +class GabsTool(StrEnum): + """GABS JSON-RPC method names for the M2 action set.""" + + MOVE_TO_SETTLEMENT = "party/move_to_settlement" + BUY_ITEM = "inventory/buy_item" + RECRUIT_ALL = "party/recruit_all" + ENGAGE_PARTY = "party/engage_party" + + +# --------------------------------------------------------------------------- +# Action functions +# --------------------------------------------------------------------------- + + +async def move_to_settlement( + client: "GabsClient", + settlement_id: str, + *, + settlement_name: str = "", +) -> ActionResult: + """Move the party to a target settlement. + + Parameters + ---------- + client: + Connected ``GabsClient`` instance. + settlement_id: + GABS settlement identifier (e.g. ``"town_A1"``). + settlement_name: + Human-readable name for logging only. + """ + label = settlement_name or settlement_id + try: + result = await client.call( + GabsTool.MOVE_TO_SETTLEMENT, + {"settlement_id": settlement_id}, + ) + logger.info("MOVE → %s: %s", label, result) + return ActionResult( + status=ActionStatus.SUCCESS, + message=f"Moving to {label}", + data=result or {}, + ) + except Exception as exc: # noqa: BLE001 + logger.warning("MOVE → %s failed: %s", label, exc) + return ActionResult( + status=ActionStatus.FAILURE, + message=f"Move to {label} failed: {exc}", + data={}, + ) + + +async def buy_item( + client: "GabsClient", + item_id: str, + quantity: int, + *, + settlement_id: str = "", +) -> ActionResult: + """Purchase trade goods from the current or specified settlement. + + Parameters + ---------- + client: + Connected ``GabsClient`` instance. + item_id: + Item identifier (e.g. ``"grain"``, ``"iron"``, ``"wool"``). + quantity: + Number of units to purchase. + settlement_id: + Optional target settlement; empty means current location. + """ + try: + params: dict = {"item_id": item_id, "quantity": quantity} + if settlement_id: + params["settlement_id"] = settlement_id + + result = await client.call(GabsTool.BUY_ITEM, params) + logger.info("BUY %dx %s: %s", quantity, item_id, result) + return ActionResult( + status=ActionStatus.SUCCESS, + message=f"Purchased {quantity}x {item_id}", + data=result or {}, + ) + except Exception as exc: # noqa: BLE001 + logger.warning("BUY %dx %s failed: %s", quantity, item_id, exc) + return ActionResult( + status=ActionStatus.FAILURE, + message=f"Buy {item_id} failed: {exc}", + data={}, + ) + + +async def recruit_all( + client: "GabsClient", + *, + settlement_id: str = "", +) -> ActionResult: + """Recruit all available troops at the current or specified settlement. + + Parameters + ---------- + client: + Connected ``GabsClient`` instance. + settlement_id: + Optional settlement to recruit from; empty means current. + """ + try: + params: dict = {} + if settlement_id: + params["settlement_id"] = settlement_id + + result = await client.call(GabsTool.RECRUIT_ALL, params) + recruited = (result or {}).get("recruited", "?") + logger.info("RECRUIT_ALL: recruited %s troops", recruited) + return ActionResult( + status=ActionStatus.SUCCESS, + message=f"Recruited {recruited} troops", + data=result or {}, + ) + except Exception as exc: # noqa: BLE001 + logger.warning("RECRUIT_ALL failed: %s", exc) + return ActionResult( + status=ActionStatus.FAILURE, + message=f"Recruit all failed: {exc}", + data={}, + ) + + +async def engage_party( + client: "GabsClient", + party_id: str, + *, + party_name: str = "", +) -> ActionResult: + """Engage a nearby party (typically a bandit gang) in combat. + + Auto-resolve is expected at high Tactics skill — the agent relies + on GABS to handle the battle outcome. + + Parameters + ---------- + client: + Connected ``GabsClient`` instance. + party_id: + GABS party identifier of the target. + party_name: + Human-readable name for logging only. + """ + label = party_name or party_id + try: + result = await client.call(GabsTool.ENGAGE_PARTY, {"party_id": party_id}) + outcome = (result or {}).get("outcome", "unknown") + logger.info("ENGAGE %s: %s", label, outcome) + return ActionResult( + status=ActionStatus.SUCCESS, + message=f"Engaged {label}: {outcome}", + data=result or {}, + ) + except Exception as exc: # noqa: BLE001 + logger.warning("ENGAGE %s failed: %s", label, exc) + return ActionResult( + status=ActionStatus.FAILURE, + message=f"Engage {label} failed: {exc}", + data={}, + ) diff --git a/src/bannerlord/campaign_loop.py b/src/bannerlord/campaign_loop.py new file mode 100644 index 00000000..1282f97c --- /dev/null +++ b/src/bannerlord/campaign_loop.py @@ -0,0 +1,318 @@ +"""Bannerlord M2 campaign action loop. + +Implements the observe → decide → act → wait pipeline described in +issue #1094. The loop runs until the M2 victory conditions are met +(100 troops + 10 000 gold) or until stopped externally. + +Architecture: + CampaignLoop.run() + while not m2_complete: + state = gabs.get_game_state() # observe + decision = decide(state) # decide (local Qwen3) + result = dispatch(decision, gabs) # act (GABS) + await asyncio.sleep(tick_seconds) # wait + +Error handling: + - GABS connection failures → log + retry with backoff (max 3 attempts) + - LLM failures → WAIT action (graceful degradation) + - Action failures → log + continue to next tick + +Progress tracking: + Loop publishes heartbeat events via the event bus so the dashboard + can display live party size and gold. +""" + +from __future__ import annotations + +import asyncio +import logging +import time +from dataclasses import dataclass, field +from datetime import UTC, datetime + +from bannerlord.campaign_actions import buy_item, engage_party, move_to_settlement, recruit_all +from bannerlord.campaign_state import parse_campaign_state +from bannerlord.decision import M2Action, decide +from bannerlord.gabs_client import GabsClient +from config import settings +from infrastructure.world.types import ActionResult, ActionStatus + +logger = logging.getLogger(__name__) + +_MAX_RECONNECT_ATTEMPTS = 3 +_RECONNECT_DELAY = 5.0 # seconds + + +# --------------------------------------------------------------------------- +# Progress snapshot (emitted each tick) +# --------------------------------------------------------------------------- + + +@dataclass +class TickResult: + """Summary of one campaign tick.""" + + tick: int + timestamp: str + party_size: int + gold: int + action: str + action_status: str + reasoning: str + duration_ms: int + m2_complete: bool = False + error: str = "" + + +# --------------------------------------------------------------------------- +# Campaign loop +# --------------------------------------------------------------------------- + + +class CampaignLoop: + """Runs the Bannerlord M2 autonomous campaign. + + Parameters + ---------- + gabs_host: + Override GABS server host. + gabs_port: + Override GABS server port. + tick_seconds: + Real-time pause between in-game ticks. + on_tick: + Optional async callback invoked after each tick with the + ``TickResult``. Used by the dashboard for live updates. + max_ticks: + Hard cap for testing / benchmarking. 0 = unlimited. + """ + + def __init__( + self, + *, + gabs_host: str | None = None, + gabs_port: int | None = None, + tick_seconds: float | None = None, + on_tick=None, + max_ticks: int = 0, + ) -> None: + self._host = gabs_host or settings.gabs_host + self._port = gabs_port or settings.gabs_port + self._tick_seconds = tick_seconds if tick_seconds is not None else settings.bannerlord_tick_seconds + self._on_tick = on_tick + self._max_ticks = max_ticks + self._running = False + self.history: list[TickResult] = [] + + # -- public API -------------------------------------------------------- + + @property + def is_running(self) -> bool: + return self._running + + def stop(self) -> None: + """Signal the loop to stop after the current tick.""" + self._running = False + logger.info("CampaignLoop stop requested") + + async def run(self) -> list[TickResult]: + """Start the campaign loop. + + Returns the list of tick results (for testing / benchmarking). + Runs until M2 complete, externally stopped, or max_ticks reached. + """ + from bannerlord.gabs_client import GabsClient + + self._running = True + logger.info( + "CampaignLoop starting — gabs=%s:%d tick=%.1fs", + self._host, + self._port, + self._tick_seconds, + ) + + client = GabsClient(host=self._host, port=self._port) + try: + await self._connect_with_retry(client) + except RuntimeError as exc: # noqa: BLE001 + logger.error("CampaignLoop: could not connect to GABS — aborting: %s", exc) + self._running = False + return self.history + + tick_num = 0 + try: + while self._running: + tick_num += 1 + if self._max_ticks > 0 and tick_num > self._max_ticks: + logger.info("CampaignLoop: max_ticks=%d reached", self._max_ticks) + break + + result = await self._run_tick(client, tick_num) + self.history.append(result) + + await self._emit(result) + + if result.m2_complete: + logger.info( + "M2 COMPLETE! Party=%d troops, Gold=%d denars", + result.party_size, + result.gold, + ) + break + + if result.error and not self._running: + break + + await asyncio.sleep(self._tick_seconds) + + finally: + await client.disconnect() + self._running = False + logger.info("CampaignLoop stopped after %d ticks", tick_num) + + return self.history + + # -- internal: single tick --------------------------------------------- + + async def _run_tick(self, client: "Any", tick_num: int) -> TickResult: + """Execute one observe → decide → act cycle.""" + start = time.monotonic() + + # 1. Observe + raw_state = await client.get_game_state() + state = parse_campaign_state(raw_state) + state = _override_tick(state, tick_num) + + # 2. Decide + decision = await decide(state) + + # 3. Act + action_result = await self._dispatch(decision, client) + + duration_ms = int((time.monotonic() - start) * 1000) + + return TickResult( + tick=tick_num, + timestamp=datetime.now(UTC).isoformat(), + party_size=state.party.party_size, + gold=state.economy.gold, + action=decision.action, + action_status=action_result.status.value, + reasoning=decision.reasoning, + duration_ms=duration_ms, + m2_complete=state.m2_complete, + ) + + async def _dispatch(self, decision: "Any", client: "Any") -> "Any": + """Route the decision to the correct GABS action function.""" + action = decision.action + + if action == M2Action.MOVE: + if not decision.settlement_id: + logger.warning("MOVE decision has no settlement_id — skipping") + return ActionResult( + status=ActionStatus.FAILURE, + message="MOVE missing settlement_id", + ) + return await move_to_settlement( + client, + decision.settlement_id, + settlement_name=decision.settlement_name, + ) + + elif action == M2Action.TRADE: + if not decision.item_id: + logger.warning("TRADE decision has no item_id — skipping") + return ActionResult( + status=ActionStatus.FAILURE, + message="TRADE missing item_id", + ) + return await buy_item( + client, + decision.item_id, + decision.quantity, + settlement_id=decision.settlement_id, + ) + + elif action == M2Action.RECRUIT: + return await recruit_all( + client, + settlement_id=decision.settlement_id, + ) + + elif action == M2Action.ENGAGE: + if not decision.party_id: + logger.warning("ENGAGE decision has no party_id — skipping") + return ActionResult( + status=ActionStatus.FAILURE, + message="ENGAGE missing party_id", + ) + return await engage_party( + client, + decision.party_id, + party_name=decision.party_name, + ) + + else: # WAIT or unknown + logger.debug("Tick %s: WAIT — %s", decision.action, decision.reasoning) + return ActionResult( + status=ActionStatus.NOOP, + message=f"WAIT: {decision.reasoning}", + ) + + # -- internal: connectivity -------------------------------------------- + + async def _connect_with_retry(self, client: "Any") -> None: + """Try to connect, retrying up to _MAX_RECONNECT_ATTEMPTS times.""" + for attempt in range(1, _MAX_RECONNECT_ATTEMPTS + 1): + try: + await client.connect() + return + except Exception as exc: # noqa: BLE001 + logger.warning( + "GABS connect attempt %d/%d failed: %s", + attempt, + _MAX_RECONNECT_ATTEMPTS, + exc, + ) + if attempt < _MAX_RECONNECT_ATTEMPTS: + await asyncio.sleep(_RECONNECT_DELAY) + + raise RuntimeError( + f"Could not connect to GABS at {self._host}:{self._port} " + f"after {_MAX_RECONNECT_ATTEMPTS} attempts" + ) + + # -- internal: event emission ------------------------------------------ + + async def _emit(self, result: TickResult) -> None: + """Emit tick data to the event bus (best-effort).""" + try: + from infrastructure.events.bus import event_bus # noqa: PLC0415 + + await event_bus.publish( + "bannerlord.tick", + { + "tick": result.tick, + "party_size": result.party_size, + "gold": result.gold, + "action": result.action, + "action_status": result.action_status, + "m2_complete": result.m2_complete, + "duration_ms": result.duration_ms, + }, + ) + except Exception as exc: # noqa: BLE001 + logger.debug("CampaignLoop emit skipped: %s", exc) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _override_tick(state: "Any", tick_num: int) -> "Any": + """Set the tick counter from the loop (GABS may not provide it).""" + if state.tick == 0: + state.tick = tick_num + return state diff --git a/src/bannerlord/campaign_state.py b/src/bannerlord/campaign_state.py new file mode 100644 index 00000000..f54a0d58 --- /dev/null +++ b/src/bannerlord/campaign_state.py @@ -0,0 +1,210 @@ +"""Bannerlord campaign state models. + +Parses the raw GABS ``game/get_state`` payload into typed models and +tracks the M2 progress counters: party size and gold accumulation. + +Done-condition (from issue #1094): + party_size >= 100 AND gold >= 10_000 +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from datetime import UTC, datetime +from typing import Any + +logger = logging.getLogger(__name__) + +# M2 victory conditions +M2_TROOP_GOAL = 100 +M2_GOLD_GOAL = 10_000 + + +@dataclass +class PartyState: + """Current party composition and position.""" + + party_size: int = 0 + wounded: int = 0 + prisoners: int = 0 + food_days: float = 0.0 + morale: float = 100.0 + current_settlement: str = "" + speed: float = 0.0 + + +@dataclass +class EconomyState: + """Current gold and trade state.""" + + gold: int = 0 + daily_income: int = 0 + daily_expenses: int = 0 + + @property + def net_income(self) -> int: + return self.daily_income - self.daily_expenses + + +@dataclass +class NearbyParty: + """A nearby lord/bandit party visible on the map.""" + + party_id: str + name: str + faction: str + is_hostile: bool + troop_count: int + distance: float + + +@dataclass +class Settlement: + """A settlement visible or reachable from the current position.""" + + settlement_id: str + name: str + faction: str + is_friendly: bool + distance: float + has_recruits: bool = False + has_trade_goods: bool = False + + +@dataclass +class CampaignState: + """Full parsed snapshot of the GABS game state. + + Built from the raw ``dict`` returned by ``GabsClient.get_game_state()``. + """ + + tick: int = 0 + timestamp: datetime = field(default_factory=lambda: datetime.now(UTC)) + party: PartyState = field(default_factory=PartyState) + economy: EconomyState = field(default_factory=EconomyState) + nearby_parties: list[NearbyParty] = field(default_factory=list) + settlements: list[Settlement] = field(default_factory=list) + raw: dict[str, Any] = field(default_factory=dict) + + # -- M2 progress ------------------------------------------------------- + + @property + def troops_progress(self) -> str: + """Human-readable M2 troop progress.""" + return f"{self.party.party_size}/{M2_TROOP_GOAL}" + + @property + def gold_progress(self) -> str: + """Human-readable M2 gold progress.""" + return f"{self.economy.gold:,}/{M2_GOLD_GOAL:,}" + + @property + def m2_complete(self) -> bool: + """True when both M2 victory conditions are met.""" + return self.party.party_size >= M2_TROOP_GOAL and self.economy.gold >= M2_GOLD_GOAL + + # -- hostile detection ------------------------------------------------- + + def hostile_bandits_nearby(self, max_distance: float = 5.0) -> list[NearbyParty]: + """Return hostile bandit parties within *max_distance* map units.""" + return [ + p + for p in self.nearby_parties + if p.is_hostile and "bandit" in p.faction.lower() and p.distance <= max_distance + ] + + def nearest_settlement(self, *, friendly_only: bool = False) -> Settlement | None: + """Return the closest (optionally friendly) settlement.""" + candidates = [s for s in self.settlements if not friendly_only or s.is_friendly] + if not candidates: + return None + return min(candidates, key=lambda s: s.distance) + + def nearest_recruit_settlement(self) -> Settlement | None: + """Return the nearest settlement that has recruits available.""" + candidates = [s for s in self.settlements if s.has_recruits] + if not candidates: + return None + return min(candidates, key=lambda s: s.distance) + + +# --------------------------------------------------------------------------- +# Parser +# --------------------------------------------------------------------------- + + +def parse_campaign_state(raw: dict[str, Any]) -> CampaignState: + """Build a ``CampaignState`` from the raw GABS state dict. + + Unknown / missing fields are silently defaulted so the parser never + crashes when GABS returns partial data. + """ + if not raw: + logger.debug("parse_campaign_state: empty payload — returning default state") + return CampaignState(raw=raw) + + # -- party ------------------------------------------------------------- + party_raw = raw.get("party", {}) + party = PartyState( + party_size=int(party_raw.get("size", 0)), + wounded=int(party_raw.get("wounded", 0)), + prisoners=int(party_raw.get("prisoners", 0)), + food_days=float(party_raw.get("food_days", 0.0)), + morale=float(party_raw.get("morale", 100.0)), + current_settlement=str(party_raw.get("current_settlement", "")), + speed=float(party_raw.get("speed", 0.0)), + ) + + # -- economy ----------------------------------------------------------- + economy_raw = raw.get("economy", {}) + economy = EconomyState( + gold=int(economy_raw.get("gold", 0)), + daily_income=int(economy_raw.get("daily_income", 0)), + daily_expenses=int(economy_raw.get("daily_expenses", 0)), + ) + + # -- nearby parties ---------------------------------------------------- + nearby_parties = [] + for p in raw.get("nearby_parties", []): + try: + nearby_parties.append( + NearbyParty( + party_id=str(p.get("id", "")), + name=str(p.get("name", "")), + faction=str(p.get("faction", "")), + is_hostile=bool(p.get("is_hostile", False)), + troop_count=int(p.get("troop_count", 0)), + distance=float(p.get("distance", 999.0)), + ) + ) + except (KeyError, ValueError, TypeError, AttributeError) as exc: + logger.debug("Skipping malformed nearby_party entry: %s", exc) + + # -- settlements ------------------------------------------------------- + settlements = [] + for s in raw.get("settlements", []): + try: + settlements.append( + Settlement( + settlement_id=str(s.get("id", "")), + name=str(s.get("name", "")), + faction=str(s.get("faction", "")), + is_friendly=bool(s.get("is_friendly", False)), + distance=float(s.get("distance", 999.0)), + has_recruits=bool(s.get("has_recruits", False)), + has_trade_goods=bool(s.get("has_trade_goods", False)), + ) + ) + except (KeyError, ValueError, TypeError, AttributeError) as exc: + logger.debug("Skipping malformed settlement entry: %s", exc) + + return CampaignState( + tick=int(raw.get("tick", 0)), + timestamp=datetime.now(UTC), + party=party, + economy=economy, + nearby_parties=nearby_parties, + settlements=settlements, + raw=raw, + ) diff --git a/src/bannerlord/decision.py b/src/bannerlord/decision.py new file mode 100644 index 00000000..b92aeffc --- /dev/null +++ b/src/bannerlord/decision.py @@ -0,0 +1,284 @@ +"""LLM-powered campaign decision engine for Bannerlord M2. + +Builds a structured prompt from the current ``CampaignState`` and asks +the local Qwen3 model (via Ollama) to choose one action from the M2 +action vocabulary. Returns a ``CampaignDecision`` pydantic model with +the chosen action and its parameters. + +The decision model is intentionally simple for M2: + MOVE → move to a named settlement + TRADE → buy a trade item + RECRUIT → hire troops at current/nearby settlement + ENGAGE → fight a nearby bandit party + WAIT → idle (e.g. low food, waiting for morale to recover) + +Qwen3 responds in JSON mode with temperature=0.1 for deterministic play. +""" + +from __future__ import annotations + +import json +import logging +from enum import StrEnum +from typing import Any + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Decision schema +# --------------------------------------------------------------------------- + + +class M2Action(StrEnum): + """Vocabulary of actions available in the M2 milestone.""" + + MOVE = "MOVE" + TRADE = "TRADE" + RECRUIT = "RECRUIT" + ENGAGE = "ENGAGE" + WAIT = "WAIT" + + +class CampaignDecision: + """Parsed LLM decision for one campaign tick. + + Attributes + ---------- + action: + One of the ``M2Action`` values. + settlement_id: + Target settlement ID (for MOVE / RECRUIT / TRADE). + settlement_name: + Human-readable settlement name (for logging). + item_id: + Trade item to buy (for TRADE). + quantity: + Trade quantity (for TRADE). + party_id: + Target party ID (for ENGAGE). + party_name: + Human-readable party name (for ENGAGE / logging). + reasoning: + LLM's brief explanation of the choice. + """ + + def __init__( + self, + action: M2Action = M2Action.WAIT, + *, + settlement_id: str = "", + settlement_name: str = "", + item_id: str = "", + quantity: int = 1, + party_id: str = "", + party_name: str = "", + reasoning: str = "", + ) -> None: + self.action = action + self.settlement_id = settlement_id + self.settlement_name = settlement_name + self.item_id = item_id + self.quantity = quantity + self.party_id = party_id + self.party_name = party_name + self.reasoning = reasoning + + def __repr__(self) -> str: + return ( + f"CampaignDecision(action={self.action!r}, " + f"reasoning={self.reasoning[:60]!r})" + ) + + +# --------------------------------------------------------------------------- +# Prompt builder +# --------------------------------------------------------------------------- + + +def build_decision_prompt(state: "Any") -> list[dict[str, str]]: + """Return an OpenAI-style message list for the decision LLM. + + Parameters + ---------- + state: + A ``CampaignState`` instance. + """ + # Build a compact context block + party = state.party + econ = state.economy + ctx_lines = [ + f"Campaign tick: {state.tick}", + f"Party size: {party.party_size} troops ({party.wounded} wounded)", + f"Food: {party.food_days:.1f} days remaining", + f"Morale: {party.morale:.0f}/100", + f"Gold: {econ.gold:,} denars (net {econ.net_income:+d}/day)", + f"Current location: {party.current_settlement or 'travelling'}", + "", + "== M2 GOALS ==", + f"Troops: {state.troops_progress} (need 100)", + f"Gold: {state.gold_progress} (need 10,000)", + "", + ] + + # Nearby bandits + bandits = state.hostile_bandits_nearby() + if bandits: + ctx_lines.append("== NEARBY HOSTILE BANDITS ==") + for b in bandits[:3]: + ctx_lines.append( + f" - {b.name} (id={b.party_id}, {b.troop_count} troops, " + f"{b.distance:.1f} away)" + ) + ctx_lines.append("") + + # Settlements + settlements = state.settlements[:5] + if settlements: + ctx_lines.append("== REACHABLE SETTLEMENTS ==") + for s in settlements: + flags = [] + if s.has_recruits: + flags.append("recruits") + if s.has_trade_goods: + flags.append("trade") + if not s.is_friendly: + flags.append("hostile-faction") + flag_str = f" [{', '.join(flags)}]" if flags else "" + ctx_lines.append( + f" - {s.name} (id={s.settlement_id}, " + f"{s.distance:.1f} away{flag_str})" + ) + ctx_lines.append("") + + context = "\n".join(ctx_lines) + + system_prompt = ( + "You are the campaign manager for Timmy, an autonomous Bannerlord agent. " + "Your job is to choose the single best action for this campaign tick. " + "Respond ONLY with a JSON object — no prose, no markdown fences.\n\n" + "JSON schema:\n" + '{\n' + ' "action": "MOVE|TRADE|RECRUIT|ENGAGE|WAIT",\n' + ' "settlement_id": "",\n' + ' "settlement_name": "",\n' + ' "item_id": "",\n' + ' "quantity": ,\n' + ' "party_id": "",\n' + ' "party_name": "",\n' + ' "reasoning": ""\n' + "}\n\n" + "Priority rules:\n" + "1. ENGAGE bandits only if they are weak (< 15 troops) and we have > 25 troops.\n" + "2. RECRUIT when a nearby settlement has recruits and party < 80 troops.\n" + "3. TRADE when gold < 5000 and a settlement has trade goods.\n" + "4. MOVE toward the nearest settlement with recruits or trade goods.\n" + "5. WAIT only if food < 1 day or morale < 40." + ) + + user_prompt = f"Current game state:\n\n{context}\nChoose the best action." + + return [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ] + + +# --------------------------------------------------------------------------- +# Response parser +# --------------------------------------------------------------------------- + + +def parse_decision(raw_response: str) -> CampaignDecision: + """Parse the LLM JSON response into a ``CampaignDecision``. + + Falls back to ``WAIT`` on any parse error so the loop never crashes. + """ + # Strip accidental markdown code fences + text = raw_response.strip() + if text.startswith("```"): + lines = text.splitlines() + text = "\n".join( + line for line in lines if not line.startswith("```") + ).strip() + + try: + data = json.loads(text) + except json.JSONDecodeError as exc: + logger.warning("Decision parse error (bad JSON): %s | raw=%r", exc, raw_response[:200]) + return CampaignDecision(action=M2Action.WAIT, reasoning="parse error") + + try: + action_str = str(data.get("action", "WAIT")).upper() + try: + action = M2Action(action_str) + except ValueError: + logger.warning("Unknown action %r — defaulting to WAIT", action_str) + action = M2Action.WAIT + + return CampaignDecision( + action=action, + settlement_id=str(data.get("settlement_id", "")), + settlement_name=str(data.get("settlement_name", "")), + item_id=str(data.get("item_id", "")), + quantity=max(1, int(data.get("quantity", 1))), + party_id=str(data.get("party_id", "")), + party_name=str(data.get("party_name", "")), + reasoning=str(data.get("reasoning", "")), + ) + except (KeyError, ValueError, TypeError) as exc: + logger.warning("Decision parse error (bad fields): %s", exc) + return CampaignDecision(action=M2Action.WAIT, reasoning=f"field error: {exc}") + + +# --------------------------------------------------------------------------- +# Main entry point +# --------------------------------------------------------------------------- + + +async def decide(state: "Any") -> CampaignDecision: + """Ask the local LLM to choose a campaign action. + + Uses the cascade router (Ollama → Claude fallback) configured in + ``config/providers.yaml``. Gracefully returns WAIT on any LLM failure. + + Parameters + ---------- + state: + A ``CampaignState`` instance. + + Returns + ------- + CampaignDecision + The chosen action and its parameters. + """ + from config import settings + + messages = build_decision_prompt(state) + model = settings.bannerlord_model + + try: + from infrastructure.router import get_router + + router = get_router() + response = await router.complete( + messages=messages, + model=model, + temperature=0.1, + ) + raw_text: str = response.get("content", "") + decision = parse_decision(raw_text) + logger.info( + "Decision [tick=%d]: %s — %s", + state.tick, + decision.action, + decision.reasoning, + ) + return decision + + except Exception as exc: # noqa: BLE001 + logger.warning("Decision LLM call failed: %s — defaulting to WAIT", exc) + return CampaignDecision( + action=M2Action.WAIT, + reasoning=f"LLM unavailable: {exc}", + ) diff --git a/src/bannerlord/gabs_client.py b/src/bannerlord/gabs_client.py new file mode 100644 index 00000000..91a37aec --- /dev/null +++ b/src/bannerlord/gabs_client.py @@ -0,0 +1,195 @@ +"""GABS TCP/JSON-RPC client for Bannerlord. + +Connects to the GABS C# mod (Bannerlord.GABS) over TCP on port 4825 +and dispatches JSON-RPC 2.0 requests. All I/O is async; synchronous +callers must wrap in ``asyncio.to_thread()``. + +Architecture: + Bannerlord (Windows VM) ← GABS C# mod ← TCP:4825 ← this client + +Usage:: + + async with GabsClient() as client: + state = await client.get_game_state() + result = await client.call("party/move_to_settlement", + {"settlement_id": "town_A1"}) +""" + +from __future__ import annotations + +import asyncio +import json +import logging +from typing import Any + +from config import settings + +logger = logging.getLogger(__name__) + +# JSON-RPC framing: each message is newline-delimited UTF-8 JSON. +_ENCODING = "utf-8" +_NEWLINE = b"\n" +_DEFAULT_TIMEOUT = 30.0 + + +class GabsError(Exception): + """Raised when GABS returns a JSON-RPC error response.""" + + def __init__(self, code: int, message: str, data: Any = None) -> None: + super().__init__(f"GABS error {code}: {message}") + self.code = code + self.data = data + + +class GabsClient: + """Async TCP JSON-RPC 2.0 client for the GABS Bannerlord mod. + + Parameters + ---------- + host: + GABS server host (Windows VM IP or ``localhost`` for port-forwarded). + port: + GABS server port (default 4825). + timeout: + Per-call timeout in seconds. + """ + + def __init__( + self, + *, + host: str | None = None, + port: int | None = None, + timeout: float = _DEFAULT_TIMEOUT, + ) -> None: + self._host = host or settings.gabs_host + self._port = port or settings.gabs_port + self._timeout = timeout + self._reader: asyncio.StreamReader | None = None + self._writer: asyncio.StreamWriter | None = None + self._req_id = 0 + self._connected = False + + # -- lifecycle --------------------------------------------------------- + + async def connect(self) -> None: + """Open the TCP connection to GABS.""" + try: + self._reader, self._writer = await asyncio.wait_for( + asyncio.open_connection(self._host, self._port), + timeout=self._timeout, + ) + self._connected = True + logger.info("GabsClient connected to %s:%d", self._host, self._port) + except (OSError, asyncio.TimeoutError) as exc: + logger.warning("GabsClient could not connect to GABS: %s", exc) + self._connected = False + raise + + async def disconnect(self) -> None: + """Close the TCP connection.""" + if self._writer is not None: + try: + self._writer.close() + await self._writer.wait_closed() + except Exception as exc: # noqa: BLE001 + logger.debug("GabsClient disconnect error (ignored): %s", exc) + self._connected = False + self._reader = None + self._writer = None + logger.info("GabsClient disconnected") + + @property + def is_connected(self) -> bool: + return self._connected + + # -- context manager --------------------------------------------------- + + async def __aenter__(self) -> "GabsClient": + await self.connect() + return self + + async def __aexit__(self, *_: Any) -> None: + await self.disconnect() + + # -- public API -------------------------------------------------------- + + async def call(self, method: str, params: dict[str, Any] | None = None) -> Any: + """Call a GABS tool and return the result. + + Parameters + ---------- + method: + GABS tool name, e.g. ``"party/move_to_settlement"``. + params: + Tool parameters dict. + + Returns + ------- + Any + The ``result`` field from the JSON-RPC response. + + Raises + ------ + GabsError + If GABS returns an error response. + RuntimeError + If not connected. + """ + if not self._connected or self._writer is None or self._reader is None: + raise RuntimeError("GabsClient is not connected — call connect() first") + + self._req_id += 1 + request = { + "jsonrpc": "2.0", + "id": self._req_id, + "method": method, + "params": params or {}, + } + + raw = json.dumps(request).encode(_ENCODING) + _NEWLINE + + try: + self._writer.write(raw) + await asyncio.wait_for(self._writer.drain(), timeout=self._timeout) + + line = await asyncio.wait_for( + self._reader.readline(), timeout=self._timeout + ) + except asyncio.TimeoutError as exc: + raise RuntimeError(f"GABS call '{method}' timed out after {self._timeout}s") from exc + except (OSError, ConnectionResetError) as exc: + self._connected = False + raise RuntimeError(f"GABS connection lost during '{method}': {exc}") from exc + + response = json.loads(line.decode(_ENCODING)) + + if "error" in response: + err = response["error"] + raise GabsError( + code=err.get("code", -1), + message=err.get("message", "unknown error"), + data=err.get("data"), + ) + + return response.get("result") + + async def get_game_state(self) -> dict[str, Any]: + """Return the full game state snapshot from GABS. + + Returns an empty dict and logs a warning if GABS is unreachable. + """ + try: + result = await self.call("game/get_state") + return result if isinstance(result, dict) else {} + except (GabsError, RuntimeError) as exc: + logger.warning("GABS get_game_state failed: %s", exc) + return {} + + async def ping(self) -> bool: + """Return True if GABS responds to a ping.""" + try: + await self.call("game/ping") + return True + except Exception as exc: # noqa: BLE001 + logger.debug("GABS ping failed: %s", exc) + return False diff --git a/src/config.py b/src/config.py index f0c922cc..233f5b9f 100644 --- a/src/config.py +++ b/src/config.py @@ -374,6 +374,17 @@ class Settings(BaseSettings): error_feedback_enabled: bool = True # Auto-create bug report tasks error_dedup_window_seconds: int = 300 # 5-min dedup window + # ── Bannerlord / GABS ───────────────────────────────────────────── + # GABS (Bannerlord Agent Bridge System) TCP/JSON-RPC server. + # Runs inside the Windows VM hosting Bannerlord. + # Override with GABS_HOST / GABS_PORT env vars. + gabs_host: str = "localhost" + gabs_port: int = 4825 + # Decision model for the Bannerlord campaign agent (Qwen3 preferred). + bannerlord_model: str = "qwen3:14b" + # Campaign-tick interval in seconds (real-time pause between in-game days). + bannerlord_tick_seconds: float = 5.0 + # ── Scripture / Biblical Integration ────────────────────────────── # Enable the biblical text module. scripture_enabled: bool = True diff --git a/src/infrastructure/world/adapters/bannerlord.py b/src/infrastructure/world/adapters/bannerlord.py new file mode 100644 index 00000000..5399c75a --- /dev/null +++ b/src/infrastructure/world/adapters/bannerlord.py @@ -0,0 +1,234 @@ +"""Bannerlord world adapter — bridges GABS to the WorldInterface contract. + +Allows the existing ``Heartbeat`` loop to drive the Bannerlord campaign +by treating it as just another game world. Wraps the async ``GabsClient`` +for synchronous use (the ``Heartbeat`` calls ``observe()`` and ``act()`` +synchronously). + +Async callers should use ``CampaignLoop`` directly — it is more efficient +and handles the full M2 logic natively. + +Usage:: + + adapter = BannerlordWorldAdapter() + adapter.connect() + heartbeat = Heartbeat(world=adapter, interval=5.0) + await heartbeat.run_once() + adapter.disconnect() +""" + +from __future__ import annotations + +import asyncio +import logging + +from infrastructure.world.interface import WorldInterface +from infrastructure.world.types import ( + ActionResult, + ActionStatus, + CommandInput, + PerceptionOutput, +) + +logger = logging.getLogger(__name__) + + +class BannerlordWorldAdapter(WorldInterface): + """WorldInterface adapter for Bannerlord via GABS. + + Wraps ``GabsClient`` and ``CampaignState`` to present the Bannerlord + campaign map as a ``WorldInterface``-compatible world. + + Parameters + ---------- + host: + Override GABS server host (defaults to ``settings.gabs_host``). + port: + Override GABS server port (defaults to ``settings.gabs_port``). + """ + + def __init__( + self, + *, + host: str | None = None, + port: int | None = None, + ) -> None: + from config import settings + + self._host = host or settings.gabs_host + self._port = port or settings.gabs_port + self._connected = False + self._client = None + self._loop: asyncio.AbstractEventLoop | None = None + + # -- lifecycle --------------------------------------------------------- + + def connect(self) -> None: + """Open the GABS TCP connection (synchronous wrapper).""" + from bannerlord.gabs_client import GabsClient + + self._client = GabsClient(host=self._host, port=self._port) + try: + self._loop = asyncio.get_event_loop() + except RuntimeError: + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + + try: + self._loop.run_until_complete(self._client.connect()) + self._connected = True + logger.info("BannerlordWorldAdapter connected to GABS") + except Exception as exc: # noqa: BLE001 + logger.warning("BannerlordWorldAdapter: GABS connect failed: %s", exc) + self._connected = False + + def disconnect(self) -> None: + """Close the GABS TCP connection (synchronous wrapper).""" + if self._client is not None and self._loop is not None: + try: + self._loop.run_until_complete(self._client.disconnect()) + except Exception as exc: # noqa: BLE001 + logger.debug("BannerlordWorldAdapter disconnect error: %s", exc) + self._connected = False + + @property + def is_connected(self) -> bool: + return self._connected + + # -- core contract ----------------------------------------------------- + + def observe(self) -> PerceptionOutput: + """Poll GABS for current game state and return structured perception.""" + from bannerlord.campaign_state import parse_campaign_state + + if not self._connected or self._client is None or self._loop is None: + return PerceptionOutput( + location="disconnected", + entities=[], + events=["gabs_disconnected"], + raw={"error": "GABS not connected"}, + ) + + try: + raw = self._loop.run_until_complete(self._client.get_game_state()) + state = parse_campaign_state(raw) + + # Build entities list from settlements and nearby parties + entities: list[str] = [] + for s in state.settlements[:5]: + entities.append(f"settlement:{s.name}") + for p in state.nearby_parties[:3]: + prefix = "hostile" if p.is_hostile else "friendly" + entities.append(f"{prefix}_party:{p.name}") + + # Build events list + events: list[str] = [] + if state.party.food_days < 2.0: + events.append("low_food") + if state.party.morale < 40: + events.append("low_morale") + if state.hostile_bandits_nearby(): + events.append("bandits_nearby") + if state.m2_complete: + events.append("m2_complete") + + location = state.party.current_settlement or "campaign_map" + + return PerceptionOutput( + location=location, + entities=entities, + events=events, + raw=raw, + ) + + except Exception as exc: # noqa: BLE001 + logger.warning("BannerlordWorldAdapter.observe() failed: %s", exc) + return PerceptionOutput( + location="unknown", + entities=[], + events=[f"observe_error:{exc}"], + raw={"error": str(exc)}, + ) + + def act(self, command: CommandInput) -> ActionResult: + """Dispatch a campaign command to GABS. + + Recognized ``command.action`` values: + - ``"move"`` → party/move_to_settlement (target = settlement_id) + - ``"trade"`` → inventory/buy_item (target = item_id) + - ``"recruit"`` → party/recruit_all + - ``"engage"`` → party/engage_party (target = party_id) + + Parameters + ---------- + command: + WorldInterface ``CommandInput`` with action, target, parameters. + """ + if not self._connected or self._client is None or self._loop is None: + return ActionResult( + status=ActionStatus.FAILURE, + message="GABS not connected", + ) + + try: + return self._loop.run_until_complete(self._async_act(command)) + except Exception as exc: # noqa: BLE001 + logger.warning("BannerlordWorldAdapter.act() failed: %s", exc) + return ActionResult( + status=ActionStatus.FAILURE, + message=f"act failed: {exc}", + ) + + async def _async_act(self, command: CommandInput) -> ActionResult: + """Async implementation of act().""" + from bannerlord.campaign_actions import ( + buy_item, + engage_party, + move_to_settlement, + recruit_all, + ) + + action = command.action.lower() + params = command.parameters + + if action == "move": + settlement_id = command.target or params.get("settlement_id", "") + return await move_to_settlement( + self._client, + settlement_id, + settlement_name=params.get("settlement_name", ""), + ) + + elif action == "trade": + item_id = command.target or params.get("item_id", "") + quantity = int(params.get("quantity", 1)) + return await buy_item( + self._client, + item_id, + quantity, + settlement_id=params.get("settlement_id", ""), + ) + + elif action == "recruit": + return await recruit_all( + self._client, + settlement_id=params.get("settlement_id", ""), + ) + + elif action == "engage": + party_id = command.target or params.get("party_id", "") + return await engage_party( + self._client, + party_id, + party_name=params.get("party_name", ""), + ) + + else: + return ActionResult( + status=ActionStatus.NOOP, + message=f"Unknown action: {command.action}", + ) + + def speak(self, message: str, target: str | None = None) -> None: + """Log the message — GABS has no chat mechanism in M2.""" + logger.info("BannerlordWorldAdapter.speak: %r (target=%r)", message, target) diff --git a/tests/bannerlord/__init__.py b/tests/bannerlord/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/bannerlord/test_campaign_actions.py b/tests/bannerlord/test_campaign_actions.py new file mode 100644 index 00000000..bc678890 --- /dev/null +++ b/tests/bannerlord/test_campaign_actions.py @@ -0,0 +1,102 @@ +"""Unit tests for bannerlord.campaign_actions.""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from bannerlord.campaign_actions import ( + GabsTool, + buy_item, + engage_party, + move_to_settlement, + recruit_all, +) +from infrastructure.world.types import ActionStatus + + +def _mock_client(return_value=None, raise_exc=None): + """Build a mock GabsClient.""" + client = MagicMock() + if raise_exc is not None: + client.call = AsyncMock(side_effect=raise_exc) + else: + client.call = AsyncMock(return_value=return_value) + return client + + +class TestMoveToSettlement: + async def test_success(self): + client = _mock_client({"eta_days": 2}) + result = await move_to_settlement(client, "town_A1", settlement_name="Marunath") + assert result.status == ActionStatus.SUCCESS + client.call.assert_called_once_with( + GabsTool.MOVE_TO_SETTLEMENT, {"settlement_id": "town_A1"} + ) + + async def test_failure_on_gabs_error(self): + client = _mock_client(raise_exc=RuntimeError("GABS timeout")) + result = await move_to_settlement(client, "town_A1") + assert result.status == ActionStatus.FAILURE + assert "GABS timeout" in result.message + + async def test_uses_settlement_id_as_label_when_no_name(self): + client = _mock_client({}) + result = await move_to_settlement(client, "town_B2") + assert result.status == ActionStatus.SUCCESS + assert "town_B2" in result.message + + +class TestBuyItem: + async def test_success(self): + client = _mock_client({"cost": 100}) + result = await buy_item(client, "grain", 5) + assert result.status == ActionStatus.SUCCESS + assert "grain" in result.message + client.call.assert_called_once_with( + GabsTool.BUY_ITEM, {"item_id": "grain", "quantity": 5} + ) + + async def test_includes_settlement_id_when_given(self): + client = _mock_client({}) + await buy_item(client, "iron", 2, settlement_id="town_A1") + call_params = client.call.call_args[0][1] + assert call_params["settlement_id"] == "town_A1" + + async def test_failure_logged_gracefully(self): + client = _mock_client(raise_exc=Exception("inventory full")) + result = await buy_item(client, "wool", 10) + assert result.status == ActionStatus.FAILURE + + +class TestRecruitAll: + async def test_success(self): + client = _mock_client({"recruited": 15}) + result = await recruit_all(client) + assert result.status == ActionStatus.SUCCESS + assert "15" in result.message + + async def test_success_with_settlement(self): + client = _mock_client({"recruited": 8}) + result = await recruit_all(client, settlement_id="town_A1") + call_params = client.call.call_args[0][1] + assert call_params["settlement_id"] == "town_A1" + + async def test_failure_graceful(self): + client = _mock_client(raise_exc=RuntimeError("no recruits")) + result = await recruit_all(client) + assert result.status == ActionStatus.FAILURE + + +class TestEngageParty: + async def test_success(self): + client = _mock_client({"outcome": "victory", "loot": 200}) + result = await engage_party(client, "bandit_1", party_name="Forest Bandits") + assert result.status == ActionStatus.SUCCESS + assert "victory" in result.message + + async def test_failure_graceful(self): + client = _mock_client(raise_exc=RuntimeError("party not found")) + result = await engage_party(client, "bandit_1") + assert result.status == ActionStatus.FAILURE diff --git a/tests/bannerlord/test_campaign_loop.py b/tests/bannerlord/test_campaign_loop.py new file mode 100644 index 00000000..a3797ffc --- /dev/null +++ b/tests/bannerlord/test_campaign_loop.py @@ -0,0 +1,200 @@ +"""Unit tests for bannerlord.campaign_loop.""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from bannerlord.campaign_loop import CampaignLoop, TickResult +from bannerlord.decision import CampaignDecision, M2Action +from infrastructure.world.types import ActionResult, ActionStatus + + +def _make_game_state(*, troops: int = 30, gold: int = 2000) -> dict: + return { + "tick": 0, + "party": { + "size": troops, + "wounded": 0, + "food_days": 5.0, + "morale": 80.0, + "current_settlement": "town_A1", + }, + "economy": {"gold": gold, "daily_income": 200, "daily_expenses": 150}, + "nearby_parties": [], + "settlements": [ + { + "id": "town_A1", + "name": "Marunath", + "faction": "aserai", + "is_friendly": True, + "distance": 0.0, + "has_recruits": True, + "has_trade_goods": False, + } + ], + } + + +class TestCampaignLoopDispatch: + """Tests for the internal _dispatch() routing.""" + + def _loop(self) -> CampaignLoop: + return CampaignLoop(tick_seconds=0.0, max_ticks=1) + + async def test_dispatch_move(self): + loop = self._loop() + client = MagicMock() + decision = CampaignDecision( + action=M2Action.MOVE, + settlement_id="town_A1", + settlement_name="Marunath", + ) + + with patch("bannerlord.campaign_loop.move_to_settlement", new_callable=AsyncMock) as mock_move: + mock_move.return_value = ActionResult(status=ActionStatus.SUCCESS, message="ok") + await loop._dispatch(decision, client) + mock_move.assert_called_once_with(client, "town_A1", settlement_name="Marunath") + + async def test_dispatch_recruit(self): + loop = self._loop() + client = MagicMock() + decision = CampaignDecision( + action=M2Action.RECRUIT, + settlement_id="town_A1", + ) + + with patch("bannerlord.campaign_loop.recruit_all", new_callable=AsyncMock) as mock_recruit: + mock_recruit.return_value = ActionResult(status=ActionStatus.SUCCESS, message="15 recruited") + await loop._dispatch(decision, client) + mock_recruit.assert_called_once() + + async def test_dispatch_engage(self): + loop = self._loop() + client = MagicMock() + decision = CampaignDecision( + action=M2Action.ENGAGE, + party_id="bandit_1", + party_name="Forest Bandits", + ) + + with patch("bannerlord.campaign_loop.engage_party", new_callable=AsyncMock) as mock_engage: + mock_engage.return_value = ActionResult(status=ActionStatus.SUCCESS, message="victory") + await loop._dispatch(decision, client) + mock_engage.assert_called_once_with(client, "bandit_1", party_name="Forest Bandits") + + async def test_dispatch_trade(self): + loop = self._loop() + client = MagicMock() + decision = CampaignDecision( + action=M2Action.TRADE, + item_id="grain", + quantity=5, + ) + + with patch("bannerlord.campaign_loop.buy_item", new_callable=AsyncMock) as mock_buy: + mock_buy.return_value = ActionResult(status=ActionStatus.SUCCESS, message="bought") + await loop._dispatch(decision, client) + mock_buy.assert_called_once_with(client, "grain", 5, settlement_id="") + + async def test_dispatch_wait_returns_noop(self): + loop = self._loop() + client = MagicMock() + decision = CampaignDecision(action=M2Action.WAIT, reasoning="low food") + result = await loop._dispatch(decision, client) + assert result.status == ActionStatus.NOOP + + async def test_dispatch_move_missing_settlement_id(self): + loop = self._loop() + client = MagicMock() + decision = CampaignDecision(action=M2Action.MOVE, settlement_id="") + result = await loop._dispatch(decision, client) + assert result.status == ActionStatus.FAILURE + + async def test_dispatch_engage_missing_party_id(self): + loop = self._loop() + client = MagicMock() + decision = CampaignDecision(action=M2Action.ENGAGE, party_id="") + result = await loop._dispatch(decision, client) + assert result.status == ActionStatus.FAILURE + + +class TestCampaignLoopRun: + """Integration-level tests for the full run() loop (mocked GABS).""" + + async def test_run_stops_at_max_ticks(self): + """Loop respects max_ticks and returns correct number of results.""" + game_state = _make_game_state() + + with ( + patch("bannerlord.campaign_loop.GabsClient") as MockClient, + patch("bannerlord.campaign_loop.decide", new_callable=AsyncMock) as mock_decide, + patch("bannerlord.campaign_loop.move_to_settlement", new_callable=AsyncMock) as mock_move, + ): + # Setup fake client + fake_client = AsyncMock() + fake_client.get_game_state = AsyncMock(return_value=game_state) + fake_client.connect = AsyncMock() + fake_client.disconnect = AsyncMock() + MockClient.return_value = fake_client + + mock_decide.return_value = CampaignDecision( + action=M2Action.MOVE, + settlement_id="town_B1", + settlement_name="Epicrotea", + reasoning="moving", + ) + mock_move.return_value = ActionResult(status=ActionStatus.SUCCESS, message="ok") + + loop = CampaignLoop(tick_seconds=0.0, max_ticks=3) + results = await loop.run() + + assert len(results) == 3 + assert all(isinstance(r, TickResult) for r in results) + + async def test_run_stops_when_m2_complete(self): + """Loop exits early when M2 conditions are met.""" + # State with M2 already complete + game_state = _make_game_state(troops=100, gold=10000) + + with ( + patch("bannerlord.campaign_loop.GabsClient") as MockClient, + patch("bannerlord.campaign_loop.decide", new_callable=AsyncMock) as mock_decide, + ): + fake_client = AsyncMock() + fake_client.get_game_state = AsyncMock(return_value=game_state) + fake_client.connect = AsyncMock() + fake_client.disconnect = AsyncMock() + MockClient.return_value = fake_client + + mock_decide.return_value = CampaignDecision( + action=M2Action.WAIT, + reasoning="done", + ) + + loop = CampaignLoop(tick_seconds=0.0, max_ticks=10) + results = await loop.run() + + # Should exit after first tick (m2_complete = True) + assert len(results) == 1 + assert results[0].m2_complete is True + + async def test_run_aborts_on_connect_failure(self): + """Loop returns empty history if GABS cannot be reached.""" + with patch("bannerlord.campaign_loop.GabsClient") as MockClient: + fake_client = AsyncMock() + fake_client.connect = AsyncMock(side_effect=OSError("refused")) + fake_client.disconnect = AsyncMock() + MockClient.return_value = fake_client + + loop = CampaignLoop(tick_seconds=0.0, max_ticks=5) + results = await loop.run() + + assert results == [] + + def test_stop_sets_running_false(self): + loop = CampaignLoop() + loop._running = True + loop.stop() + assert not loop.is_running diff --git a/tests/bannerlord/test_campaign_state.py b/tests/bannerlord/test_campaign_state.py new file mode 100644 index 00000000..fc9fd8b7 --- /dev/null +++ b/tests/bannerlord/test_campaign_state.py @@ -0,0 +1,150 @@ +"""Unit tests for bannerlord.campaign_state.""" + +from __future__ import annotations + +import pytest + +from bannerlord.campaign_state import ( + M2_GOLD_GOAL, + M2_TROOP_GOAL, + CampaignState, + NearbyParty, + Settlement, + parse_campaign_state, +) + + +class TestParseCampaignState: + def test_empty_dict_returns_defaults(self): + state = parse_campaign_state({}) + assert state.party.party_size == 0 + assert state.economy.gold == 0 + assert state.nearby_parties == [] + assert state.settlements == [] + + def test_full_payload_parsed(self): + raw = { + "tick": 5, + "party": { + "size": 30, + "wounded": 2, + "prisoners": 1, + "food_days": 3.5, + "morale": 75.0, + "current_settlement": "town_A1", + "speed": 5.2, + }, + "economy": { + "gold": 4500, + "daily_income": 200, + "daily_expenses": 150, + }, + "nearby_parties": [ + { + "id": "bandit_1", + "name": "Forest Bandits", + "faction": "bandit", + "is_hostile": True, + "troop_count": 10, + "distance": 3.0, + } + ], + "settlements": [ + { + "id": "town_A1", + "name": "Marunath", + "faction": "aserai", + "is_friendly": True, + "distance": 0.0, + "has_recruits": True, + "has_trade_goods": False, + } + ], + } + state = parse_campaign_state(raw) + + assert state.tick == 5 + assert state.party.party_size == 30 + assert state.party.wounded == 2 + assert state.economy.gold == 4500 + assert state.economy.net_income == 50 + assert len(state.nearby_parties) == 1 + assert state.nearby_parties[0].name == "Forest Bandits" + assert len(state.settlements) == 1 + assert state.settlements[0].name == "Marunath" + + def test_malformed_entries_skipped(self): + raw = { + "nearby_parties": [{"id": "ok", "name": "Good", "faction": "bandit", + "is_hostile": True, "troop_count": 5, "distance": 2.0}, + {"bad": "data"}], + "settlements": [None, "not_a_dict"], + } + state = parse_campaign_state(raw) + assert len(state.nearby_parties) == 1 + assert state.settlements == [] + + +class TestCampaignStateProperties: + def _make_state(self, *, troops: int, gold: int) -> CampaignState: + state = CampaignState() + state.party.party_size = troops + state.economy.gold = gold + return state + + def test_m2_not_complete_by_default(self): + state = self._make_state(troops=20, gold=0) + assert not state.m2_complete + + def test_m2_complete_when_both_goals_met(self): + state = self._make_state(troops=M2_TROOP_GOAL, gold=M2_GOLD_GOAL) + assert state.m2_complete + + def test_m2_not_complete_if_only_troops_met(self): + state = self._make_state(troops=M2_TROOP_GOAL, gold=M2_GOLD_GOAL - 1) + assert not state.m2_complete + + def test_m2_not_complete_if_only_gold_met(self): + state = self._make_state(troops=M2_TROOP_GOAL - 1, gold=M2_GOLD_GOAL) + assert not state.m2_complete + + def test_troops_progress_string(self): + state = self._make_state(troops=45, gold=0) + assert state.troops_progress == f"45/{M2_TROOP_GOAL}" + + def test_gold_progress_string(self): + state = self._make_state(troops=0, gold=3000) + assert "3,000" in state.gold_progress + + def test_hostile_bandits_nearby_filter(self): + state = CampaignState() + state.nearby_parties = [ + NearbyParty("b1", "Bandits", "bandit", True, 10, 2.0), + NearbyParty("l1", "Lord", "empire", False, 50, 1.0), + NearbyParty("b2", "Far Bandits", "bandit", True, 5, 10.0), + ] + nearby = state.hostile_bandits_nearby(max_distance=5.0) + assert len(nearby) == 1 + assert nearby[0].party_id == "b1" + + def test_nearest_settlement_returns_closest(self): + state = CampaignState() + state.settlements = [ + Settlement("s1", "Far Town", "empire", True, 10.0), + Settlement("s2", "Near Town", "empire", True, 2.0), + ] + nearest = state.nearest_settlement() + assert nearest.settlement_id == "s2" + + def test_nearest_recruit_settlement(self): + state = CampaignState() + state.settlements = [ + Settlement("s1", "Town A", "empire", True, 5.0, has_recruits=False), + Settlement("s2", "Town B", "empire", True, 8.0, has_recruits=True), + ] + recruit = state.nearest_recruit_settlement() + assert recruit.settlement_id == "s2" + + def test_nearest_settlement_none_when_empty(self): + state = CampaignState() + assert state.nearest_settlement() is None diff --git a/tests/bannerlord/test_decision.py b/tests/bannerlord/test_decision.py new file mode 100644 index 00000000..45a32baf --- /dev/null +++ b/tests/bannerlord/test_decision.py @@ -0,0 +1,154 @@ +"""Unit tests for bannerlord.decision.""" + +from __future__ import annotations + +import json + +import pytest + +from bannerlord.campaign_state import ( + CampaignState, + EconomyState, + NearbyParty, + PartyState, + Settlement, +) +from bannerlord.decision import ( + M2Action, + CampaignDecision, + build_decision_prompt, + parse_decision, +) + + +def _make_state( + *, + troops: int = 30, + gold: int = 2000, + food_days: float = 5.0, + morale: float = 80.0, + settlements: list | None = None, + nearby_parties: list | None = None, +) -> CampaignState: + state = CampaignState() + state.party = PartyState( + party_size=troops, + food_days=food_days, + morale=morale, + ) + state.economy = EconomyState(gold=gold, daily_income=200, daily_expenses=150) + state.settlements = settlements or [] + state.nearby_parties = nearby_parties or [] + return state + + +class TestBuildDecisionPrompt: + def test_returns_two_messages(self): + state = _make_state() + messages = build_decision_prompt(state) + assert len(messages) == 2 + assert messages[0]["role"] == "system" + assert messages[1]["role"] == "user" + + def test_user_message_includes_party_info(self): + state = _make_state(troops=45, gold=3000) + messages = build_decision_prompt(state) + user_content = messages[1]["content"] + assert "45" in user_content + assert "3,000" in user_content + + def test_bandits_appear_in_prompt_when_nearby(self): + state = _make_state( + nearby_parties=[NearbyParty("b1", "Forest Bandits", "bandit", True, 10, 2.0)] + ) + messages = build_decision_prompt(state) + user_content = messages[1]["content"] + assert "Forest Bandits" in user_content + + def test_settlements_appear_in_prompt(self): + state = _make_state( + settlements=[Settlement("s1", "Marunath", "aserai", True, 3.0, has_recruits=True)] + ) + messages = build_decision_prompt(state) + user_content = messages[1]["content"] + assert "Marunath" in user_content + + def test_system_prompt_contains_action_vocabulary(self): + state = _make_state() + messages = build_decision_prompt(state) + system = messages[0]["content"] + for action in ("MOVE", "TRADE", "RECRUIT", "ENGAGE", "WAIT"): + assert action in system + + +class TestParseDecision: + def test_valid_move_decision(self): + raw = json.dumps({ + "action": "MOVE", + "settlement_id": "town_A1", + "settlement_name": "Marunath", + "item_id": "", + "quantity": 1, + "party_id": "", + "party_name": "", + "reasoning": "Moving to recruit troops", + }) + decision = parse_decision(raw) + assert decision.action == M2Action.MOVE + assert decision.settlement_id == "town_A1" + assert decision.settlement_name == "Marunath" + + def test_valid_recruit_decision(self): + raw = json.dumps({ + "action": "RECRUIT", + "settlement_id": "town_A1", + "settlement_name": "Marunath", + "item_id": "", + "quantity": 1, + "party_id": "", + "party_name": "", + "reasoning": "Has recruits available", + }) + decision = parse_decision(raw) + assert decision.action == M2Action.RECRUIT + + def test_valid_engage_decision(self): + raw = json.dumps({ + "action": "ENGAGE", + "settlement_id": "", + "settlement_name": "", + "item_id": "", + "quantity": 1, + "party_id": "bandit_1", + "party_name": "Forest Bandits", + "reasoning": "Weak bandits — easy XP", + }) + decision = parse_decision(raw) + assert decision.action == M2Action.ENGAGE + assert decision.party_id == "bandit_1" + + def test_wait_on_invalid_json(self): + decision = parse_decision("not json at all") + assert decision.action == M2Action.WAIT + + def test_wait_on_unknown_action(self): + raw = json.dumps({"action": "TELEPORT", "reasoning": "hack"}) + decision = parse_decision(raw) + assert decision.action == M2Action.WAIT + + def test_strips_markdown_fences(self): + raw = '```json\n{"action": "WAIT", "reasoning": "low food"}\n```' + decision = parse_decision(raw) + assert decision.action == M2Action.WAIT + + def test_quantity_minimum_one(self): + raw = json.dumps({"action": "TRADE", "item_id": "grain", "quantity": -5, "reasoning": "x"}) + decision = parse_decision(raw) + assert decision.quantity == 1 + + def test_missing_optional_fields_default_to_empty(self): + raw = json.dumps({"action": "WAIT", "reasoning": "resting"}) + decision = parse_decision(raw) + assert decision.settlement_id == "" + assert decision.party_id == "" + assert decision.item_id == "" diff --git a/tests/bannerlord/test_gabs_client.py b/tests/bannerlord/test_gabs_client.py new file mode 100644 index 00000000..31b0bb2b --- /dev/null +++ b/tests/bannerlord/test_gabs_client.py @@ -0,0 +1,120 @@ +"""Unit tests for bannerlord.gabs_client.""" + +from __future__ import annotations + +import asyncio +import json +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from bannerlord.gabs_client import GabsClient, GabsError + + +class TestGabsClientCall: + """Tests for GabsClient.call() using mock StreamReader/Writer.""" + + def _make_client(self, response: dict) -> GabsClient: + """Return a pre-connected GabsClient with mocked I/O.""" + client = GabsClient(host="localhost", port=4825, timeout=5.0) + client._connected = True + + writer = MagicMock() + writer.write = MagicMock() + writer.drain = AsyncMock() + + raw_response = json.dumps(response).encode() + b"\n" + reader = MagicMock() + reader.readline = AsyncMock(return_value=raw_response) + + client._reader = reader + client._writer = writer + return client + + async def test_successful_call_returns_result(self): + client = self._make_client({"jsonrpc": "2.0", "id": 1, "result": {"status": "ok"}}) + result = await client.call("game/ping") + assert result == {"status": "ok"} + + async def test_error_response_raises_gabs_error(self): + client = self._make_client({ + "jsonrpc": "2.0", + "id": 1, + "error": {"code": -32601, "message": "Method not found"}, + }) + with pytest.raises(GabsError) as exc_info: + await client.call("unknown/method") + assert exc_info.value.code == -32601 + + async def test_not_connected_raises_runtime_error(self): + client = GabsClient() + with pytest.raises(RuntimeError, match="not connected"): + await client.call("game/ping") + + async def test_request_id_increments(self): + client = self._make_client({"jsonrpc": "2.0", "id": 1, "result": {}}) + await client.call("game/ping") + # Reset reader for second call + client._reader.readline = AsyncMock( + return_value=json.dumps({"jsonrpc": "2.0", "id": 2, "result": {}}).encode() + b"\n" + ) + await client.call("game/ping") + assert client._req_id == 2 + + async def test_get_game_state_returns_empty_on_error(self): + client = GabsClient() + client._connected = True + + writer = MagicMock() + writer.write = MagicMock() + writer.drain = AsyncMock() + reader = MagicMock() + reader.readline = AsyncMock(side_effect=OSError("connection reset")) + + client._reader = reader + client._writer = writer + + result = await client.get_game_state() + assert result == {} + + async def test_ping_returns_true_on_success(self): + client = self._make_client({"jsonrpc": "2.0", "id": 1, "result": "pong"}) + result = await client.ping() + assert result is True + + async def test_ping_returns_false_on_failure(self): + client = GabsClient() + result = await client.ping() + assert result is False + + +class TestGabsClientLifecycle: + async def test_connect_failure_sets_not_connected(self): + client = GabsClient(host="localhost", port=9999, timeout=0.1) + with pytest.raises(Exception): + await client.connect() + assert not client.is_connected + + async def test_context_manager_calls_connect_and_disconnect(self): + client = GabsClient() + connect_called = False + disconnect_called = False + + async def _fake_connect(): + nonlocal connect_called + connect_called = True + client._connected = True + + async def _fake_disconnect(): + nonlocal disconnect_called + disconnect_called = True + client._connected = False + + client.connect = _fake_connect + client.disconnect = _fake_disconnect + + async with client as c: + assert c is client + assert connect_called + + assert disconnect_called