Compare commits

...

2 Commits

Author SHA1 Message Date
Alexander Whitestone
1a4653570e fix: fix test patching in campaign_loop and malformed entry filtering in campaign_state
Some checks failed
Tests / lint (pull_request) Failing after 29s
Tests / test (pull_request) Has been skipped
- Remove redundant local GabsClient import inside run() that bypassed
  unittest.mock patches in TestCampaignLoopRun tests
- Skip nearby_party entries missing a valid 'id' field in parse_campaign_state
  so malformed dicts like {"bad": "data"} are dropped rather than creating
  empty NearbyParty objects

Fixes #1094

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 21:34:18 -04:00
Alexander Whitestone
46ef9d3aba WIP: Claude Code progress on #1094
Automated salvage commit — agent session ended (exit 124).
Work in progress, may need continuation.
2026-03-23 14:26:15 -04:00
14 changed files with 2190 additions and 0 deletions

View File

@@ -0,0 +1,11 @@
"""Bannerlord campaign agent — M2: Basic Campaign Actions.
Provides GABS integration (TCP JSON-RPC, port 4825) and the observe →
decide → act loop for autonomous campaign play: move, trade, recruit,
and engage bandits.
Key GABS tools: party/move_to_settlement, inventory/buy_item,
party/recruit_all, party/engage_party.
Done-condition: party grows from 20 → 100 troops, gold reaches 10 000 denars.
"""

View File

@@ -0,0 +1,200 @@
"""Bannerlord M2 campaign action primitives.
Wraps the four key GABS tools for the M2 milestone:
- party/move_to_settlement → move the party to a named settlement
- inventory/buy_item → purchase trade goods
- party/recruit_all → hire all available recruits
- party/engage_party → engage a nearby bandit party
All functions are async and return an ``ActionResult`` that is compatible
with the ``WorldInterface`` contract.
Error handling follows Pattern 3 (Feature Disable): if GABS rejects an
action, log a warning and return a FAILURE result — never raise.
"""
from __future__ import annotations
import logging
from enum import StrEnum
from typing import TYPE_CHECKING
from infrastructure.world.types import ActionResult, ActionStatus
if TYPE_CHECKING:
from bannerlord.gabs_client import GabsClient
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# GABS method names — canonical reference
# ---------------------------------------------------------------------------
class GabsTool(StrEnum):
"""GABS JSON-RPC method names for the M2 action set."""
MOVE_TO_SETTLEMENT = "party/move_to_settlement"
BUY_ITEM = "inventory/buy_item"
RECRUIT_ALL = "party/recruit_all"
ENGAGE_PARTY = "party/engage_party"
# ---------------------------------------------------------------------------
# Action functions
# ---------------------------------------------------------------------------
async def move_to_settlement(
client: "GabsClient",
settlement_id: str,
*,
settlement_name: str = "",
) -> ActionResult:
"""Move the party to a target settlement.
Parameters
----------
client:
Connected ``GabsClient`` instance.
settlement_id:
GABS settlement identifier (e.g. ``"town_A1"``).
settlement_name:
Human-readable name for logging only.
"""
label = settlement_name or settlement_id
try:
result = await client.call(
GabsTool.MOVE_TO_SETTLEMENT,
{"settlement_id": settlement_id},
)
logger.info("MOVE → %s: %s", label, result)
return ActionResult(
status=ActionStatus.SUCCESS,
message=f"Moving to {label}",
data=result or {},
)
except Exception as exc: # noqa: BLE001
logger.warning("MOVE → %s failed: %s", label, exc)
return ActionResult(
status=ActionStatus.FAILURE,
message=f"Move to {label} failed: {exc}",
data={},
)
async def buy_item(
client: "GabsClient",
item_id: str,
quantity: int,
*,
settlement_id: str = "",
) -> ActionResult:
"""Purchase trade goods from the current or specified settlement.
Parameters
----------
client:
Connected ``GabsClient`` instance.
item_id:
Item identifier (e.g. ``"grain"``, ``"iron"``, ``"wool"``).
quantity:
Number of units to purchase.
settlement_id:
Optional target settlement; empty means current location.
"""
try:
params: dict = {"item_id": item_id, "quantity": quantity}
if settlement_id:
params["settlement_id"] = settlement_id
result = await client.call(GabsTool.BUY_ITEM, params)
logger.info("BUY %dx %s: %s", quantity, item_id, result)
return ActionResult(
status=ActionStatus.SUCCESS,
message=f"Purchased {quantity}x {item_id}",
data=result or {},
)
except Exception as exc: # noqa: BLE001
logger.warning("BUY %dx %s failed: %s", quantity, item_id, exc)
return ActionResult(
status=ActionStatus.FAILURE,
message=f"Buy {item_id} failed: {exc}",
data={},
)
async def recruit_all(
client: "GabsClient",
*,
settlement_id: str = "",
) -> ActionResult:
"""Recruit all available troops at the current or specified settlement.
Parameters
----------
client:
Connected ``GabsClient`` instance.
settlement_id:
Optional settlement to recruit from; empty means current.
"""
try:
params: dict = {}
if settlement_id:
params["settlement_id"] = settlement_id
result = await client.call(GabsTool.RECRUIT_ALL, params)
recruited = (result or {}).get("recruited", "?")
logger.info("RECRUIT_ALL: recruited %s troops", recruited)
return ActionResult(
status=ActionStatus.SUCCESS,
message=f"Recruited {recruited} troops",
data=result or {},
)
except Exception as exc: # noqa: BLE001
logger.warning("RECRUIT_ALL failed: %s", exc)
return ActionResult(
status=ActionStatus.FAILURE,
message=f"Recruit all failed: {exc}",
data={},
)
async def engage_party(
client: "GabsClient",
party_id: str,
*,
party_name: str = "",
) -> ActionResult:
"""Engage a nearby party (typically a bandit gang) in combat.
Auto-resolve is expected at high Tactics skill — the agent relies
on GABS to handle the battle outcome.
Parameters
----------
client:
Connected ``GabsClient`` instance.
party_id:
GABS party identifier of the target.
party_name:
Human-readable name for logging only.
"""
label = party_name or party_id
try:
result = await client.call(GabsTool.ENGAGE_PARTY, {"party_id": party_id})
outcome = (result or {}).get("outcome", "unknown")
logger.info("ENGAGE %s: %s", label, outcome)
return ActionResult(
status=ActionStatus.SUCCESS,
message=f"Engaged {label}: {outcome}",
data=result or {},
)
except Exception as exc: # noqa: BLE001
logger.warning("ENGAGE %s failed: %s", label, exc)
return ActionResult(
status=ActionStatus.FAILURE,
message=f"Engage {label} failed: {exc}",
data={},
)

View File

@@ -0,0 +1,316 @@
"""Bannerlord M2 campaign action loop.
Implements the observe → decide → act → wait pipeline described in
issue #1094. The loop runs until the M2 victory conditions are met
(100 troops + 10 000 gold) or until stopped externally.
Architecture:
CampaignLoop.run()
while not m2_complete:
state = gabs.get_game_state() # observe
decision = decide(state) # decide (local Qwen3)
result = dispatch(decision, gabs) # act (GABS)
await asyncio.sleep(tick_seconds) # wait
Error handling:
- GABS connection failures → log + retry with backoff (max 3 attempts)
- LLM failures → WAIT action (graceful degradation)
- Action failures → log + continue to next tick
Progress tracking:
Loop publishes heartbeat events via the event bus so the dashboard
can display live party size and gold.
"""
from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime
from bannerlord.campaign_actions import buy_item, engage_party, move_to_settlement, recruit_all
from bannerlord.campaign_state import parse_campaign_state
from bannerlord.decision import M2Action, decide
from bannerlord.gabs_client import GabsClient
from config import settings
from infrastructure.world.types import ActionResult, ActionStatus
logger = logging.getLogger(__name__)
_MAX_RECONNECT_ATTEMPTS = 3
_RECONNECT_DELAY = 5.0 # seconds
# ---------------------------------------------------------------------------
# Progress snapshot (emitted each tick)
# ---------------------------------------------------------------------------
@dataclass
class TickResult:
"""Summary of one campaign tick."""
tick: int
timestamp: str
party_size: int
gold: int
action: str
action_status: str
reasoning: str
duration_ms: int
m2_complete: bool = False
error: str = ""
# ---------------------------------------------------------------------------
# Campaign loop
# ---------------------------------------------------------------------------
class CampaignLoop:
"""Runs the Bannerlord M2 autonomous campaign.
Parameters
----------
gabs_host:
Override GABS server host.
gabs_port:
Override GABS server port.
tick_seconds:
Real-time pause between in-game ticks.
on_tick:
Optional async callback invoked after each tick with the
``TickResult``. Used by the dashboard for live updates.
max_ticks:
Hard cap for testing / benchmarking. 0 = unlimited.
"""
def __init__(
self,
*,
gabs_host: str | None = None,
gabs_port: int | None = None,
tick_seconds: float | None = None,
on_tick=None,
max_ticks: int = 0,
) -> None:
self._host = gabs_host or settings.gabs_host
self._port = gabs_port or settings.gabs_port
self._tick_seconds = tick_seconds if tick_seconds is not None else settings.bannerlord_tick_seconds
self._on_tick = on_tick
self._max_ticks = max_ticks
self._running = False
self.history: list[TickResult] = []
# -- public API --------------------------------------------------------
@property
def is_running(self) -> bool:
return self._running
def stop(self) -> None:
"""Signal the loop to stop after the current tick."""
self._running = False
logger.info("CampaignLoop stop requested")
async def run(self) -> list[TickResult]:
"""Start the campaign loop.
Returns the list of tick results (for testing / benchmarking).
Runs until M2 complete, externally stopped, or max_ticks reached.
"""
self._running = True
logger.info(
"CampaignLoop starting — gabs=%s:%d tick=%.1fs",
self._host,
self._port,
self._tick_seconds,
)
client = GabsClient(host=self._host, port=self._port)
try:
await self._connect_with_retry(client)
except RuntimeError as exc: # noqa: BLE001
logger.error("CampaignLoop: could not connect to GABS — aborting: %s", exc)
self._running = False
return self.history
tick_num = 0
try:
while self._running:
tick_num += 1
if self._max_ticks > 0 and tick_num > self._max_ticks:
logger.info("CampaignLoop: max_ticks=%d reached", self._max_ticks)
break
result = await self._run_tick(client, tick_num)
self.history.append(result)
await self._emit(result)
if result.m2_complete:
logger.info(
"M2 COMPLETE! Party=%d troops, Gold=%d denars",
result.party_size,
result.gold,
)
break
if result.error and not self._running:
break
await asyncio.sleep(self._tick_seconds)
finally:
await client.disconnect()
self._running = False
logger.info("CampaignLoop stopped after %d ticks", tick_num)
return self.history
# -- internal: single tick ---------------------------------------------
async def _run_tick(self, client: "Any", tick_num: int) -> TickResult:
"""Execute one observe → decide → act cycle."""
start = time.monotonic()
# 1. Observe
raw_state = await client.get_game_state()
state = parse_campaign_state(raw_state)
state = _override_tick(state, tick_num)
# 2. Decide
decision = await decide(state)
# 3. Act
action_result = await self._dispatch(decision, client)
duration_ms = int((time.monotonic() - start) * 1000)
return TickResult(
tick=tick_num,
timestamp=datetime.now(UTC).isoformat(),
party_size=state.party.party_size,
gold=state.economy.gold,
action=decision.action,
action_status=action_result.status.value,
reasoning=decision.reasoning,
duration_ms=duration_ms,
m2_complete=state.m2_complete,
)
async def _dispatch(self, decision: "Any", client: "Any") -> "Any":
"""Route the decision to the correct GABS action function."""
action = decision.action
if action == M2Action.MOVE:
if not decision.settlement_id:
logger.warning("MOVE decision has no settlement_id — skipping")
return ActionResult(
status=ActionStatus.FAILURE,
message="MOVE missing settlement_id",
)
return await move_to_settlement(
client,
decision.settlement_id,
settlement_name=decision.settlement_name,
)
elif action == M2Action.TRADE:
if not decision.item_id:
logger.warning("TRADE decision has no item_id — skipping")
return ActionResult(
status=ActionStatus.FAILURE,
message="TRADE missing item_id",
)
return await buy_item(
client,
decision.item_id,
decision.quantity,
settlement_id=decision.settlement_id,
)
elif action == M2Action.RECRUIT:
return await recruit_all(
client,
settlement_id=decision.settlement_id,
)
elif action == M2Action.ENGAGE:
if not decision.party_id:
logger.warning("ENGAGE decision has no party_id — skipping")
return ActionResult(
status=ActionStatus.FAILURE,
message="ENGAGE missing party_id",
)
return await engage_party(
client,
decision.party_id,
party_name=decision.party_name,
)
else: # WAIT or unknown
logger.debug("Tick %s: WAIT — %s", decision.action, decision.reasoning)
return ActionResult(
status=ActionStatus.NOOP,
message=f"WAIT: {decision.reasoning}",
)
# -- internal: connectivity --------------------------------------------
async def _connect_with_retry(self, client: "Any") -> None:
"""Try to connect, retrying up to _MAX_RECONNECT_ATTEMPTS times."""
for attempt in range(1, _MAX_RECONNECT_ATTEMPTS + 1):
try:
await client.connect()
return
except Exception as exc: # noqa: BLE001
logger.warning(
"GABS connect attempt %d/%d failed: %s",
attempt,
_MAX_RECONNECT_ATTEMPTS,
exc,
)
if attempt < _MAX_RECONNECT_ATTEMPTS:
await asyncio.sleep(_RECONNECT_DELAY)
raise RuntimeError(
f"Could not connect to GABS at {self._host}:{self._port} "
f"after {_MAX_RECONNECT_ATTEMPTS} attempts"
)
# -- internal: event emission ------------------------------------------
async def _emit(self, result: TickResult) -> None:
"""Emit tick data to the event bus (best-effort)."""
try:
from infrastructure.events.bus import event_bus # noqa: PLC0415
await event_bus.publish(
"bannerlord.tick",
{
"tick": result.tick,
"party_size": result.party_size,
"gold": result.gold,
"action": result.action,
"action_status": result.action_status,
"m2_complete": result.m2_complete,
"duration_ms": result.duration_ms,
},
)
except Exception as exc: # noqa: BLE001
logger.debug("CampaignLoop emit skipped: %s", exc)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _override_tick(state: "Any", tick_num: int) -> "Any":
"""Set the tick counter from the loop (GABS may not provide it)."""
if state.tick == 0:
state.tick = tick_num
return state

View File

@@ -0,0 +1,213 @@
"""Bannerlord campaign state models.
Parses the raw GABS ``game/get_state`` payload into typed models and
tracks the M2 progress counters: party size and gold accumulation.
Done-condition (from issue #1094):
party_size >= 100 AND gold >= 10_000
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
logger = logging.getLogger(__name__)
# M2 victory conditions
M2_TROOP_GOAL = 100
M2_GOLD_GOAL = 10_000
@dataclass
class PartyState:
"""Current party composition and position."""
party_size: int = 0
wounded: int = 0
prisoners: int = 0
food_days: float = 0.0
morale: float = 100.0
current_settlement: str = ""
speed: float = 0.0
@dataclass
class EconomyState:
"""Current gold and trade state."""
gold: int = 0
daily_income: int = 0
daily_expenses: int = 0
@property
def net_income(self) -> int:
return self.daily_income - self.daily_expenses
@dataclass
class NearbyParty:
"""A nearby lord/bandit party visible on the map."""
party_id: str
name: str
faction: str
is_hostile: bool
troop_count: int
distance: float
@dataclass
class Settlement:
"""A settlement visible or reachable from the current position."""
settlement_id: str
name: str
faction: str
is_friendly: bool
distance: float
has_recruits: bool = False
has_trade_goods: bool = False
@dataclass
class CampaignState:
"""Full parsed snapshot of the GABS game state.
Built from the raw ``dict`` returned by ``GabsClient.get_game_state()``.
"""
tick: int = 0
timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))
party: PartyState = field(default_factory=PartyState)
economy: EconomyState = field(default_factory=EconomyState)
nearby_parties: list[NearbyParty] = field(default_factory=list)
settlements: list[Settlement] = field(default_factory=list)
raw: dict[str, Any] = field(default_factory=dict)
# -- M2 progress -------------------------------------------------------
@property
def troops_progress(self) -> str:
"""Human-readable M2 troop progress."""
return f"{self.party.party_size}/{M2_TROOP_GOAL}"
@property
def gold_progress(self) -> str:
"""Human-readable M2 gold progress."""
return f"{self.economy.gold:,}/{M2_GOLD_GOAL:,}"
@property
def m2_complete(self) -> bool:
"""True when both M2 victory conditions are met."""
return self.party.party_size >= M2_TROOP_GOAL and self.economy.gold >= M2_GOLD_GOAL
# -- hostile detection -------------------------------------------------
def hostile_bandits_nearby(self, max_distance: float = 5.0) -> list[NearbyParty]:
"""Return hostile bandit parties within *max_distance* map units."""
return [
p
for p in self.nearby_parties
if p.is_hostile and "bandit" in p.faction.lower() and p.distance <= max_distance
]
def nearest_settlement(self, *, friendly_only: bool = False) -> Settlement | None:
"""Return the closest (optionally friendly) settlement."""
candidates = [s for s in self.settlements if not friendly_only or s.is_friendly]
if not candidates:
return None
return min(candidates, key=lambda s: s.distance)
def nearest_recruit_settlement(self) -> Settlement | None:
"""Return the nearest settlement that has recruits available."""
candidates = [s for s in self.settlements if s.has_recruits]
if not candidates:
return None
return min(candidates, key=lambda s: s.distance)
# ---------------------------------------------------------------------------
# Parser
# ---------------------------------------------------------------------------
def parse_campaign_state(raw: dict[str, Any]) -> CampaignState:
"""Build a ``CampaignState`` from the raw GABS state dict.
Unknown / missing fields are silently defaulted so the parser never
crashes when GABS returns partial data.
"""
if not raw:
logger.debug("parse_campaign_state: empty payload — returning default state")
return CampaignState(raw=raw)
# -- party -------------------------------------------------------------
party_raw = raw.get("party", {})
party = PartyState(
party_size=int(party_raw.get("size", 0)),
wounded=int(party_raw.get("wounded", 0)),
prisoners=int(party_raw.get("prisoners", 0)),
food_days=float(party_raw.get("food_days", 0.0)),
morale=float(party_raw.get("morale", 100.0)),
current_settlement=str(party_raw.get("current_settlement", "")),
speed=float(party_raw.get("speed", 0.0)),
)
# -- economy -----------------------------------------------------------
economy_raw = raw.get("economy", {})
economy = EconomyState(
gold=int(economy_raw.get("gold", 0)),
daily_income=int(economy_raw.get("daily_income", 0)),
daily_expenses=int(economy_raw.get("daily_expenses", 0)),
)
# -- nearby parties ----------------------------------------------------
nearby_parties = []
for p in raw.get("nearby_parties", []):
try:
if not isinstance(p, dict) or not p.get("id"):
logger.debug("Skipping malformed nearby_party entry: missing id")
continue
nearby_parties.append(
NearbyParty(
party_id=str(p.get("id", "")),
name=str(p.get("name", "")),
faction=str(p.get("faction", "")),
is_hostile=bool(p.get("is_hostile", False)),
troop_count=int(p.get("troop_count", 0)),
distance=float(p.get("distance", 999.0)),
)
)
except (KeyError, ValueError, TypeError, AttributeError) as exc:
logger.debug("Skipping malformed nearby_party entry: %s", exc)
# -- settlements -------------------------------------------------------
settlements = []
for s in raw.get("settlements", []):
try:
settlements.append(
Settlement(
settlement_id=str(s.get("id", "")),
name=str(s.get("name", "")),
faction=str(s.get("faction", "")),
is_friendly=bool(s.get("is_friendly", False)),
distance=float(s.get("distance", 999.0)),
has_recruits=bool(s.get("has_recruits", False)),
has_trade_goods=bool(s.get("has_trade_goods", False)),
)
)
except (KeyError, ValueError, TypeError, AttributeError) as exc:
logger.debug("Skipping malformed settlement entry: %s", exc)
return CampaignState(
tick=int(raw.get("tick", 0)),
timestamp=datetime.now(UTC),
party=party,
economy=economy,
nearby_parties=nearby_parties,
settlements=settlements,
raw=raw,
)

284
src/bannerlord/decision.py Normal file
View File

@@ -0,0 +1,284 @@
"""LLM-powered campaign decision engine for Bannerlord M2.
Builds a structured prompt from the current ``CampaignState`` and asks
the local Qwen3 model (via Ollama) to choose one action from the M2
action vocabulary. Returns a ``CampaignDecision`` pydantic model with
the chosen action and its parameters.
The decision model is intentionally simple for M2:
MOVE → move to a named settlement
TRADE → buy a trade item
RECRUIT → hire troops at current/nearby settlement
ENGAGE → fight a nearby bandit party
WAIT → idle (e.g. low food, waiting for morale to recover)
Qwen3 responds in JSON mode with temperature=0.1 for deterministic play.
"""
from __future__ import annotations
import json
import logging
from enum import StrEnum
from typing import Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Decision schema
# ---------------------------------------------------------------------------
class M2Action(StrEnum):
"""Vocabulary of actions available in the M2 milestone."""
MOVE = "MOVE"
TRADE = "TRADE"
RECRUIT = "RECRUIT"
ENGAGE = "ENGAGE"
WAIT = "WAIT"
class CampaignDecision:
"""Parsed LLM decision for one campaign tick.
Attributes
----------
action:
One of the ``M2Action`` values.
settlement_id:
Target settlement ID (for MOVE / RECRUIT / TRADE).
settlement_name:
Human-readable settlement name (for logging).
item_id:
Trade item to buy (for TRADE).
quantity:
Trade quantity (for TRADE).
party_id:
Target party ID (for ENGAGE).
party_name:
Human-readable party name (for ENGAGE / logging).
reasoning:
LLM's brief explanation of the choice.
"""
def __init__(
self,
action: M2Action = M2Action.WAIT,
*,
settlement_id: str = "",
settlement_name: str = "",
item_id: str = "",
quantity: int = 1,
party_id: str = "",
party_name: str = "",
reasoning: str = "",
) -> None:
self.action = action
self.settlement_id = settlement_id
self.settlement_name = settlement_name
self.item_id = item_id
self.quantity = quantity
self.party_id = party_id
self.party_name = party_name
self.reasoning = reasoning
def __repr__(self) -> str:
return (
f"CampaignDecision(action={self.action!r}, "
f"reasoning={self.reasoning[:60]!r})"
)
# ---------------------------------------------------------------------------
# Prompt builder
# ---------------------------------------------------------------------------
def build_decision_prompt(state: "Any") -> list[dict[str, str]]:
"""Return an OpenAI-style message list for the decision LLM.
Parameters
----------
state:
A ``CampaignState`` instance.
"""
# Build a compact context block
party = state.party
econ = state.economy
ctx_lines = [
f"Campaign tick: {state.tick}",
f"Party size: {party.party_size} troops ({party.wounded} wounded)",
f"Food: {party.food_days:.1f} days remaining",
f"Morale: {party.morale:.0f}/100",
f"Gold: {econ.gold:,} denars (net {econ.net_income:+d}/day)",
f"Current location: {party.current_settlement or 'travelling'}",
"",
"== M2 GOALS ==",
f"Troops: {state.troops_progress} (need 100)",
f"Gold: {state.gold_progress} (need 10,000)",
"",
]
# Nearby bandits
bandits = state.hostile_bandits_nearby()
if bandits:
ctx_lines.append("== NEARBY HOSTILE BANDITS ==")
for b in bandits[:3]:
ctx_lines.append(
f" - {b.name} (id={b.party_id}, {b.troop_count} troops, "
f"{b.distance:.1f} away)"
)
ctx_lines.append("")
# Settlements
settlements = state.settlements[:5]
if settlements:
ctx_lines.append("== REACHABLE SETTLEMENTS ==")
for s in settlements:
flags = []
if s.has_recruits:
flags.append("recruits")
if s.has_trade_goods:
flags.append("trade")
if not s.is_friendly:
flags.append("hostile-faction")
flag_str = f" [{', '.join(flags)}]" if flags else ""
ctx_lines.append(
f" - {s.name} (id={s.settlement_id}, "
f"{s.distance:.1f} away{flag_str})"
)
ctx_lines.append("")
context = "\n".join(ctx_lines)
system_prompt = (
"You are the campaign manager for Timmy, an autonomous Bannerlord agent. "
"Your job is to choose the single best action for this campaign tick. "
"Respond ONLY with a JSON object — no prose, no markdown fences.\n\n"
"JSON schema:\n"
'{\n'
' "action": "MOVE|TRADE|RECRUIT|ENGAGE|WAIT",\n'
' "settlement_id": "<id or empty>",\n'
' "settlement_name": "<name or empty>",\n'
' "item_id": "<item or empty>",\n'
' "quantity": <int>,\n'
' "party_id": "<id or empty>",\n'
' "party_name": "<name or empty>",\n'
' "reasoning": "<one sentence>"\n'
"}\n\n"
"Priority rules:\n"
"1. ENGAGE bandits only if they are weak (< 15 troops) and we have > 25 troops.\n"
"2. RECRUIT when a nearby settlement has recruits and party < 80 troops.\n"
"3. TRADE when gold < 5000 and a settlement has trade goods.\n"
"4. MOVE toward the nearest settlement with recruits or trade goods.\n"
"5. WAIT only if food < 1 day or morale < 40."
)
user_prompt = f"Current game state:\n\n{context}\nChoose the best action."
return [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
# ---------------------------------------------------------------------------
# Response parser
# ---------------------------------------------------------------------------
def parse_decision(raw_response: str) -> CampaignDecision:
"""Parse the LLM JSON response into a ``CampaignDecision``.
Falls back to ``WAIT`` on any parse error so the loop never crashes.
"""
# Strip accidental markdown code fences
text = raw_response.strip()
if text.startswith("```"):
lines = text.splitlines()
text = "\n".join(
line for line in lines if not line.startswith("```")
).strip()
try:
data = json.loads(text)
except json.JSONDecodeError as exc:
logger.warning("Decision parse error (bad JSON): %s | raw=%r", exc, raw_response[:200])
return CampaignDecision(action=M2Action.WAIT, reasoning="parse error")
try:
action_str = str(data.get("action", "WAIT")).upper()
try:
action = M2Action(action_str)
except ValueError:
logger.warning("Unknown action %r — defaulting to WAIT", action_str)
action = M2Action.WAIT
return CampaignDecision(
action=action,
settlement_id=str(data.get("settlement_id", "")),
settlement_name=str(data.get("settlement_name", "")),
item_id=str(data.get("item_id", "")),
quantity=max(1, int(data.get("quantity", 1))),
party_id=str(data.get("party_id", "")),
party_name=str(data.get("party_name", "")),
reasoning=str(data.get("reasoning", "")),
)
except (KeyError, ValueError, TypeError) as exc:
logger.warning("Decision parse error (bad fields): %s", exc)
return CampaignDecision(action=M2Action.WAIT, reasoning=f"field error: {exc}")
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
async def decide(state: "Any") -> CampaignDecision:
"""Ask the local LLM to choose a campaign action.
Uses the cascade router (Ollama → Claude fallback) configured in
``config/providers.yaml``. Gracefully returns WAIT on any LLM failure.
Parameters
----------
state:
A ``CampaignState`` instance.
Returns
-------
CampaignDecision
The chosen action and its parameters.
"""
from config import settings
messages = build_decision_prompt(state)
model = settings.bannerlord_model
try:
from infrastructure.router import get_router
router = get_router()
response = await router.complete(
messages=messages,
model=model,
temperature=0.1,
)
raw_text: str = response.get("content", "")
decision = parse_decision(raw_text)
logger.info(
"Decision [tick=%d]: %s%s",
state.tick,
decision.action,
decision.reasoning,
)
return decision
except Exception as exc: # noqa: BLE001
logger.warning("Decision LLM call failed: %s — defaulting to WAIT", exc)
return CampaignDecision(
action=M2Action.WAIT,
reasoning=f"LLM unavailable: {exc}",
)

View File

@@ -0,0 +1,195 @@
"""GABS TCP/JSON-RPC client for Bannerlord.
Connects to the GABS C# mod (Bannerlord.GABS) over TCP on port 4825
and dispatches JSON-RPC 2.0 requests. All I/O is async; synchronous
callers must wrap in ``asyncio.to_thread()``.
Architecture:
Bannerlord (Windows VM) ← GABS C# mod ← TCP:4825 ← this client
Usage::
async with GabsClient() as client:
state = await client.get_game_state()
result = await client.call("party/move_to_settlement",
{"settlement_id": "town_A1"})
"""
from __future__ import annotations
import asyncio
import json
import logging
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
# JSON-RPC framing: each message is newline-delimited UTF-8 JSON.
_ENCODING = "utf-8"
_NEWLINE = b"\n"
_DEFAULT_TIMEOUT = 30.0
class GabsError(Exception):
"""Raised when GABS returns a JSON-RPC error response."""
def __init__(self, code: int, message: str, data: Any = None) -> None:
super().__init__(f"GABS error {code}: {message}")
self.code = code
self.data = data
class GabsClient:
"""Async TCP JSON-RPC 2.0 client for the GABS Bannerlord mod.
Parameters
----------
host:
GABS server host (Windows VM IP or ``localhost`` for port-forwarded).
port:
GABS server port (default 4825).
timeout:
Per-call timeout in seconds.
"""
def __init__(
self,
*,
host: str | None = None,
port: int | None = None,
timeout: float = _DEFAULT_TIMEOUT,
) -> None:
self._host = host or settings.gabs_host
self._port = port or settings.gabs_port
self._timeout = timeout
self._reader: asyncio.StreamReader | None = None
self._writer: asyncio.StreamWriter | None = None
self._req_id = 0
self._connected = False
# -- lifecycle ---------------------------------------------------------
async def connect(self) -> None:
"""Open the TCP connection to GABS."""
try:
self._reader, self._writer = await asyncio.wait_for(
asyncio.open_connection(self._host, self._port),
timeout=self._timeout,
)
self._connected = True
logger.info("GabsClient connected to %s:%d", self._host, self._port)
except (OSError, asyncio.TimeoutError) as exc:
logger.warning("GabsClient could not connect to GABS: %s", exc)
self._connected = False
raise
async def disconnect(self) -> None:
"""Close the TCP connection."""
if self._writer is not None:
try:
self._writer.close()
await self._writer.wait_closed()
except Exception as exc: # noqa: BLE001
logger.debug("GabsClient disconnect error (ignored): %s", exc)
self._connected = False
self._reader = None
self._writer = None
logger.info("GabsClient disconnected")
@property
def is_connected(self) -> bool:
return self._connected
# -- context manager ---------------------------------------------------
async def __aenter__(self) -> "GabsClient":
await self.connect()
return self
async def __aexit__(self, *_: Any) -> None:
await self.disconnect()
# -- public API --------------------------------------------------------
async def call(self, method: str, params: dict[str, Any] | None = None) -> Any:
"""Call a GABS tool and return the result.
Parameters
----------
method:
GABS tool name, e.g. ``"party/move_to_settlement"``.
params:
Tool parameters dict.
Returns
-------
Any
The ``result`` field from the JSON-RPC response.
Raises
------
GabsError
If GABS returns an error response.
RuntimeError
If not connected.
"""
if not self._connected or self._writer is None or self._reader is None:
raise RuntimeError("GabsClient is not connected — call connect() first")
self._req_id += 1
request = {
"jsonrpc": "2.0",
"id": self._req_id,
"method": method,
"params": params or {},
}
raw = json.dumps(request).encode(_ENCODING) + _NEWLINE
try:
self._writer.write(raw)
await asyncio.wait_for(self._writer.drain(), timeout=self._timeout)
line = await asyncio.wait_for(
self._reader.readline(), timeout=self._timeout
)
except asyncio.TimeoutError as exc:
raise RuntimeError(f"GABS call '{method}' timed out after {self._timeout}s") from exc
except (OSError, ConnectionResetError) as exc:
self._connected = False
raise RuntimeError(f"GABS connection lost during '{method}': {exc}") from exc
response = json.loads(line.decode(_ENCODING))
if "error" in response:
err = response["error"]
raise GabsError(
code=err.get("code", -1),
message=err.get("message", "unknown error"),
data=err.get("data"),
)
return response.get("result")
async def get_game_state(self) -> dict[str, Any]:
"""Return the full game state snapshot from GABS.
Returns an empty dict and logs a warning if GABS is unreachable.
"""
try:
result = await self.call("game/get_state")
return result if isinstance(result, dict) else {}
except (GabsError, RuntimeError) as exc:
logger.warning("GABS get_game_state failed: %s", exc)
return {}
async def ping(self) -> bool:
"""Return True if GABS responds to a ping."""
try:
await self.call("game/ping")
return True
except Exception as exc: # noqa: BLE001
logger.debug("GABS ping failed: %s", exc)
return False

View File

@@ -374,6 +374,17 @@ class Settings(BaseSettings):
error_feedback_enabled: bool = True # Auto-create bug report tasks
error_dedup_window_seconds: int = 300 # 5-min dedup window
# ── Bannerlord / GABS ─────────────────────────────────────────────
# GABS (Bannerlord Agent Bridge System) TCP/JSON-RPC server.
# Runs inside the Windows VM hosting Bannerlord.
# Override with GABS_HOST / GABS_PORT env vars.
gabs_host: str = "localhost"
gabs_port: int = 4825
# Decision model for the Bannerlord campaign agent (Qwen3 preferred).
bannerlord_model: str = "qwen3:14b"
# Campaign-tick interval in seconds (real-time pause between in-game days).
bannerlord_tick_seconds: float = 5.0
# ── Scripture / Biblical Integration ──────────────────────────────
# Enable the biblical text module.
scripture_enabled: bool = True

View File

@@ -0,0 +1,234 @@
"""Bannerlord world adapter — bridges GABS to the WorldInterface contract.
Allows the existing ``Heartbeat`` loop to drive the Bannerlord campaign
by treating it as just another game world. Wraps the async ``GabsClient``
for synchronous use (the ``Heartbeat`` calls ``observe()`` and ``act()``
synchronously).
Async callers should use ``CampaignLoop`` directly — it is more efficient
and handles the full M2 logic natively.
Usage::
adapter = BannerlordWorldAdapter()
adapter.connect()
heartbeat = Heartbeat(world=adapter, interval=5.0)
await heartbeat.run_once()
adapter.disconnect()
"""
from __future__ import annotations
import asyncio
import logging
from infrastructure.world.interface import WorldInterface
from infrastructure.world.types import (
ActionResult,
ActionStatus,
CommandInput,
PerceptionOutput,
)
logger = logging.getLogger(__name__)
class BannerlordWorldAdapter(WorldInterface):
"""WorldInterface adapter for Bannerlord via GABS.
Wraps ``GabsClient`` and ``CampaignState`` to present the Bannerlord
campaign map as a ``WorldInterface``-compatible world.
Parameters
----------
host:
Override GABS server host (defaults to ``settings.gabs_host``).
port:
Override GABS server port (defaults to ``settings.gabs_port``).
"""
def __init__(
self,
*,
host: str | None = None,
port: int | None = None,
) -> None:
from config import settings
self._host = host or settings.gabs_host
self._port = port or settings.gabs_port
self._connected = False
self._client = None
self._loop: asyncio.AbstractEventLoop | None = None
# -- lifecycle ---------------------------------------------------------
def connect(self) -> None:
"""Open the GABS TCP connection (synchronous wrapper)."""
from bannerlord.gabs_client import GabsClient
self._client = GabsClient(host=self._host, port=self._port)
try:
self._loop = asyncio.get_event_loop()
except RuntimeError:
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
try:
self._loop.run_until_complete(self._client.connect())
self._connected = True
logger.info("BannerlordWorldAdapter connected to GABS")
except Exception as exc: # noqa: BLE001
logger.warning("BannerlordWorldAdapter: GABS connect failed: %s", exc)
self._connected = False
def disconnect(self) -> None:
"""Close the GABS TCP connection (synchronous wrapper)."""
if self._client is not None and self._loop is not None:
try:
self._loop.run_until_complete(self._client.disconnect())
except Exception as exc: # noqa: BLE001
logger.debug("BannerlordWorldAdapter disconnect error: %s", exc)
self._connected = False
@property
def is_connected(self) -> bool:
return self._connected
# -- core contract -----------------------------------------------------
def observe(self) -> PerceptionOutput:
"""Poll GABS for current game state and return structured perception."""
from bannerlord.campaign_state import parse_campaign_state
if not self._connected or self._client is None or self._loop is None:
return PerceptionOutput(
location="disconnected",
entities=[],
events=["gabs_disconnected"],
raw={"error": "GABS not connected"},
)
try:
raw = self._loop.run_until_complete(self._client.get_game_state())
state = parse_campaign_state(raw)
# Build entities list from settlements and nearby parties
entities: list[str] = []
for s in state.settlements[:5]:
entities.append(f"settlement:{s.name}")
for p in state.nearby_parties[:3]:
prefix = "hostile" if p.is_hostile else "friendly"
entities.append(f"{prefix}_party:{p.name}")
# Build events list
events: list[str] = []
if state.party.food_days < 2.0:
events.append("low_food")
if state.party.morale < 40:
events.append("low_morale")
if state.hostile_bandits_nearby():
events.append("bandits_nearby")
if state.m2_complete:
events.append("m2_complete")
location = state.party.current_settlement or "campaign_map"
return PerceptionOutput(
location=location,
entities=entities,
events=events,
raw=raw,
)
except Exception as exc: # noqa: BLE001
logger.warning("BannerlordWorldAdapter.observe() failed: %s", exc)
return PerceptionOutput(
location="unknown",
entities=[],
events=[f"observe_error:{exc}"],
raw={"error": str(exc)},
)
def act(self, command: CommandInput) -> ActionResult:
"""Dispatch a campaign command to GABS.
Recognized ``command.action`` values:
- ``"move"`` → party/move_to_settlement (target = settlement_id)
- ``"trade"`` → inventory/buy_item (target = item_id)
- ``"recruit"`` → party/recruit_all
- ``"engage"`` → party/engage_party (target = party_id)
Parameters
----------
command:
WorldInterface ``CommandInput`` with action, target, parameters.
"""
if not self._connected or self._client is None or self._loop is None:
return ActionResult(
status=ActionStatus.FAILURE,
message="GABS not connected",
)
try:
return self._loop.run_until_complete(self._async_act(command))
except Exception as exc: # noqa: BLE001
logger.warning("BannerlordWorldAdapter.act() failed: %s", exc)
return ActionResult(
status=ActionStatus.FAILURE,
message=f"act failed: {exc}",
)
async def _async_act(self, command: CommandInput) -> ActionResult:
"""Async implementation of act()."""
from bannerlord.campaign_actions import (
buy_item,
engage_party,
move_to_settlement,
recruit_all,
)
action = command.action.lower()
params = command.parameters
if action == "move":
settlement_id = command.target or params.get("settlement_id", "")
return await move_to_settlement(
self._client,
settlement_id,
settlement_name=params.get("settlement_name", ""),
)
elif action == "trade":
item_id = command.target or params.get("item_id", "")
quantity = int(params.get("quantity", 1))
return await buy_item(
self._client,
item_id,
quantity,
settlement_id=params.get("settlement_id", ""),
)
elif action == "recruit":
return await recruit_all(
self._client,
settlement_id=params.get("settlement_id", ""),
)
elif action == "engage":
party_id = command.target or params.get("party_id", "")
return await engage_party(
self._client,
party_id,
party_name=params.get("party_name", ""),
)
else:
return ActionResult(
status=ActionStatus.NOOP,
message=f"Unknown action: {command.action}",
)
def speak(self, message: str, target: str | None = None) -> None:
"""Log the message — GABS has no chat mechanism in M2."""
logger.info("BannerlordWorldAdapter.speak: %r (target=%r)", message, target)

View File

View File

@@ -0,0 +1,102 @@
"""Unit tests for bannerlord.campaign_actions."""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock
import pytest
from bannerlord.campaign_actions import (
GabsTool,
buy_item,
engage_party,
move_to_settlement,
recruit_all,
)
from infrastructure.world.types import ActionStatus
def _mock_client(return_value=None, raise_exc=None):
"""Build a mock GabsClient."""
client = MagicMock()
if raise_exc is not None:
client.call = AsyncMock(side_effect=raise_exc)
else:
client.call = AsyncMock(return_value=return_value)
return client
class TestMoveToSettlement:
async def test_success(self):
client = _mock_client({"eta_days": 2})
result = await move_to_settlement(client, "town_A1", settlement_name="Marunath")
assert result.status == ActionStatus.SUCCESS
client.call.assert_called_once_with(
GabsTool.MOVE_TO_SETTLEMENT, {"settlement_id": "town_A1"}
)
async def test_failure_on_gabs_error(self):
client = _mock_client(raise_exc=RuntimeError("GABS timeout"))
result = await move_to_settlement(client, "town_A1")
assert result.status == ActionStatus.FAILURE
assert "GABS timeout" in result.message
async def test_uses_settlement_id_as_label_when_no_name(self):
client = _mock_client({})
result = await move_to_settlement(client, "town_B2")
assert result.status == ActionStatus.SUCCESS
assert "town_B2" in result.message
class TestBuyItem:
async def test_success(self):
client = _mock_client({"cost": 100})
result = await buy_item(client, "grain", 5)
assert result.status == ActionStatus.SUCCESS
assert "grain" in result.message
client.call.assert_called_once_with(
GabsTool.BUY_ITEM, {"item_id": "grain", "quantity": 5}
)
async def test_includes_settlement_id_when_given(self):
client = _mock_client({})
await buy_item(client, "iron", 2, settlement_id="town_A1")
call_params = client.call.call_args[0][1]
assert call_params["settlement_id"] == "town_A1"
async def test_failure_logged_gracefully(self):
client = _mock_client(raise_exc=Exception("inventory full"))
result = await buy_item(client, "wool", 10)
assert result.status == ActionStatus.FAILURE
class TestRecruitAll:
async def test_success(self):
client = _mock_client({"recruited": 15})
result = await recruit_all(client)
assert result.status == ActionStatus.SUCCESS
assert "15" in result.message
async def test_success_with_settlement(self):
client = _mock_client({"recruited": 8})
result = await recruit_all(client, settlement_id="town_A1")
call_params = client.call.call_args[0][1]
assert call_params["settlement_id"] == "town_A1"
async def test_failure_graceful(self):
client = _mock_client(raise_exc=RuntimeError("no recruits"))
result = await recruit_all(client)
assert result.status == ActionStatus.FAILURE
class TestEngageParty:
async def test_success(self):
client = _mock_client({"outcome": "victory", "loot": 200})
result = await engage_party(client, "bandit_1", party_name="Forest Bandits")
assert result.status == ActionStatus.SUCCESS
assert "victory" in result.message
async def test_failure_graceful(self):
client = _mock_client(raise_exc=RuntimeError("party not found"))
result = await engage_party(client, "bandit_1")
assert result.status == ActionStatus.FAILURE

View File

@@ -0,0 +1,200 @@
"""Unit tests for bannerlord.campaign_loop."""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from bannerlord.campaign_loop import CampaignLoop, TickResult
from bannerlord.decision import CampaignDecision, M2Action
from infrastructure.world.types import ActionResult, ActionStatus
def _make_game_state(*, troops: int = 30, gold: int = 2000) -> dict:
return {
"tick": 0,
"party": {
"size": troops,
"wounded": 0,
"food_days": 5.0,
"morale": 80.0,
"current_settlement": "town_A1",
},
"economy": {"gold": gold, "daily_income": 200, "daily_expenses": 150},
"nearby_parties": [],
"settlements": [
{
"id": "town_A1",
"name": "Marunath",
"faction": "aserai",
"is_friendly": True,
"distance": 0.0,
"has_recruits": True,
"has_trade_goods": False,
}
],
}
class TestCampaignLoopDispatch:
"""Tests for the internal _dispatch() routing."""
def _loop(self) -> CampaignLoop:
return CampaignLoop(tick_seconds=0.0, max_ticks=1)
async def test_dispatch_move(self):
loop = self._loop()
client = MagicMock()
decision = CampaignDecision(
action=M2Action.MOVE,
settlement_id="town_A1",
settlement_name="Marunath",
)
with patch("bannerlord.campaign_loop.move_to_settlement", new_callable=AsyncMock) as mock_move:
mock_move.return_value = ActionResult(status=ActionStatus.SUCCESS, message="ok")
await loop._dispatch(decision, client)
mock_move.assert_called_once_with(client, "town_A1", settlement_name="Marunath")
async def test_dispatch_recruit(self):
loop = self._loop()
client = MagicMock()
decision = CampaignDecision(
action=M2Action.RECRUIT,
settlement_id="town_A1",
)
with patch("bannerlord.campaign_loop.recruit_all", new_callable=AsyncMock) as mock_recruit:
mock_recruit.return_value = ActionResult(status=ActionStatus.SUCCESS, message="15 recruited")
await loop._dispatch(decision, client)
mock_recruit.assert_called_once()
async def test_dispatch_engage(self):
loop = self._loop()
client = MagicMock()
decision = CampaignDecision(
action=M2Action.ENGAGE,
party_id="bandit_1",
party_name="Forest Bandits",
)
with patch("bannerlord.campaign_loop.engage_party", new_callable=AsyncMock) as mock_engage:
mock_engage.return_value = ActionResult(status=ActionStatus.SUCCESS, message="victory")
await loop._dispatch(decision, client)
mock_engage.assert_called_once_with(client, "bandit_1", party_name="Forest Bandits")
async def test_dispatch_trade(self):
loop = self._loop()
client = MagicMock()
decision = CampaignDecision(
action=M2Action.TRADE,
item_id="grain",
quantity=5,
)
with patch("bannerlord.campaign_loop.buy_item", new_callable=AsyncMock) as mock_buy:
mock_buy.return_value = ActionResult(status=ActionStatus.SUCCESS, message="bought")
await loop._dispatch(decision, client)
mock_buy.assert_called_once_with(client, "grain", 5, settlement_id="")
async def test_dispatch_wait_returns_noop(self):
loop = self._loop()
client = MagicMock()
decision = CampaignDecision(action=M2Action.WAIT, reasoning="low food")
result = await loop._dispatch(decision, client)
assert result.status == ActionStatus.NOOP
async def test_dispatch_move_missing_settlement_id(self):
loop = self._loop()
client = MagicMock()
decision = CampaignDecision(action=M2Action.MOVE, settlement_id="")
result = await loop._dispatch(decision, client)
assert result.status == ActionStatus.FAILURE
async def test_dispatch_engage_missing_party_id(self):
loop = self._loop()
client = MagicMock()
decision = CampaignDecision(action=M2Action.ENGAGE, party_id="")
result = await loop._dispatch(decision, client)
assert result.status == ActionStatus.FAILURE
class TestCampaignLoopRun:
"""Integration-level tests for the full run() loop (mocked GABS)."""
async def test_run_stops_at_max_ticks(self):
"""Loop respects max_ticks and returns correct number of results."""
game_state = _make_game_state()
with (
patch("bannerlord.campaign_loop.GabsClient") as MockClient,
patch("bannerlord.campaign_loop.decide", new_callable=AsyncMock) as mock_decide,
patch("bannerlord.campaign_loop.move_to_settlement", new_callable=AsyncMock) as mock_move,
):
# Setup fake client
fake_client = AsyncMock()
fake_client.get_game_state = AsyncMock(return_value=game_state)
fake_client.connect = AsyncMock()
fake_client.disconnect = AsyncMock()
MockClient.return_value = fake_client
mock_decide.return_value = CampaignDecision(
action=M2Action.MOVE,
settlement_id="town_B1",
settlement_name="Epicrotea",
reasoning="moving",
)
mock_move.return_value = ActionResult(status=ActionStatus.SUCCESS, message="ok")
loop = CampaignLoop(tick_seconds=0.0, max_ticks=3)
results = await loop.run()
assert len(results) == 3
assert all(isinstance(r, TickResult) for r in results)
async def test_run_stops_when_m2_complete(self):
"""Loop exits early when M2 conditions are met."""
# State with M2 already complete
game_state = _make_game_state(troops=100, gold=10000)
with (
patch("bannerlord.campaign_loop.GabsClient") as MockClient,
patch("bannerlord.campaign_loop.decide", new_callable=AsyncMock) as mock_decide,
):
fake_client = AsyncMock()
fake_client.get_game_state = AsyncMock(return_value=game_state)
fake_client.connect = AsyncMock()
fake_client.disconnect = AsyncMock()
MockClient.return_value = fake_client
mock_decide.return_value = CampaignDecision(
action=M2Action.WAIT,
reasoning="done",
)
loop = CampaignLoop(tick_seconds=0.0, max_ticks=10)
results = await loop.run()
# Should exit after first tick (m2_complete = True)
assert len(results) == 1
assert results[0].m2_complete is True
async def test_run_aborts_on_connect_failure(self):
"""Loop returns empty history if GABS cannot be reached."""
with patch("bannerlord.campaign_loop.GabsClient") as MockClient:
fake_client = AsyncMock()
fake_client.connect = AsyncMock(side_effect=OSError("refused"))
fake_client.disconnect = AsyncMock()
MockClient.return_value = fake_client
loop = CampaignLoop(tick_seconds=0.0, max_ticks=5)
results = await loop.run()
assert results == []
def test_stop_sets_running_false(self):
loop = CampaignLoop()
loop._running = True
loop.stop()
assert not loop.is_running

View File

@@ -0,0 +1,150 @@
"""Unit tests for bannerlord.campaign_state."""
from __future__ import annotations
import pytest
from bannerlord.campaign_state import (
M2_GOLD_GOAL,
M2_TROOP_GOAL,
CampaignState,
NearbyParty,
Settlement,
parse_campaign_state,
)
class TestParseCampaignState:
def test_empty_dict_returns_defaults(self):
state = parse_campaign_state({})
assert state.party.party_size == 0
assert state.economy.gold == 0
assert state.nearby_parties == []
assert state.settlements == []
def test_full_payload_parsed(self):
raw = {
"tick": 5,
"party": {
"size": 30,
"wounded": 2,
"prisoners": 1,
"food_days": 3.5,
"morale": 75.0,
"current_settlement": "town_A1",
"speed": 5.2,
},
"economy": {
"gold": 4500,
"daily_income": 200,
"daily_expenses": 150,
},
"nearby_parties": [
{
"id": "bandit_1",
"name": "Forest Bandits",
"faction": "bandit",
"is_hostile": True,
"troop_count": 10,
"distance": 3.0,
}
],
"settlements": [
{
"id": "town_A1",
"name": "Marunath",
"faction": "aserai",
"is_friendly": True,
"distance": 0.0,
"has_recruits": True,
"has_trade_goods": False,
}
],
}
state = parse_campaign_state(raw)
assert state.tick == 5
assert state.party.party_size == 30
assert state.party.wounded == 2
assert state.economy.gold == 4500
assert state.economy.net_income == 50
assert len(state.nearby_parties) == 1
assert state.nearby_parties[0].name == "Forest Bandits"
assert len(state.settlements) == 1
assert state.settlements[0].name == "Marunath"
def test_malformed_entries_skipped(self):
raw = {
"nearby_parties": [{"id": "ok", "name": "Good", "faction": "bandit",
"is_hostile": True, "troop_count": 5, "distance": 2.0},
{"bad": "data"}],
"settlements": [None, "not_a_dict"],
}
state = parse_campaign_state(raw)
assert len(state.nearby_parties) == 1
assert state.settlements == []
class TestCampaignStateProperties:
def _make_state(self, *, troops: int, gold: int) -> CampaignState:
state = CampaignState()
state.party.party_size = troops
state.economy.gold = gold
return state
def test_m2_not_complete_by_default(self):
state = self._make_state(troops=20, gold=0)
assert not state.m2_complete
def test_m2_complete_when_both_goals_met(self):
state = self._make_state(troops=M2_TROOP_GOAL, gold=M2_GOLD_GOAL)
assert state.m2_complete
def test_m2_not_complete_if_only_troops_met(self):
state = self._make_state(troops=M2_TROOP_GOAL, gold=M2_GOLD_GOAL - 1)
assert not state.m2_complete
def test_m2_not_complete_if_only_gold_met(self):
state = self._make_state(troops=M2_TROOP_GOAL - 1, gold=M2_GOLD_GOAL)
assert not state.m2_complete
def test_troops_progress_string(self):
state = self._make_state(troops=45, gold=0)
assert state.troops_progress == f"45/{M2_TROOP_GOAL}"
def test_gold_progress_string(self):
state = self._make_state(troops=0, gold=3000)
assert "3,000" in state.gold_progress
def test_hostile_bandits_nearby_filter(self):
state = CampaignState()
state.nearby_parties = [
NearbyParty("b1", "Bandits", "bandit", True, 10, 2.0),
NearbyParty("l1", "Lord", "empire", False, 50, 1.0),
NearbyParty("b2", "Far Bandits", "bandit", True, 5, 10.0),
]
nearby = state.hostile_bandits_nearby(max_distance=5.0)
assert len(nearby) == 1
assert nearby[0].party_id == "b1"
def test_nearest_settlement_returns_closest(self):
state = CampaignState()
state.settlements = [
Settlement("s1", "Far Town", "empire", True, 10.0),
Settlement("s2", "Near Town", "empire", True, 2.0),
]
nearest = state.nearest_settlement()
assert nearest.settlement_id == "s2"
def test_nearest_recruit_settlement(self):
state = CampaignState()
state.settlements = [
Settlement("s1", "Town A", "empire", True, 5.0, has_recruits=False),
Settlement("s2", "Town B", "empire", True, 8.0, has_recruits=True),
]
recruit = state.nearest_recruit_settlement()
assert recruit.settlement_id == "s2"
def test_nearest_settlement_none_when_empty(self):
state = CampaignState()
assert state.nearest_settlement() is None

View File

@@ -0,0 +1,154 @@
"""Unit tests for bannerlord.decision."""
from __future__ import annotations
import json
import pytest
from bannerlord.campaign_state import (
CampaignState,
EconomyState,
NearbyParty,
PartyState,
Settlement,
)
from bannerlord.decision import (
M2Action,
CampaignDecision,
build_decision_prompt,
parse_decision,
)
def _make_state(
*,
troops: int = 30,
gold: int = 2000,
food_days: float = 5.0,
morale: float = 80.0,
settlements: list | None = None,
nearby_parties: list | None = None,
) -> CampaignState:
state = CampaignState()
state.party = PartyState(
party_size=troops,
food_days=food_days,
morale=morale,
)
state.economy = EconomyState(gold=gold, daily_income=200, daily_expenses=150)
state.settlements = settlements or []
state.nearby_parties = nearby_parties or []
return state
class TestBuildDecisionPrompt:
def test_returns_two_messages(self):
state = _make_state()
messages = build_decision_prompt(state)
assert len(messages) == 2
assert messages[0]["role"] == "system"
assert messages[1]["role"] == "user"
def test_user_message_includes_party_info(self):
state = _make_state(troops=45, gold=3000)
messages = build_decision_prompt(state)
user_content = messages[1]["content"]
assert "45" in user_content
assert "3,000" in user_content
def test_bandits_appear_in_prompt_when_nearby(self):
state = _make_state(
nearby_parties=[NearbyParty("b1", "Forest Bandits", "bandit", True, 10, 2.0)]
)
messages = build_decision_prompt(state)
user_content = messages[1]["content"]
assert "Forest Bandits" in user_content
def test_settlements_appear_in_prompt(self):
state = _make_state(
settlements=[Settlement("s1", "Marunath", "aserai", True, 3.0, has_recruits=True)]
)
messages = build_decision_prompt(state)
user_content = messages[1]["content"]
assert "Marunath" in user_content
def test_system_prompt_contains_action_vocabulary(self):
state = _make_state()
messages = build_decision_prompt(state)
system = messages[0]["content"]
for action in ("MOVE", "TRADE", "RECRUIT", "ENGAGE", "WAIT"):
assert action in system
class TestParseDecision:
def test_valid_move_decision(self):
raw = json.dumps({
"action": "MOVE",
"settlement_id": "town_A1",
"settlement_name": "Marunath",
"item_id": "",
"quantity": 1,
"party_id": "",
"party_name": "",
"reasoning": "Moving to recruit troops",
})
decision = parse_decision(raw)
assert decision.action == M2Action.MOVE
assert decision.settlement_id == "town_A1"
assert decision.settlement_name == "Marunath"
def test_valid_recruit_decision(self):
raw = json.dumps({
"action": "RECRUIT",
"settlement_id": "town_A1",
"settlement_name": "Marunath",
"item_id": "",
"quantity": 1,
"party_id": "",
"party_name": "",
"reasoning": "Has recruits available",
})
decision = parse_decision(raw)
assert decision.action == M2Action.RECRUIT
def test_valid_engage_decision(self):
raw = json.dumps({
"action": "ENGAGE",
"settlement_id": "",
"settlement_name": "",
"item_id": "",
"quantity": 1,
"party_id": "bandit_1",
"party_name": "Forest Bandits",
"reasoning": "Weak bandits — easy XP",
})
decision = parse_decision(raw)
assert decision.action == M2Action.ENGAGE
assert decision.party_id == "bandit_1"
def test_wait_on_invalid_json(self):
decision = parse_decision("not json at all")
assert decision.action == M2Action.WAIT
def test_wait_on_unknown_action(self):
raw = json.dumps({"action": "TELEPORT", "reasoning": "hack"})
decision = parse_decision(raw)
assert decision.action == M2Action.WAIT
def test_strips_markdown_fences(self):
raw = '```json\n{"action": "WAIT", "reasoning": "low food"}\n```'
decision = parse_decision(raw)
assert decision.action == M2Action.WAIT
def test_quantity_minimum_one(self):
raw = json.dumps({"action": "TRADE", "item_id": "grain", "quantity": -5, "reasoning": "x"})
decision = parse_decision(raw)
assert decision.quantity == 1
def test_missing_optional_fields_default_to_empty(self):
raw = json.dumps({"action": "WAIT", "reasoning": "resting"})
decision = parse_decision(raw)
assert decision.settlement_id == ""
assert decision.party_id == ""
assert decision.item_id == ""

View File

@@ -0,0 +1,120 @@
"""Unit tests for bannerlord.gabs_client."""
from __future__ import annotations
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from bannerlord.gabs_client import GabsClient, GabsError
class TestGabsClientCall:
"""Tests for GabsClient.call() using mock StreamReader/Writer."""
def _make_client(self, response: dict) -> GabsClient:
"""Return a pre-connected GabsClient with mocked I/O."""
client = GabsClient(host="localhost", port=4825, timeout=5.0)
client._connected = True
writer = MagicMock()
writer.write = MagicMock()
writer.drain = AsyncMock()
raw_response = json.dumps(response).encode() + b"\n"
reader = MagicMock()
reader.readline = AsyncMock(return_value=raw_response)
client._reader = reader
client._writer = writer
return client
async def test_successful_call_returns_result(self):
client = self._make_client({"jsonrpc": "2.0", "id": 1, "result": {"status": "ok"}})
result = await client.call("game/ping")
assert result == {"status": "ok"}
async def test_error_response_raises_gabs_error(self):
client = self._make_client({
"jsonrpc": "2.0",
"id": 1,
"error": {"code": -32601, "message": "Method not found"},
})
with pytest.raises(GabsError) as exc_info:
await client.call("unknown/method")
assert exc_info.value.code == -32601
async def test_not_connected_raises_runtime_error(self):
client = GabsClient()
with pytest.raises(RuntimeError, match="not connected"):
await client.call("game/ping")
async def test_request_id_increments(self):
client = self._make_client({"jsonrpc": "2.0", "id": 1, "result": {}})
await client.call("game/ping")
# Reset reader for second call
client._reader.readline = AsyncMock(
return_value=json.dumps({"jsonrpc": "2.0", "id": 2, "result": {}}).encode() + b"\n"
)
await client.call("game/ping")
assert client._req_id == 2
async def test_get_game_state_returns_empty_on_error(self):
client = GabsClient()
client._connected = True
writer = MagicMock()
writer.write = MagicMock()
writer.drain = AsyncMock()
reader = MagicMock()
reader.readline = AsyncMock(side_effect=OSError("connection reset"))
client._reader = reader
client._writer = writer
result = await client.get_game_state()
assert result == {}
async def test_ping_returns_true_on_success(self):
client = self._make_client({"jsonrpc": "2.0", "id": 1, "result": "pong"})
result = await client.ping()
assert result is True
async def test_ping_returns_false_on_failure(self):
client = GabsClient()
result = await client.ping()
assert result is False
class TestGabsClientLifecycle:
async def test_connect_failure_sets_not_connected(self):
client = GabsClient(host="localhost", port=9999, timeout=0.1)
with pytest.raises(Exception):
await client.connect()
assert not client.is_connected
async def test_context_manager_calls_connect_and_disconnect(self):
client = GabsClient()
connect_called = False
disconnect_called = False
async def _fake_connect():
nonlocal connect_called
connect_called = True
client._connected = True
async def _fake_disconnect():
nonlocal disconnect_called
disconnect_called = True
client._connected = False
client.connect = _fake_connect
client.disconnect = _fake_disconnect
async with client as c:
assert c is client
assert connect_called
assert disconnect_called