Compare commits
2 Commits
claude/iss
...
claude/iss
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1a4653570e | ||
|
|
46ef9d3aba |
11
src/bannerlord/__init__.py
Normal file
11
src/bannerlord/__init__.py
Normal file
@@ -0,0 +1,11 @@
|
||||
"""Bannerlord campaign agent — M2: Basic Campaign Actions.
|
||||
|
||||
Provides GABS integration (TCP JSON-RPC, port 4825) and the observe →
|
||||
decide → act loop for autonomous campaign play: move, trade, recruit,
|
||||
and engage bandits.
|
||||
|
||||
Key GABS tools: party/move_to_settlement, inventory/buy_item,
|
||||
party/recruit_all, party/engage_party.
|
||||
|
||||
Done-condition: party grows from 20 → 100 troops, gold reaches 10 000 denars.
|
||||
"""
|
||||
200
src/bannerlord/campaign_actions.py
Normal file
200
src/bannerlord/campaign_actions.py
Normal file
@@ -0,0 +1,200 @@
|
||||
"""Bannerlord M2 campaign action primitives.
|
||||
|
||||
Wraps the four key GABS tools for the M2 milestone:
|
||||
- party/move_to_settlement → move the party to a named settlement
|
||||
- inventory/buy_item → purchase trade goods
|
||||
- party/recruit_all → hire all available recruits
|
||||
- party/engage_party → engage a nearby bandit party
|
||||
|
||||
All functions are async and return an ``ActionResult`` that is compatible
|
||||
with the ``WorldInterface`` contract.
|
||||
|
||||
Error handling follows Pattern 3 (Feature Disable): if GABS rejects an
|
||||
action, log a warning and return a FAILURE result — never raise.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from enum import StrEnum
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from infrastructure.world.types import ActionResult, ActionStatus
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bannerlord.gabs_client import GabsClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GABS method names — canonical reference
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class GabsTool(StrEnum):
|
||||
"""GABS JSON-RPC method names for the M2 action set."""
|
||||
|
||||
MOVE_TO_SETTLEMENT = "party/move_to_settlement"
|
||||
BUY_ITEM = "inventory/buy_item"
|
||||
RECRUIT_ALL = "party/recruit_all"
|
||||
ENGAGE_PARTY = "party/engage_party"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Action functions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def move_to_settlement(
|
||||
client: "GabsClient",
|
||||
settlement_id: str,
|
||||
*,
|
||||
settlement_name: str = "",
|
||||
) -> ActionResult:
|
||||
"""Move the party to a target settlement.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
client:
|
||||
Connected ``GabsClient`` instance.
|
||||
settlement_id:
|
||||
GABS settlement identifier (e.g. ``"town_A1"``).
|
||||
settlement_name:
|
||||
Human-readable name for logging only.
|
||||
"""
|
||||
label = settlement_name or settlement_id
|
||||
try:
|
||||
result = await client.call(
|
||||
GabsTool.MOVE_TO_SETTLEMENT,
|
||||
{"settlement_id": settlement_id},
|
||||
)
|
||||
logger.info("MOVE → %s: %s", label, result)
|
||||
return ActionResult(
|
||||
status=ActionStatus.SUCCESS,
|
||||
message=f"Moving to {label}",
|
||||
data=result or {},
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning("MOVE → %s failed: %s", label, exc)
|
||||
return ActionResult(
|
||||
status=ActionStatus.FAILURE,
|
||||
message=f"Move to {label} failed: {exc}",
|
||||
data={},
|
||||
)
|
||||
|
||||
|
||||
async def buy_item(
|
||||
client: "GabsClient",
|
||||
item_id: str,
|
||||
quantity: int,
|
||||
*,
|
||||
settlement_id: str = "",
|
||||
) -> ActionResult:
|
||||
"""Purchase trade goods from the current or specified settlement.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
client:
|
||||
Connected ``GabsClient`` instance.
|
||||
item_id:
|
||||
Item identifier (e.g. ``"grain"``, ``"iron"``, ``"wool"``).
|
||||
quantity:
|
||||
Number of units to purchase.
|
||||
settlement_id:
|
||||
Optional target settlement; empty means current location.
|
||||
"""
|
||||
try:
|
||||
params: dict = {"item_id": item_id, "quantity": quantity}
|
||||
if settlement_id:
|
||||
params["settlement_id"] = settlement_id
|
||||
|
||||
result = await client.call(GabsTool.BUY_ITEM, params)
|
||||
logger.info("BUY %dx %s: %s", quantity, item_id, result)
|
||||
return ActionResult(
|
||||
status=ActionStatus.SUCCESS,
|
||||
message=f"Purchased {quantity}x {item_id}",
|
||||
data=result or {},
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning("BUY %dx %s failed: %s", quantity, item_id, exc)
|
||||
return ActionResult(
|
||||
status=ActionStatus.FAILURE,
|
||||
message=f"Buy {item_id} failed: {exc}",
|
||||
data={},
|
||||
)
|
||||
|
||||
|
||||
async def recruit_all(
|
||||
client: "GabsClient",
|
||||
*,
|
||||
settlement_id: str = "",
|
||||
) -> ActionResult:
|
||||
"""Recruit all available troops at the current or specified settlement.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
client:
|
||||
Connected ``GabsClient`` instance.
|
||||
settlement_id:
|
||||
Optional settlement to recruit from; empty means current.
|
||||
"""
|
||||
try:
|
||||
params: dict = {}
|
||||
if settlement_id:
|
||||
params["settlement_id"] = settlement_id
|
||||
|
||||
result = await client.call(GabsTool.RECRUIT_ALL, params)
|
||||
recruited = (result or {}).get("recruited", "?")
|
||||
logger.info("RECRUIT_ALL: recruited %s troops", recruited)
|
||||
return ActionResult(
|
||||
status=ActionStatus.SUCCESS,
|
||||
message=f"Recruited {recruited} troops",
|
||||
data=result or {},
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning("RECRUIT_ALL failed: %s", exc)
|
||||
return ActionResult(
|
||||
status=ActionStatus.FAILURE,
|
||||
message=f"Recruit all failed: {exc}",
|
||||
data={},
|
||||
)
|
||||
|
||||
|
||||
async def engage_party(
|
||||
client: "GabsClient",
|
||||
party_id: str,
|
||||
*,
|
||||
party_name: str = "",
|
||||
) -> ActionResult:
|
||||
"""Engage a nearby party (typically a bandit gang) in combat.
|
||||
|
||||
Auto-resolve is expected at high Tactics skill — the agent relies
|
||||
on GABS to handle the battle outcome.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
client:
|
||||
Connected ``GabsClient`` instance.
|
||||
party_id:
|
||||
GABS party identifier of the target.
|
||||
party_name:
|
||||
Human-readable name for logging only.
|
||||
"""
|
||||
label = party_name or party_id
|
||||
try:
|
||||
result = await client.call(GabsTool.ENGAGE_PARTY, {"party_id": party_id})
|
||||
outcome = (result or {}).get("outcome", "unknown")
|
||||
logger.info("ENGAGE %s: %s", label, outcome)
|
||||
return ActionResult(
|
||||
status=ActionStatus.SUCCESS,
|
||||
message=f"Engaged {label}: {outcome}",
|
||||
data=result or {},
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning("ENGAGE %s failed: %s", label, exc)
|
||||
return ActionResult(
|
||||
status=ActionStatus.FAILURE,
|
||||
message=f"Engage {label} failed: {exc}",
|
||||
data={},
|
||||
)
|
||||
316
src/bannerlord/campaign_loop.py
Normal file
316
src/bannerlord/campaign_loop.py
Normal file
@@ -0,0 +1,316 @@
|
||||
"""Bannerlord M2 campaign action loop.
|
||||
|
||||
Implements the observe → decide → act → wait pipeline described in
|
||||
issue #1094. The loop runs until the M2 victory conditions are met
|
||||
(100 troops + 10 000 gold) or until stopped externally.
|
||||
|
||||
Architecture:
|
||||
CampaignLoop.run()
|
||||
while not m2_complete:
|
||||
state = gabs.get_game_state() # observe
|
||||
decision = decide(state) # decide (local Qwen3)
|
||||
result = dispatch(decision, gabs) # act (GABS)
|
||||
await asyncio.sleep(tick_seconds) # wait
|
||||
|
||||
Error handling:
|
||||
- GABS connection failures → log + retry with backoff (max 3 attempts)
|
||||
- LLM failures → WAIT action (graceful degradation)
|
||||
- Action failures → log + continue to next tick
|
||||
|
||||
Progress tracking:
|
||||
Loop publishes heartbeat events via the event bus so the dashboard
|
||||
can display live party size and gold.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from bannerlord.campaign_actions import buy_item, engage_party, move_to_settlement, recruit_all
|
||||
from bannerlord.campaign_state import parse_campaign_state
|
||||
from bannerlord.decision import M2Action, decide
|
||||
from bannerlord.gabs_client import GabsClient
|
||||
from config import settings
|
||||
from infrastructure.world.types import ActionResult, ActionStatus
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_MAX_RECONNECT_ATTEMPTS = 3
|
||||
_RECONNECT_DELAY = 5.0 # seconds
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Progress snapshot (emitted each tick)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class TickResult:
|
||||
"""Summary of one campaign tick."""
|
||||
|
||||
tick: int
|
||||
timestamp: str
|
||||
party_size: int
|
||||
gold: int
|
||||
action: str
|
||||
action_status: str
|
||||
reasoning: str
|
||||
duration_ms: int
|
||||
m2_complete: bool = False
|
||||
error: str = ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Campaign loop
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class CampaignLoop:
|
||||
"""Runs the Bannerlord M2 autonomous campaign.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
gabs_host:
|
||||
Override GABS server host.
|
||||
gabs_port:
|
||||
Override GABS server port.
|
||||
tick_seconds:
|
||||
Real-time pause between in-game ticks.
|
||||
on_tick:
|
||||
Optional async callback invoked after each tick with the
|
||||
``TickResult``. Used by the dashboard for live updates.
|
||||
max_ticks:
|
||||
Hard cap for testing / benchmarking. 0 = unlimited.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
gabs_host: str | None = None,
|
||||
gabs_port: int | None = None,
|
||||
tick_seconds: float | None = None,
|
||||
on_tick=None,
|
||||
max_ticks: int = 0,
|
||||
) -> None:
|
||||
self._host = gabs_host or settings.gabs_host
|
||||
self._port = gabs_port or settings.gabs_port
|
||||
self._tick_seconds = tick_seconds if tick_seconds is not None else settings.bannerlord_tick_seconds
|
||||
self._on_tick = on_tick
|
||||
self._max_ticks = max_ticks
|
||||
self._running = False
|
||||
self.history: list[TickResult] = []
|
||||
|
||||
# -- public API --------------------------------------------------------
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
return self._running
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Signal the loop to stop after the current tick."""
|
||||
self._running = False
|
||||
logger.info("CampaignLoop stop requested")
|
||||
|
||||
async def run(self) -> list[TickResult]:
|
||||
"""Start the campaign loop.
|
||||
|
||||
Returns the list of tick results (for testing / benchmarking).
|
||||
Runs until M2 complete, externally stopped, or max_ticks reached.
|
||||
"""
|
||||
self._running = True
|
||||
logger.info(
|
||||
"CampaignLoop starting — gabs=%s:%d tick=%.1fs",
|
||||
self._host,
|
||||
self._port,
|
||||
self._tick_seconds,
|
||||
)
|
||||
|
||||
client = GabsClient(host=self._host, port=self._port)
|
||||
try:
|
||||
await self._connect_with_retry(client)
|
||||
except RuntimeError as exc: # noqa: BLE001
|
||||
logger.error("CampaignLoop: could not connect to GABS — aborting: %s", exc)
|
||||
self._running = False
|
||||
return self.history
|
||||
|
||||
tick_num = 0
|
||||
try:
|
||||
while self._running:
|
||||
tick_num += 1
|
||||
if self._max_ticks > 0 and tick_num > self._max_ticks:
|
||||
logger.info("CampaignLoop: max_ticks=%d reached", self._max_ticks)
|
||||
break
|
||||
|
||||
result = await self._run_tick(client, tick_num)
|
||||
self.history.append(result)
|
||||
|
||||
await self._emit(result)
|
||||
|
||||
if result.m2_complete:
|
||||
logger.info(
|
||||
"M2 COMPLETE! Party=%d troops, Gold=%d denars",
|
||||
result.party_size,
|
||||
result.gold,
|
||||
)
|
||||
break
|
||||
|
||||
if result.error and not self._running:
|
||||
break
|
||||
|
||||
await asyncio.sleep(self._tick_seconds)
|
||||
|
||||
finally:
|
||||
await client.disconnect()
|
||||
self._running = False
|
||||
logger.info("CampaignLoop stopped after %d ticks", tick_num)
|
||||
|
||||
return self.history
|
||||
|
||||
# -- internal: single tick ---------------------------------------------
|
||||
|
||||
async def _run_tick(self, client: "Any", tick_num: int) -> TickResult:
|
||||
"""Execute one observe → decide → act cycle."""
|
||||
start = time.monotonic()
|
||||
|
||||
# 1. Observe
|
||||
raw_state = await client.get_game_state()
|
||||
state = parse_campaign_state(raw_state)
|
||||
state = _override_tick(state, tick_num)
|
||||
|
||||
# 2. Decide
|
||||
decision = await decide(state)
|
||||
|
||||
# 3. Act
|
||||
action_result = await self._dispatch(decision, client)
|
||||
|
||||
duration_ms = int((time.monotonic() - start) * 1000)
|
||||
|
||||
return TickResult(
|
||||
tick=tick_num,
|
||||
timestamp=datetime.now(UTC).isoformat(),
|
||||
party_size=state.party.party_size,
|
||||
gold=state.economy.gold,
|
||||
action=decision.action,
|
||||
action_status=action_result.status.value,
|
||||
reasoning=decision.reasoning,
|
||||
duration_ms=duration_ms,
|
||||
m2_complete=state.m2_complete,
|
||||
)
|
||||
|
||||
async def _dispatch(self, decision: "Any", client: "Any") -> "Any":
|
||||
"""Route the decision to the correct GABS action function."""
|
||||
action = decision.action
|
||||
|
||||
if action == M2Action.MOVE:
|
||||
if not decision.settlement_id:
|
||||
logger.warning("MOVE decision has no settlement_id — skipping")
|
||||
return ActionResult(
|
||||
status=ActionStatus.FAILURE,
|
||||
message="MOVE missing settlement_id",
|
||||
)
|
||||
return await move_to_settlement(
|
||||
client,
|
||||
decision.settlement_id,
|
||||
settlement_name=decision.settlement_name,
|
||||
)
|
||||
|
||||
elif action == M2Action.TRADE:
|
||||
if not decision.item_id:
|
||||
logger.warning("TRADE decision has no item_id — skipping")
|
||||
return ActionResult(
|
||||
status=ActionStatus.FAILURE,
|
||||
message="TRADE missing item_id",
|
||||
)
|
||||
return await buy_item(
|
||||
client,
|
||||
decision.item_id,
|
||||
decision.quantity,
|
||||
settlement_id=decision.settlement_id,
|
||||
)
|
||||
|
||||
elif action == M2Action.RECRUIT:
|
||||
return await recruit_all(
|
||||
client,
|
||||
settlement_id=decision.settlement_id,
|
||||
)
|
||||
|
||||
elif action == M2Action.ENGAGE:
|
||||
if not decision.party_id:
|
||||
logger.warning("ENGAGE decision has no party_id — skipping")
|
||||
return ActionResult(
|
||||
status=ActionStatus.FAILURE,
|
||||
message="ENGAGE missing party_id",
|
||||
)
|
||||
return await engage_party(
|
||||
client,
|
||||
decision.party_id,
|
||||
party_name=decision.party_name,
|
||||
)
|
||||
|
||||
else: # WAIT or unknown
|
||||
logger.debug("Tick %s: WAIT — %s", decision.action, decision.reasoning)
|
||||
return ActionResult(
|
||||
status=ActionStatus.NOOP,
|
||||
message=f"WAIT: {decision.reasoning}",
|
||||
)
|
||||
|
||||
# -- internal: connectivity --------------------------------------------
|
||||
|
||||
async def _connect_with_retry(self, client: "Any") -> None:
|
||||
"""Try to connect, retrying up to _MAX_RECONNECT_ATTEMPTS times."""
|
||||
for attempt in range(1, _MAX_RECONNECT_ATTEMPTS + 1):
|
||||
try:
|
||||
await client.connect()
|
||||
return
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning(
|
||||
"GABS connect attempt %d/%d failed: %s",
|
||||
attempt,
|
||||
_MAX_RECONNECT_ATTEMPTS,
|
||||
exc,
|
||||
)
|
||||
if attempt < _MAX_RECONNECT_ATTEMPTS:
|
||||
await asyncio.sleep(_RECONNECT_DELAY)
|
||||
|
||||
raise RuntimeError(
|
||||
f"Could not connect to GABS at {self._host}:{self._port} "
|
||||
f"after {_MAX_RECONNECT_ATTEMPTS} attempts"
|
||||
)
|
||||
|
||||
# -- internal: event emission ------------------------------------------
|
||||
|
||||
async def _emit(self, result: TickResult) -> None:
|
||||
"""Emit tick data to the event bus (best-effort)."""
|
||||
try:
|
||||
from infrastructure.events.bus import event_bus # noqa: PLC0415
|
||||
|
||||
await event_bus.publish(
|
||||
"bannerlord.tick",
|
||||
{
|
||||
"tick": result.tick,
|
||||
"party_size": result.party_size,
|
||||
"gold": result.gold,
|
||||
"action": result.action,
|
||||
"action_status": result.action_status,
|
||||
"m2_complete": result.m2_complete,
|
||||
"duration_ms": result.duration_ms,
|
||||
},
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.debug("CampaignLoop emit skipped: %s", exc)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _override_tick(state: "Any", tick_num: int) -> "Any":
|
||||
"""Set the tick counter from the loop (GABS may not provide it)."""
|
||||
if state.tick == 0:
|
||||
state.tick = tick_num
|
||||
return state
|
||||
213
src/bannerlord/campaign_state.py
Normal file
213
src/bannerlord/campaign_state.py
Normal file
@@ -0,0 +1,213 @@
|
||||
"""Bannerlord campaign state models.
|
||||
|
||||
Parses the raw GABS ``game/get_state`` payload into typed models and
|
||||
tracks the M2 progress counters: party size and gold accumulation.
|
||||
|
||||
Done-condition (from issue #1094):
|
||||
party_size >= 100 AND gold >= 10_000
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# M2 victory conditions
|
||||
M2_TROOP_GOAL = 100
|
||||
M2_GOLD_GOAL = 10_000
|
||||
|
||||
|
||||
@dataclass
|
||||
class PartyState:
|
||||
"""Current party composition and position."""
|
||||
|
||||
party_size: int = 0
|
||||
wounded: int = 0
|
||||
prisoners: int = 0
|
||||
food_days: float = 0.0
|
||||
morale: float = 100.0
|
||||
current_settlement: str = ""
|
||||
speed: float = 0.0
|
||||
|
||||
|
||||
@dataclass
|
||||
class EconomyState:
|
||||
"""Current gold and trade state."""
|
||||
|
||||
gold: int = 0
|
||||
daily_income: int = 0
|
||||
daily_expenses: int = 0
|
||||
|
||||
@property
|
||||
def net_income(self) -> int:
|
||||
return self.daily_income - self.daily_expenses
|
||||
|
||||
|
||||
@dataclass
|
||||
class NearbyParty:
|
||||
"""A nearby lord/bandit party visible on the map."""
|
||||
|
||||
party_id: str
|
||||
name: str
|
||||
faction: str
|
||||
is_hostile: bool
|
||||
troop_count: int
|
||||
distance: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class Settlement:
|
||||
"""A settlement visible or reachable from the current position."""
|
||||
|
||||
settlement_id: str
|
||||
name: str
|
||||
faction: str
|
||||
is_friendly: bool
|
||||
distance: float
|
||||
has_recruits: bool = False
|
||||
has_trade_goods: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class CampaignState:
|
||||
"""Full parsed snapshot of the GABS game state.
|
||||
|
||||
Built from the raw ``dict`` returned by ``GabsClient.get_game_state()``.
|
||||
"""
|
||||
|
||||
tick: int = 0
|
||||
timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))
|
||||
party: PartyState = field(default_factory=PartyState)
|
||||
economy: EconomyState = field(default_factory=EconomyState)
|
||||
nearby_parties: list[NearbyParty] = field(default_factory=list)
|
||||
settlements: list[Settlement] = field(default_factory=list)
|
||||
raw: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
# -- M2 progress -------------------------------------------------------
|
||||
|
||||
@property
|
||||
def troops_progress(self) -> str:
|
||||
"""Human-readable M2 troop progress."""
|
||||
return f"{self.party.party_size}/{M2_TROOP_GOAL}"
|
||||
|
||||
@property
|
||||
def gold_progress(self) -> str:
|
||||
"""Human-readable M2 gold progress."""
|
||||
return f"{self.economy.gold:,}/{M2_GOLD_GOAL:,}"
|
||||
|
||||
@property
|
||||
def m2_complete(self) -> bool:
|
||||
"""True when both M2 victory conditions are met."""
|
||||
return self.party.party_size >= M2_TROOP_GOAL and self.economy.gold >= M2_GOLD_GOAL
|
||||
|
||||
# -- hostile detection -------------------------------------------------
|
||||
|
||||
def hostile_bandits_nearby(self, max_distance: float = 5.0) -> list[NearbyParty]:
|
||||
"""Return hostile bandit parties within *max_distance* map units."""
|
||||
return [
|
||||
p
|
||||
for p in self.nearby_parties
|
||||
if p.is_hostile and "bandit" in p.faction.lower() and p.distance <= max_distance
|
||||
]
|
||||
|
||||
def nearest_settlement(self, *, friendly_only: bool = False) -> Settlement | None:
|
||||
"""Return the closest (optionally friendly) settlement."""
|
||||
candidates = [s for s in self.settlements if not friendly_only or s.is_friendly]
|
||||
if not candidates:
|
||||
return None
|
||||
return min(candidates, key=lambda s: s.distance)
|
||||
|
||||
def nearest_recruit_settlement(self) -> Settlement | None:
|
||||
"""Return the nearest settlement that has recruits available."""
|
||||
candidates = [s for s in self.settlements if s.has_recruits]
|
||||
if not candidates:
|
||||
return None
|
||||
return min(candidates, key=lambda s: s.distance)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Parser
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def parse_campaign_state(raw: dict[str, Any]) -> CampaignState:
|
||||
"""Build a ``CampaignState`` from the raw GABS state dict.
|
||||
|
||||
Unknown / missing fields are silently defaulted so the parser never
|
||||
crashes when GABS returns partial data.
|
||||
"""
|
||||
if not raw:
|
||||
logger.debug("parse_campaign_state: empty payload — returning default state")
|
||||
return CampaignState(raw=raw)
|
||||
|
||||
# -- party -------------------------------------------------------------
|
||||
party_raw = raw.get("party", {})
|
||||
party = PartyState(
|
||||
party_size=int(party_raw.get("size", 0)),
|
||||
wounded=int(party_raw.get("wounded", 0)),
|
||||
prisoners=int(party_raw.get("prisoners", 0)),
|
||||
food_days=float(party_raw.get("food_days", 0.0)),
|
||||
morale=float(party_raw.get("morale", 100.0)),
|
||||
current_settlement=str(party_raw.get("current_settlement", "")),
|
||||
speed=float(party_raw.get("speed", 0.0)),
|
||||
)
|
||||
|
||||
# -- economy -----------------------------------------------------------
|
||||
economy_raw = raw.get("economy", {})
|
||||
economy = EconomyState(
|
||||
gold=int(economy_raw.get("gold", 0)),
|
||||
daily_income=int(economy_raw.get("daily_income", 0)),
|
||||
daily_expenses=int(economy_raw.get("daily_expenses", 0)),
|
||||
)
|
||||
|
||||
# -- nearby parties ----------------------------------------------------
|
||||
nearby_parties = []
|
||||
for p in raw.get("nearby_parties", []):
|
||||
try:
|
||||
if not isinstance(p, dict) or not p.get("id"):
|
||||
logger.debug("Skipping malformed nearby_party entry: missing id")
|
||||
continue
|
||||
nearby_parties.append(
|
||||
NearbyParty(
|
||||
party_id=str(p.get("id", "")),
|
||||
name=str(p.get("name", "")),
|
||||
faction=str(p.get("faction", "")),
|
||||
is_hostile=bool(p.get("is_hostile", False)),
|
||||
troop_count=int(p.get("troop_count", 0)),
|
||||
distance=float(p.get("distance", 999.0)),
|
||||
)
|
||||
)
|
||||
except (KeyError, ValueError, TypeError, AttributeError) as exc:
|
||||
logger.debug("Skipping malformed nearby_party entry: %s", exc)
|
||||
|
||||
# -- settlements -------------------------------------------------------
|
||||
settlements = []
|
||||
for s in raw.get("settlements", []):
|
||||
try:
|
||||
settlements.append(
|
||||
Settlement(
|
||||
settlement_id=str(s.get("id", "")),
|
||||
name=str(s.get("name", "")),
|
||||
faction=str(s.get("faction", "")),
|
||||
is_friendly=bool(s.get("is_friendly", False)),
|
||||
distance=float(s.get("distance", 999.0)),
|
||||
has_recruits=bool(s.get("has_recruits", False)),
|
||||
has_trade_goods=bool(s.get("has_trade_goods", False)),
|
||||
)
|
||||
)
|
||||
except (KeyError, ValueError, TypeError, AttributeError) as exc:
|
||||
logger.debug("Skipping malformed settlement entry: %s", exc)
|
||||
|
||||
return CampaignState(
|
||||
tick=int(raw.get("tick", 0)),
|
||||
timestamp=datetime.now(UTC),
|
||||
party=party,
|
||||
economy=economy,
|
||||
nearby_parties=nearby_parties,
|
||||
settlements=settlements,
|
||||
raw=raw,
|
||||
)
|
||||
284
src/bannerlord/decision.py
Normal file
284
src/bannerlord/decision.py
Normal file
@@ -0,0 +1,284 @@
|
||||
"""LLM-powered campaign decision engine for Bannerlord M2.
|
||||
|
||||
Builds a structured prompt from the current ``CampaignState`` and asks
|
||||
the local Qwen3 model (via Ollama) to choose one action from the M2
|
||||
action vocabulary. Returns a ``CampaignDecision`` pydantic model with
|
||||
the chosen action and its parameters.
|
||||
|
||||
The decision model is intentionally simple for M2:
|
||||
MOVE → move to a named settlement
|
||||
TRADE → buy a trade item
|
||||
RECRUIT → hire troops at current/nearby settlement
|
||||
ENGAGE → fight a nearby bandit party
|
||||
WAIT → idle (e.g. low food, waiting for morale to recover)
|
||||
|
||||
Qwen3 responds in JSON mode with temperature=0.1 for deterministic play.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from enum import StrEnum
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Decision schema
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class M2Action(StrEnum):
|
||||
"""Vocabulary of actions available in the M2 milestone."""
|
||||
|
||||
MOVE = "MOVE"
|
||||
TRADE = "TRADE"
|
||||
RECRUIT = "RECRUIT"
|
||||
ENGAGE = "ENGAGE"
|
||||
WAIT = "WAIT"
|
||||
|
||||
|
||||
class CampaignDecision:
|
||||
"""Parsed LLM decision for one campaign tick.
|
||||
|
||||
Attributes
|
||||
----------
|
||||
action:
|
||||
One of the ``M2Action`` values.
|
||||
settlement_id:
|
||||
Target settlement ID (for MOVE / RECRUIT / TRADE).
|
||||
settlement_name:
|
||||
Human-readable settlement name (for logging).
|
||||
item_id:
|
||||
Trade item to buy (for TRADE).
|
||||
quantity:
|
||||
Trade quantity (for TRADE).
|
||||
party_id:
|
||||
Target party ID (for ENGAGE).
|
||||
party_name:
|
||||
Human-readable party name (for ENGAGE / logging).
|
||||
reasoning:
|
||||
LLM's brief explanation of the choice.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
action: M2Action = M2Action.WAIT,
|
||||
*,
|
||||
settlement_id: str = "",
|
||||
settlement_name: str = "",
|
||||
item_id: str = "",
|
||||
quantity: int = 1,
|
||||
party_id: str = "",
|
||||
party_name: str = "",
|
||||
reasoning: str = "",
|
||||
) -> None:
|
||||
self.action = action
|
||||
self.settlement_id = settlement_id
|
||||
self.settlement_name = settlement_name
|
||||
self.item_id = item_id
|
||||
self.quantity = quantity
|
||||
self.party_id = party_id
|
||||
self.party_name = party_name
|
||||
self.reasoning = reasoning
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
f"CampaignDecision(action={self.action!r}, "
|
||||
f"reasoning={self.reasoning[:60]!r})"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Prompt builder
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def build_decision_prompt(state: "Any") -> list[dict[str, str]]:
|
||||
"""Return an OpenAI-style message list for the decision LLM.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
state:
|
||||
A ``CampaignState`` instance.
|
||||
"""
|
||||
# Build a compact context block
|
||||
party = state.party
|
||||
econ = state.economy
|
||||
ctx_lines = [
|
||||
f"Campaign tick: {state.tick}",
|
||||
f"Party size: {party.party_size} troops ({party.wounded} wounded)",
|
||||
f"Food: {party.food_days:.1f} days remaining",
|
||||
f"Morale: {party.morale:.0f}/100",
|
||||
f"Gold: {econ.gold:,} denars (net {econ.net_income:+d}/day)",
|
||||
f"Current location: {party.current_settlement or 'travelling'}",
|
||||
"",
|
||||
"== M2 GOALS ==",
|
||||
f"Troops: {state.troops_progress} (need 100)",
|
||||
f"Gold: {state.gold_progress} (need 10,000)",
|
||||
"",
|
||||
]
|
||||
|
||||
# Nearby bandits
|
||||
bandits = state.hostile_bandits_nearby()
|
||||
if bandits:
|
||||
ctx_lines.append("== NEARBY HOSTILE BANDITS ==")
|
||||
for b in bandits[:3]:
|
||||
ctx_lines.append(
|
||||
f" - {b.name} (id={b.party_id}, {b.troop_count} troops, "
|
||||
f"{b.distance:.1f} away)"
|
||||
)
|
||||
ctx_lines.append("")
|
||||
|
||||
# Settlements
|
||||
settlements = state.settlements[:5]
|
||||
if settlements:
|
||||
ctx_lines.append("== REACHABLE SETTLEMENTS ==")
|
||||
for s in settlements:
|
||||
flags = []
|
||||
if s.has_recruits:
|
||||
flags.append("recruits")
|
||||
if s.has_trade_goods:
|
||||
flags.append("trade")
|
||||
if not s.is_friendly:
|
||||
flags.append("hostile-faction")
|
||||
flag_str = f" [{', '.join(flags)}]" if flags else ""
|
||||
ctx_lines.append(
|
||||
f" - {s.name} (id={s.settlement_id}, "
|
||||
f"{s.distance:.1f} away{flag_str})"
|
||||
)
|
||||
ctx_lines.append("")
|
||||
|
||||
context = "\n".join(ctx_lines)
|
||||
|
||||
system_prompt = (
|
||||
"You are the campaign manager for Timmy, an autonomous Bannerlord agent. "
|
||||
"Your job is to choose the single best action for this campaign tick. "
|
||||
"Respond ONLY with a JSON object — no prose, no markdown fences.\n\n"
|
||||
"JSON schema:\n"
|
||||
'{\n'
|
||||
' "action": "MOVE|TRADE|RECRUIT|ENGAGE|WAIT",\n'
|
||||
' "settlement_id": "<id or empty>",\n'
|
||||
' "settlement_name": "<name or empty>",\n'
|
||||
' "item_id": "<item or empty>",\n'
|
||||
' "quantity": <int>,\n'
|
||||
' "party_id": "<id or empty>",\n'
|
||||
' "party_name": "<name or empty>",\n'
|
||||
' "reasoning": "<one sentence>"\n'
|
||||
"}\n\n"
|
||||
"Priority rules:\n"
|
||||
"1. ENGAGE bandits only if they are weak (< 15 troops) and we have > 25 troops.\n"
|
||||
"2. RECRUIT when a nearby settlement has recruits and party < 80 troops.\n"
|
||||
"3. TRADE when gold < 5000 and a settlement has trade goods.\n"
|
||||
"4. MOVE toward the nearest settlement with recruits or trade goods.\n"
|
||||
"5. WAIT only if food < 1 day or morale < 40."
|
||||
)
|
||||
|
||||
user_prompt = f"Current game state:\n\n{context}\nChoose the best action."
|
||||
|
||||
return [
|
||||
{"role": "system", "content": system_prompt},
|
||||
{"role": "user", "content": user_prompt},
|
||||
]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Response parser
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def parse_decision(raw_response: str) -> CampaignDecision:
|
||||
"""Parse the LLM JSON response into a ``CampaignDecision``.
|
||||
|
||||
Falls back to ``WAIT`` on any parse error so the loop never crashes.
|
||||
"""
|
||||
# Strip accidental markdown code fences
|
||||
text = raw_response.strip()
|
||||
if text.startswith("```"):
|
||||
lines = text.splitlines()
|
||||
text = "\n".join(
|
||||
line for line in lines if not line.startswith("```")
|
||||
).strip()
|
||||
|
||||
try:
|
||||
data = json.loads(text)
|
||||
except json.JSONDecodeError as exc:
|
||||
logger.warning("Decision parse error (bad JSON): %s | raw=%r", exc, raw_response[:200])
|
||||
return CampaignDecision(action=M2Action.WAIT, reasoning="parse error")
|
||||
|
||||
try:
|
||||
action_str = str(data.get("action", "WAIT")).upper()
|
||||
try:
|
||||
action = M2Action(action_str)
|
||||
except ValueError:
|
||||
logger.warning("Unknown action %r — defaulting to WAIT", action_str)
|
||||
action = M2Action.WAIT
|
||||
|
||||
return CampaignDecision(
|
||||
action=action,
|
||||
settlement_id=str(data.get("settlement_id", "")),
|
||||
settlement_name=str(data.get("settlement_name", "")),
|
||||
item_id=str(data.get("item_id", "")),
|
||||
quantity=max(1, int(data.get("quantity", 1))),
|
||||
party_id=str(data.get("party_id", "")),
|
||||
party_name=str(data.get("party_name", "")),
|
||||
reasoning=str(data.get("reasoning", "")),
|
||||
)
|
||||
except (KeyError, ValueError, TypeError) as exc:
|
||||
logger.warning("Decision parse error (bad fields): %s", exc)
|
||||
return CampaignDecision(action=M2Action.WAIT, reasoning=f"field error: {exc}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def decide(state: "Any") -> CampaignDecision:
|
||||
"""Ask the local LLM to choose a campaign action.
|
||||
|
||||
Uses the cascade router (Ollama → Claude fallback) configured in
|
||||
``config/providers.yaml``. Gracefully returns WAIT on any LLM failure.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
state:
|
||||
A ``CampaignState`` instance.
|
||||
|
||||
Returns
|
||||
-------
|
||||
CampaignDecision
|
||||
The chosen action and its parameters.
|
||||
"""
|
||||
from config import settings
|
||||
|
||||
messages = build_decision_prompt(state)
|
||||
model = settings.bannerlord_model
|
||||
|
||||
try:
|
||||
from infrastructure.router import get_router
|
||||
|
||||
router = get_router()
|
||||
response = await router.complete(
|
||||
messages=messages,
|
||||
model=model,
|
||||
temperature=0.1,
|
||||
)
|
||||
raw_text: str = response.get("content", "")
|
||||
decision = parse_decision(raw_text)
|
||||
logger.info(
|
||||
"Decision [tick=%d]: %s — %s",
|
||||
state.tick,
|
||||
decision.action,
|
||||
decision.reasoning,
|
||||
)
|
||||
return decision
|
||||
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning("Decision LLM call failed: %s — defaulting to WAIT", exc)
|
||||
return CampaignDecision(
|
||||
action=M2Action.WAIT,
|
||||
reasoning=f"LLM unavailable: {exc}",
|
||||
)
|
||||
195
src/bannerlord/gabs_client.py
Normal file
195
src/bannerlord/gabs_client.py
Normal file
@@ -0,0 +1,195 @@
|
||||
"""GABS TCP/JSON-RPC client for Bannerlord.
|
||||
|
||||
Connects to the GABS C# mod (Bannerlord.GABS) over TCP on port 4825
|
||||
and dispatches JSON-RPC 2.0 requests. All I/O is async; synchronous
|
||||
callers must wrap in ``asyncio.to_thread()``.
|
||||
|
||||
Architecture:
|
||||
Bannerlord (Windows VM) ← GABS C# mod ← TCP:4825 ← this client
|
||||
|
||||
Usage::
|
||||
|
||||
async with GabsClient() as client:
|
||||
state = await client.get_game_state()
|
||||
result = await client.call("party/move_to_settlement",
|
||||
{"settlement_id": "town_A1"})
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# JSON-RPC framing: each message is newline-delimited UTF-8 JSON.
|
||||
_ENCODING = "utf-8"
|
||||
_NEWLINE = b"\n"
|
||||
_DEFAULT_TIMEOUT = 30.0
|
||||
|
||||
|
||||
class GabsError(Exception):
|
||||
"""Raised when GABS returns a JSON-RPC error response."""
|
||||
|
||||
def __init__(self, code: int, message: str, data: Any = None) -> None:
|
||||
super().__init__(f"GABS error {code}: {message}")
|
||||
self.code = code
|
||||
self.data = data
|
||||
|
||||
|
||||
class GabsClient:
|
||||
"""Async TCP JSON-RPC 2.0 client for the GABS Bannerlord mod.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
host:
|
||||
GABS server host (Windows VM IP or ``localhost`` for port-forwarded).
|
||||
port:
|
||||
GABS server port (default 4825).
|
||||
timeout:
|
||||
Per-call timeout in seconds.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
host: str | None = None,
|
||||
port: int | None = None,
|
||||
timeout: float = _DEFAULT_TIMEOUT,
|
||||
) -> None:
|
||||
self._host = host or settings.gabs_host
|
||||
self._port = port or settings.gabs_port
|
||||
self._timeout = timeout
|
||||
self._reader: asyncio.StreamReader | None = None
|
||||
self._writer: asyncio.StreamWriter | None = None
|
||||
self._req_id = 0
|
||||
self._connected = False
|
||||
|
||||
# -- lifecycle ---------------------------------------------------------
|
||||
|
||||
async def connect(self) -> None:
|
||||
"""Open the TCP connection to GABS."""
|
||||
try:
|
||||
self._reader, self._writer = await asyncio.wait_for(
|
||||
asyncio.open_connection(self._host, self._port),
|
||||
timeout=self._timeout,
|
||||
)
|
||||
self._connected = True
|
||||
logger.info("GabsClient connected to %s:%d", self._host, self._port)
|
||||
except (OSError, asyncio.TimeoutError) as exc:
|
||||
logger.warning("GabsClient could not connect to GABS: %s", exc)
|
||||
self._connected = False
|
||||
raise
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
"""Close the TCP connection."""
|
||||
if self._writer is not None:
|
||||
try:
|
||||
self._writer.close()
|
||||
await self._writer.wait_closed()
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.debug("GabsClient disconnect error (ignored): %s", exc)
|
||||
self._connected = False
|
||||
self._reader = None
|
||||
self._writer = None
|
||||
logger.info("GabsClient disconnected")
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return self._connected
|
||||
|
||||
# -- context manager ---------------------------------------------------
|
||||
|
||||
async def __aenter__(self) -> "GabsClient":
|
||||
await self.connect()
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *_: Any) -> None:
|
||||
await self.disconnect()
|
||||
|
||||
# -- public API --------------------------------------------------------
|
||||
|
||||
async def call(self, method: str, params: dict[str, Any] | None = None) -> Any:
|
||||
"""Call a GABS tool and return the result.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
method:
|
||||
GABS tool name, e.g. ``"party/move_to_settlement"``.
|
||||
params:
|
||||
Tool parameters dict.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Any
|
||||
The ``result`` field from the JSON-RPC response.
|
||||
|
||||
Raises
|
||||
------
|
||||
GabsError
|
||||
If GABS returns an error response.
|
||||
RuntimeError
|
||||
If not connected.
|
||||
"""
|
||||
if not self._connected or self._writer is None or self._reader is None:
|
||||
raise RuntimeError("GabsClient is not connected — call connect() first")
|
||||
|
||||
self._req_id += 1
|
||||
request = {
|
||||
"jsonrpc": "2.0",
|
||||
"id": self._req_id,
|
||||
"method": method,
|
||||
"params": params or {},
|
||||
}
|
||||
|
||||
raw = json.dumps(request).encode(_ENCODING) + _NEWLINE
|
||||
|
||||
try:
|
||||
self._writer.write(raw)
|
||||
await asyncio.wait_for(self._writer.drain(), timeout=self._timeout)
|
||||
|
||||
line = await asyncio.wait_for(
|
||||
self._reader.readline(), timeout=self._timeout
|
||||
)
|
||||
except asyncio.TimeoutError as exc:
|
||||
raise RuntimeError(f"GABS call '{method}' timed out after {self._timeout}s") from exc
|
||||
except (OSError, ConnectionResetError) as exc:
|
||||
self._connected = False
|
||||
raise RuntimeError(f"GABS connection lost during '{method}': {exc}") from exc
|
||||
|
||||
response = json.loads(line.decode(_ENCODING))
|
||||
|
||||
if "error" in response:
|
||||
err = response["error"]
|
||||
raise GabsError(
|
||||
code=err.get("code", -1),
|
||||
message=err.get("message", "unknown error"),
|
||||
data=err.get("data"),
|
||||
)
|
||||
|
||||
return response.get("result")
|
||||
|
||||
async def get_game_state(self) -> dict[str, Any]:
|
||||
"""Return the full game state snapshot from GABS.
|
||||
|
||||
Returns an empty dict and logs a warning if GABS is unreachable.
|
||||
"""
|
||||
try:
|
||||
result = await self.call("game/get_state")
|
||||
return result if isinstance(result, dict) else {}
|
||||
except (GabsError, RuntimeError) as exc:
|
||||
logger.warning("GABS get_game_state failed: %s", exc)
|
||||
return {}
|
||||
|
||||
async def ping(self) -> bool:
|
||||
"""Return True if GABS responds to a ping."""
|
||||
try:
|
||||
await self.call("game/ping")
|
||||
return True
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.debug("GABS ping failed: %s", exc)
|
||||
return False
|
||||
@@ -374,6 +374,17 @@ class Settings(BaseSettings):
|
||||
error_feedback_enabled: bool = True # Auto-create bug report tasks
|
||||
error_dedup_window_seconds: int = 300 # 5-min dedup window
|
||||
|
||||
# ── Bannerlord / GABS ─────────────────────────────────────────────
|
||||
# GABS (Bannerlord Agent Bridge System) TCP/JSON-RPC server.
|
||||
# Runs inside the Windows VM hosting Bannerlord.
|
||||
# Override with GABS_HOST / GABS_PORT env vars.
|
||||
gabs_host: str = "localhost"
|
||||
gabs_port: int = 4825
|
||||
# Decision model for the Bannerlord campaign agent (Qwen3 preferred).
|
||||
bannerlord_model: str = "qwen3:14b"
|
||||
# Campaign-tick interval in seconds (real-time pause between in-game days).
|
||||
bannerlord_tick_seconds: float = 5.0
|
||||
|
||||
# ── Scripture / Biblical Integration ──────────────────────────────
|
||||
# Enable the biblical text module.
|
||||
scripture_enabled: bool = True
|
||||
|
||||
234
src/infrastructure/world/adapters/bannerlord.py
Normal file
234
src/infrastructure/world/adapters/bannerlord.py
Normal file
@@ -0,0 +1,234 @@
|
||||
"""Bannerlord world adapter — bridges GABS to the WorldInterface contract.
|
||||
|
||||
Allows the existing ``Heartbeat`` loop to drive the Bannerlord campaign
|
||||
by treating it as just another game world. Wraps the async ``GabsClient``
|
||||
for synchronous use (the ``Heartbeat`` calls ``observe()`` and ``act()``
|
||||
synchronously).
|
||||
|
||||
Async callers should use ``CampaignLoop`` directly — it is more efficient
|
||||
and handles the full M2 logic natively.
|
||||
|
||||
Usage::
|
||||
|
||||
adapter = BannerlordWorldAdapter()
|
||||
adapter.connect()
|
||||
heartbeat = Heartbeat(world=adapter, interval=5.0)
|
||||
await heartbeat.run_once()
|
||||
adapter.disconnect()
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from infrastructure.world.interface import WorldInterface
|
||||
from infrastructure.world.types import (
|
||||
ActionResult,
|
||||
ActionStatus,
|
||||
CommandInput,
|
||||
PerceptionOutput,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BannerlordWorldAdapter(WorldInterface):
|
||||
"""WorldInterface adapter for Bannerlord via GABS.
|
||||
|
||||
Wraps ``GabsClient`` and ``CampaignState`` to present the Bannerlord
|
||||
campaign map as a ``WorldInterface``-compatible world.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
host:
|
||||
Override GABS server host (defaults to ``settings.gabs_host``).
|
||||
port:
|
||||
Override GABS server port (defaults to ``settings.gabs_port``).
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
host: str | None = None,
|
||||
port: int | None = None,
|
||||
) -> None:
|
||||
from config import settings
|
||||
|
||||
self._host = host or settings.gabs_host
|
||||
self._port = port or settings.gabs_port
|
||||
self._connected = False
|
||||
self._client = None
|
||||
self._loop: asyncio.AbstractEventLoop | None = None
|
||||
|
||||
# -- lifecycle ---------------------------------------------------------
|
||||
|
||||
def connect(self) -> None:
|
||||
"""Open the GABS TCP connection (synchronous wrapper)."""
|
||||
from bannerlord.gabs_client import GabsClient
|
||||
|
||||
self._client = GabsClient(host=self._host, port=self._port)
|
||||
try:
|
||||
self._loop = asyncio.get_event_loop()
|
||||
except RuntimeError:
|
||||
self._loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(self._loop)
|
||||
|
||||
try:
|
||||
self._loop.run_until_complete(self._client.connect())
|
||||
self._connected = True
|
||||
logger.info("BannerlordWorldAdapter connected to GABS")
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning("BannerlordWorldAdapter: GABS connect failed: %s", exc)
|
||||
self._connected = False
|
||||
|
||||
def disconnect(self) -> None:
|
||||
"""Close the GABS TCP connection (synchronous wrapper)."""
|
||||
if self._client is not None and self._loop is not None:
|
||||
try:
|
||||
self._loop.run_until_complete(self._client.disconnect())
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.debug("BannerlordWorldAdapter disconnect error: %s", exc)
|
||||
self._connected = False
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return self._connected
|
||||
|
||||
# -- core contract -----------------------------------------------------
|
||||
|
||||
def observe(self) -> PerceptionOutput:
|
||||
"""Poll GABS for current game state and return structured perception."""
|
||||
from bannerlord.campaign_state import parse_campaign_state
|
||||
|
||||
if not self._connected or self._client is None or self._loop is None:
|
||||
return PerceptionOutput(
|
||||
location="disconnected",
|
||||
entities=[],
|
||||
events=["gabs_disconnected"],
|
||||
raw={"error": "GABS not connected"},
|
||||
)
|
||||
|
||||
try:
|
||||
raw = self._loop.run_until_complete(self._client.get_game_state())
|
||||
state = parse_campaign_state(raw)
|
||||
|
||||
# Build entities list from settlements and nearby parties
|
||||
entities: list[str] = []
|
||||
for s in state.settlements[:5]:
|
||||
entities.append(f"settlement:{s.name}")
|
||||
for p in state.nearby_parties[:3]:
|
||||
prefix = "hostile" if p.is_hostile else "friendly"
|
||||
entities.append(f"{prefix}_party:{p.name}")
|
||||
|
||||
# Build events list
|
||||
events: list[str] = []
|
||||
if state.party.food_days < 2.0:
|
||||
events.append("low_food")
|
||||
if state.party.morale < 40:
|
||||
events.append("low_morale")
|
||||
if state.hostile_bandits_nearby():
|
||||
events.append("bandits_nearby")
|
||||
if state.m2_complete:
|
||||
events.append("m2_complete")
|
||||
|
||||
location = state.party.current_settlement or "campaign_map"
|
||||
|
||||
return PerceptionOutput(
|
||||
location=location,
|
||||
entities=entities,
|
||||
events=events,
|
||||
raw=raw,
|
||||
)
|
||||
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning("BannerlordWorldAdapter.observe() failed: %s", exc)
|
||||
return PerceptionOutput(
|
||||
location="unknown",
|
||||
entities=[],
|
||||
events=[f"observe_error:{exc}"],
|
||||
raw={"error": str(exc)},
|
||||
)
|
||||
|
||||
def act(self, command: CommandInput) -> ActionResult:
|
||||
"""Dispatch a campaign command to GABS.
|
||||
|
||||
Recognized ``command.action`` values:
|
||||
- ``"move"`` → party/move_to_settlement (target = settlement_id)
|
||||
- ``"trade"`` → inventory/buy_item (target = item_id)
|
||||
- ``"recruit"`` → party/recruit_all
|
||||
- ``"engage"`` → party/engage_party (target = party_id)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
command:
|
||||
WorldInterface ``CommandInput`` with action, target, parameters.
|
||||
"""
|
||||
if not self._connected or self._client is None or self._loop is None:
|
||||
return ActionResult(
|
||||
status=ActionStatus.FAILURE,
|
||||
message="GABS not connected",
|
||||
)
|
||||
|
||||
try:
|
||||
return self._loop.run_until_complete(self._async_act(command))
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning("BannerlordWorldAdapter.act() failed: %s", exc)
|
||||
return ActionResult(
|
||||
status=ActionStatus.FAILURE,
|
||||
message=f"act failed: {exc}",
|
||||
)
|
||||
|
||||
async def _async_act(self, command: CommandInput) -> ActionResult:
|
||||
"""Async implementation of act()."""
|
||||
from bannerlord.campaign_actions import (
|
||||
buy_item,
|
||||
engage_party,
|
||||
move_to_settlement,
|
||||
recruit_all,
|
||||
)
|
||||
|
||||
action = command.action.lower()
|
||||
params = command.parameters
|
||||
|
||||
if action == "move":
|
||||
settlement_id = command.target or params.get("settlement_id", "")
|
||||
return await move_to_settlement(
|
||||
self._client,
|
||||
settlement_id,
|
||||
settlement_name=params.get("settlement_name", ""),
|
||||
)
|
||||
|
||||
elif action == "trade":
|
||||
item_id = command.target or params.get("item_id", "")
|
||||
quantity = int(params.get("quantity", 1))
|
||||
return await buy_item(
|
||||
self._client,
|
||||
item_id,
|
||||
quantity,
|
||||
settlement_id=params.get("settlement_id", ""),
|
||||
)
|
||||
|
||||
elif action == "recruit":
|
||||
return await recruit_all(
|
||||
self._client,
|
||||
settlement_id=params.get("settlement_id", ""),
|
||||
)
|
||||
|
||||
elif action == "engage":
|
||||
party_id = command.target or params.get("party_id", "")
|
||||
return await engage_party(
|
||||
self._client,
|
||||
party_id,
|
||||
party_name=params.get("party_name", ""),
|
||||
)
|
||||
|
||||
else:
|
||||
return ActionResult(
|
||||
status=ActionStatus.NOOP,
|
||||
message=f"Unknown action: {command.action}",
|
||||
)
|
||||
|
||||
def speak(self, message: str, target: str | None = None) -> None:
|
||||
"""Log the message — GABS has no chat mechanism in M2."""
|
||||
logger.info("BannerlordWorldAdapter.speak: %r (target=%r)", message, target)
|
||||
0
tests/bannerlord/__init__.py
Normal file
0
tests/bannerlord/__init__.py
Normal file
102
tests/bannerlord/test_campaign_actions.py
Normal file
102
tests/bannerlord/test_campaign_actions.py
Normal file
@@ -0,0 +1,102 @@
|
||||
"""Unit tests for bannerlord.campaign_actions."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from bannerlord.campaign_actions import (
|
||||
GabsTool,
|
||||
buy_item,
|
||||
engage_party,
|
||||
move_to_settlement,
|
||||
recruit_all,
|
||||
)
|
||||
from infrastructure.world.types import ActionStatus
|
||||
|
||||
|
||||
def _mock_client(return_value=None, raise_exc=None):
|
||||
"""Build a mock GabsClient."""
|
||||
client = MagicMock()
|
||||
if raise_exc is not None:
|
||||
client.call = AsyncMock(side_effect=raise_exc)
|
||||
else:
|
||||
client.call = AsyncMock(return_value=return_value)
|
||||
return client
|
||||
|
||||
|
||||
class TestMoveToSettlement:
|
||||
async def test_success(self):
|
||||
client = _mock_client({"eta_days": 2})
|
||||
result = await move_to_settlement(client, "town_A1", settlement_name="Marunath")
|
||||
assert result.status == ActionStatus.SUCCESS
|
||||
client.call.assert_called_once_with(
|
||||
GabsTool.MOVE_TO_SETTLEMENT, {"settlement_id": "town_A1"}
|
||||
)
|
||||
|
||||
async def test_failure_on_gabs_error(self):
|
||||
client = _mock_client(raise_exc=RuntimeError("GABS timeout"))
|
||||
result = await move_to_settlement(client, "town_A1")
|
||||
assert result.status == ActionStatus.FAILURE
|
||||
assert "GABS timeout" in result.message
|
||||
|
||||
async def test_uses_settlement_id_as_label_when_no_name(self):
|
||||
client = _mock_client({})
|
||||
result = await move_to_settlement(client, "town_B2")
|
||||
assert result.status == ActionStatus.SUCCESS
|
||||
assert "town_B2" in result.message
|
||||
|
||||
|
||||
class TestBuyItem:
|
||||
async def test_success(self):
|
||||
client = _mock_client({"cost": 100})
|
||||
result = await buy_item(client, "grain", 5)
|
||||
assert result.status == ActionStatus.SUCCESS
|
||||
assert "grain" in result.message
|
||||
client.call.assert_called_once_with(
|
||||
GabsTool.BUY_ITEM, {"item_id": "grain", "quantity": 5}
|
||||
)
|
||||
|
||||
async def test_includes_settlement_id_when_given(self):
|
||||
client = _mock_client({})
|
||||
await buy_item(client, "iron", 2, settlement_id="town_A1")
|
||||
call_params = client.call.call_args[0][1]
|
||||
assert call_params["settlement_id"] == "town_A1"
|
||||
|
||||
async def test_failure_logged_gracefully(self):
|
||||
client = _mock_client(raise_exc=Exception("inventory full"))
|
||||
result = await buy_item(client, "wool", 10)
|
||||
assert result.status == ActionStatus.FAILURE
|
||||
|
||||
|
||||
class TestRecruitAll:
|
||||
async def test_success(self):
|
||||
client = _mock_client({"recruited": 15})
|
||||
result = await recruit_all(client)
|
||||
assert result.status == ActionStatus.SUCCESS
|
||||
assert "15" in result.message
|
||||
|
||||
async def test_success_with_settlement(self):
|
||||
client = _mock_client({"recruited": 8})
|
||||
result = await recruit_all(client, settlement_id="town_A1")
|
||||
call_params = client.call.call_args[0][1]
|
||||
assert call_params["settlement_id"] == "town_A1"
|
||||
|
||||
async def test_failure_graceful(self):
|
||||
client = _mock_client(raise_exc=RuntimeError("no recruits"))
|
||||
result = await recruit_all(client)
|
||||
assert result.status == ActionStatus.FAILURE
|
||||
|
||||
|
||||
class TestEngageParty:
|
||||
async def test_success(self):
|
||||
client = _mock_client({"outcome": "victory", "loot": 200})
|
||||
result = await engage_party(client, "bandit_1", party_name="Forest Bandits")
|
||||
assert result.status == ActionStatus.SUCCESS
|
||||
assert "victory" in result.message
|
||||
|
||||
async def test_failure_graceful(self):
|
||||
client = _mock_client(raise_exc=RuntimeError("party not found"))
|
||||
result = await engage_party(client, "bandit_1")
|
||||
assert result.status == ActionStatus.FAILURE
|
||||
200
tests/bannerlord/test_campaign_loop.py
Normal file
200
tests/bannerlord/test_campaign_loop.py
Normal file
@@ -0,0 +1,200 @@
|
||||
"""Unit tests for bannerlord.campaign_loop."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from bannerlord.campaign_loop import CampaignLoop, TickResult
|
||||
from bannerlord.decision import CampaignDecision, M2Action
|
||||
from infrastructure.world.types import ActionResult, ActionStatus
|
||||
|
||||
|
||||
def _make_game_state(*, troops: int = 30, gold: int = 2000) -> dict:
|
||||
return {
|
||||
"tick": 0,
|
||||
"party": {
|
||||
"size": troops,
|
||||
"wounded": 0,
|
||||
"food_days": 5.0,
|
||||
"morale": 80.0,
|
||||
"current_settlement": "town_A1",
|
||||
},
|
||||
"economy": {"gold": gold, "daily_income": 200, "daily_expenses": 150},
|
||||
"nearby_parties": [],
|
||||
"settlements": [
|
||||
{
|
||||
"id": "town_A1",
|
||||
"name": "Marunath",
|
||||
"faction": "aserai",
|
||||
"is_friendly": True,
|
||||
"distance": 0.0,
|
||||
"has_recruits": True,
|
||||
"has_trade_goods": False,
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
class TestCampaignLoopDispatch:
|
||||
"""Tests for the internal _dispatch() routing."""
|
||||
|
||||
def _loop(self) -> CampaignLoop:
|
||||
return CampaignLoop(tick_seconds=0.0, max_ticks=1)
|
||||
|
||||
async def test_dispatch_move(self):
|
||||
loop = self._loop()
|
||||
client = MagicMock()
|
||||
decision = CampaignDecision(
|
||||
action=M2Action.MOVE,
|
||||
settlement_id="town_A1",
|
||||
settlement_name="Marunath",
|
||||
)
|
||||
|
||||
with patch("bannerlord.campaign_loop.move_to_settlement", new_callable=AsyncMock) as mock_move:
|
||||
mock_move.return_value = ActionResult(status=ActionStatus.SUCCESS, message="ok")
|
||||
await loop._dispatch(decision, client)
|
||||
mock_move.assert_called_once_with(client, "town_A1", settlement_name="Marunath")
|
||||
|
||||
async def test_dispatch_recruit(self):
|
||||
loop = self._loop()
|
||||
client = MagicMock()
|
||||
decision = CampaignDecision(
|
||||
action=M2Action.RECRUIT,
|
||||
settlement_id="town_A1",
|
||||
)
|
||||
|
||||
with patch("bannerlord.campaign_loop.recruit_all", new_callable=AsyncMock) as mock_recruit:
|
||||
mock_recruit.return_value = ActionResult(status=ActionStatus.SUCCESS, message="15 recruited")
|
||||
await loop._dispatch(decision, client)
|
||||
mock_recruit.assert_called_once()
|
||||
|
||||
async def test_dispatch_engage(self):
|
||||
loop = self._loop()
|
||||
client = MagicMock()
|
||||
decision = CampaignDecision(
|
||||
action=M2Action.ENGAGE,
|
||||
party_id="bandit_1",
|
||||
party_name="Forest Bandits",
|
||||
)
|
||||
|
||||
with patch("bannerlord.campaign_loop.engage_party", new_callable=AsyncMock) as mock_engage:
|
||||
mock_engage.return_value = ActionResult(status=ActionStatus.SUCCESS, message="victory")
|
||||
await loop._dispatch(decision, client)
|
||||
mock_engage.assert_called_once_with(client, "bandit_1", party_name="Forest Bandits")
|
||||
|
||||
async def test_dispatch_trade(self):
|
||||
loop = self._loop()
|
||||
client = MagicMock()
|
||||
decision = CampaignDecision(
|
||||
action=M2Action.TRADE,
|
||||
item_id="grain",
|
||||
quantity=5,
|
||||
)
|
||||
|
||||
with patch("bannerlord.campaign_loop.buy_item", new_callable=AsyncMock) as mock_buy:
|
||||
mock_buy.return_value = ActionResult(status=ActionStatus.SUCCESS, message="bought")
|
||||
await loop._dispatch(decision, client)
|
||||
mock_buy.assert_called_once_with(client, "grain", 5, settlement_id="")
|
||||
|
||||
async def test_dispatch_wait_returns_noop(self):
|
||||
loop = self._loop()
|
||||
client = MagicMock()
|
||||
decision = CampaignDecision(action=M2Action.WAIT, reasoning="low food")
|
||||
result = await loop._dispatch(decision, client)
|
||||
assert result.status == ActionStatus.NOOP
|
||||
|
||||
async def test_dispatch_move_missing_settlement_id(self):
|
||||
loop = self._loop()
|
||||
client = MagicMock()
|
||||
decision = CampaignDecision(action=M2Action.MOVE, settlement_id="")
|
||||
result = await loop._dispatch(decision, client)
|
||||
assert result.status == ActionStatus.FAILURE
|
||||
|
||||
async def test_dispatch_engage_missing_party_id(self):
|
||||
loop = self._loop()
|
||||
client = MagicMock()
|
||||
decision = CampaignDecision(action=M2Action.ENGAGE, party_id="")
|
||||
result = await loop._dispatch(decision, client)
|
||||
assert result.status == ActionStatus.FAILURE
|
||||
|
||||
|
||||
class TestCampaignLoopRun:
|
||||
"""Integration-level tests for the full run() loop (mocked GABS)."""
|
||||
|
||||
async def test_run_stops_at_max_ticks(self):
|
||||
"""Loop respects max_ticks and returns correct number of results."""
|
||||
game_state = _make_game_state()
|
||||
|
||||
with (
|
||||
patch("bannerlord.campaign_loop.GabsClient") as MockClient,
|
||||
patch("bannerlord.campaign_loop.decide", new_callable=AsyncMock) as mock_decide,
|
||||
patch("bannerlord.campaign_loop.move_to_settlement", new_callable=AsyncMock) as mock_move,
|
||||
):
|
||||
# Setup fake client
|
||||
fake_client = AsyncMock()
|
||||
fake_client.get_game_state = AsyncMock(return_value=game_state)
|
||||
fake_client.connect = AsyncMock()
|
||||
fake_client.disconnect = AsyncMock()
|
||||
MockClient.return_value = fake_client
|
||||
|
||||
mock_decide.return_value = CampaignDecision(
|
||||
action=M2Action.MOVE,
|
||||
settlement_id="town_B1",
|
||||
settlement_name="Epicrotea",
|
||||
reasoning="moving",
|
||||
)
|
||||
mock_move.return_value = ActionResult(status=ActionStatus.SUCCESS, message="ok")
|
||||
|
||||
loop = CampaignLoop(tick_seconds=0.0, max_ticks=3)
|
||||
results = await loop.run()
|
||||
|
||||
assert len(results) == 3
|
||||
assert all(isinstance(r, TickResult) for r in results)
|
||||
|
||||
async def test_run_stops_when_m2_complete(self):
|
||||
"""Loop exits early when M2 conditions are met."""
|
||||
# State with M2 already complete
|
||||
game_state = _make_game_state(troops=100, gold=10000)
|
||||
|
||||
with (
|
||||
patch("bannerlord.campaign_loop.GabsClient") as MockClient,
|
||||
patch("bannerlord.campaign_loop.decide", new_callable=AsyncMock) as mock_decide,
|
||||
):
|
||||
fake_client = AsyncMock()
|
||||
fake_client.get_game_state = AsyncMock(return_value=game_state)
|
||||
fake_client.connect = AsyncMock()
|
||||
fake_client.disconnect = AsyncMock()
|
||||
MockClient.return_value = fake_client
|
||||
|
||||
mock_decide.return_value = CampaignDecision(
|
||||
action=M2Action.WAIT,
|
||||
reasoning="done",
|
||||
)
|
||||
|
||||
loop = CampaignLoop(tick_seconds=0.0, max_ticks=10)
|
||||
results = await loop.run()
|
||||
|
||||
# Should exit after first tick (m2_complete = True)
|
||||
assert len(results) == 1
|
||||
assert results[0].m2_complete is True
|
||||
|
||||
async def test_run_aborts_on_connect_failure(self):
|
||||
"""Loop returns empty history if GABS cannot be reached."""
|
||||
with patch("bannerlord.campaign_loop.GabsClient") as MockClient:
|
||||
fake_client = AsyncMock()
|
||||
fake_client.connect = AsyncMock(side_effect=OSError("refused"))
|
||||
fake_client.disconnect = AsyncMock()
|
||||
MockClient.return_value = fake_client
|
||||
|
||||
loop = CampaignLoop(tick_seconds=0.0, max_ticks=5)
|
||||
results = await loop.run()
|
||||
|
||||
assert results == []
|
||||
|
||||
def test_stop_sets_running_false(self):
|
||||
loop = CampaignLoop()
|
||||
loop._running = True
|
||||
loop.stop()
|
||||
assert not loop.is_running
|
||||
150
tests/bannerlord/test_campaign_state.py
Normal file
150
tests/bannerlord/test_campaign_state.py
Normal file
@@ -0,0 +1,150 @@
|
||||
"""Unit tests for bannerlord.campaign_state."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from bannerlord.campaign_state import (
|
||||
M2_GOLD_GOAL,
|
||||
M2_TROOP_GOAL,
|
||||
CampaignState,
|
||||
NearbyParty,
|
||||
Settlement,
|
||||
parse_campaign_state,
|
||||
)
|
||||
|
||||
|
||||
class TestParseCampaignState:
|
||||
def test_empty_dict_returns_defaults(self):
|
||||
state = parse_campaign_state({})
|
||||
assert state.party.party_size == 0
|
||||
assert state.economy.gold == 0
|
||||
assert state.nearby_parties == []
|
||||
assert state.settlements == []
|
||||
|
||||
def test_full_payload_parsed(self):
|
||||
raw = {
|
||||
"tick": 5,
|
||||
"party": {
|
||||
"size": 30,
|
||||
"wounded": 2,
|
||||
"prisoners": 1,
|
||||
"food_days": 3.5,
|
||||
"morale": 75.0,
|
||||
"current_settlement": "town_A1",
|
||||
"speed": 5.2,
|
||||
},
|
||||
"economy": {
|
||||
"gold": 4500,
|
||||
"daily_income": 200,
|
||||
"daily_expenses": 150,
|
||||
},
|
||||
"nearby_parties": [
|
||||
{
|
||||
"id": "bandit_1",
|
||||
"name": "Forest Bandits",
|
||||
"faction": "bandit",
|
||||
"is_hostile": True,
|
||||
"troop_count": 10,
|
||||
"distance": 3.0,
|
||||
}
|
||||
],
|
||||
"settlements": [
|
||||
{
|
||||
"id": "town_A1",
|
||||
"name": "Marunath",
|
||||
"faction": "aserai",
|
||||
"is_friendly": True,
|
||||
"distance": 0.0,
|
||||
"has_recruits": True,
|
||||
"has_trade_goods": False,
|
||||
}
|
||||
],
|
||||
}
|
||||
state = parse_campaign_state(raw)
|
||||
|
||||
assert state.tick == 5
|
||||
assert state.party.party_size == 30
|
||||
assert state.party.wounded == 2
|
||||
assert state.economy.gold == 4500
|
||||
assert state.economy.net_income == 50
|
||||
assert len(state.nearby_parties) == 1
|
||||
assert state.nearby_parties[0].name == "Forest Bandits"
|
||||
assert len(state.settlements) == 1
|
||||
assert state.settlements[0].name == "Marunath"
|
||||
|
||||
def test_malformed_entries_skipped(self):
|
||||
raw = {
|
||||
"nearby_parties": [{"id": "ok", "name": "Good", "faction": "bandit",
|
||||
"is_hostile": True, "troop_count": 5, "distance": 2.0},
|
||||
{"bad": "data"}],
|
||||
"settlements": [None, "not_a_dict"],
|
||||
}
|
||||
state = parse_campaign_state(raw)
|
||||
assert len(state.nearby_parties) == 1
|
||||
assert state.settlements == []
|
||||
|
||||
|
||||
class TestCampaignStateProperties:
|
||||
def _make_state(self, *, troops: int, gold: int) -> CampaignState:
|
||||
state = CampaignState()
|
||||
state.party.party_size = troops
|
||||
state.economy.gold = gold
|
||||
return state
|
||||
|
||||
def test_m2_not_complete_by_default(self):
|
||||
state = self._make_state(troops=20, gold=0)
|
||||
assert not state.m2_complete
|
||||
|
||||
def test_m2_complete_when_both_goals_met(self):
|
||||
state = self._make_state(troops=M2_TROOP_GOAL, gold=M2_GOLD_GOAL)
|
||||
assert state.m2_complete
|
||||
|
||||
def test_m2_not_complete_if_only_troops_met(self):
|
||||
state = self._make_state(troops=M2_TROOP_GOAL, gold=M2_GOLD_GOAL - 1)
|
||||
assert not state.m2_complete
|
||||
|
||||
def test_m2_not_complete_if_only_gold_met(self):
|
||||
state = self._make_state(troops=M2_TROOP_GOAL - 1, gold=M2_GOLD_GOAL)
|
||||
assert not state.m2_complete
|
||||
|
||||
def test_troops_progress_string(self):
|
||||
state = self._make_state(troops=45, gold=0)
|
||||
assert state.troops_progress == f"45/{M2_TROOP_GOAL}"
|
||||
|
||||
def test_gold_progress_string(self):
|
||||
state = self._make_state(troops=0, gold=3000)
|
||||
assert "3,000" in state.gold_progress
|
||||
|
||||
def test_hostile_bandits_nearby_filter(self):
|
||||
state = CampaignState()
|
||||
state.nearby_parties = [
|
||||
NearbyParty("b1", "Bandits", "bandit", True, 10, 2.0),
|
||||
NearbyParty("l1", "Lord", "empire", False, 50, 1.0),
|
||||
NearbyParty("b2", "Far Bandits", "bandit", True, 5, 10.0),
|
||||
]
|
||||
nearby = state.hostile_bandits_nearby(max_distance=5.0)
|
||||
assert len(nearby) == 1
|
||||
assert nearby[0].party_id == "b1"
|
||||
|
||||
def test_nearest_settlement_returns_closest(self):
|
||||
state = CampaignState()
|
||||
state.settlements = [
|
||||
Settlement("s1", "Far Town", "empire", True, 10.0),
|
||||
Settlement("s2", "Near Town", "empire", True, 2.0),
|
||||
]
|
||||
nearest = state.nearest_settlement()
|
||||
assert nearest.settlement_id == "s2"
|
||||
|
||||
def test_nearest_recruit_settlement(self):
|
||||
state = CampaignState()
|
||||
state.settlements = [
|
||||
Settlement("s1", "Town A", "empire", True, 5.0, has_recruits=False),
|
||||
Settlement("s2", "Town B", "empire", True, 8.0, has_recruits=True),
|
||||
]
|
||||
recruit = state.nearest_recruit_settlement()
|
||||
assert recruit.settlement_id == "s2"
|
||||
|
||||
def test_nearest_settlement_none_when_empty(self):
|
||||
state = CampaignState()
|
||||
assert state.nearest_settlement() is None
|
||||
154
tests/bannerlord/test_decision.py
Normal file
154
tests/bannerlord/test_decision.py
Normal file
@@ -0,0 +1,154 @@
|
||||
"""Unit tests for bannerlord.decision."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
from bannerlord.campaign_state import (
|
||||
CampaignState,
|
||||
EconomyState,
|
||||
NearbyParty,
|
||||
PartyState,
|
||||
Settlement,
|
||||
)
|
||||
from bannerlord.decision import (
|
||||
M2Action,
|
||||
CampaignDecision,
|
||||
build_decision_prompt,
|
||||
parse_decision,
|
||||
)
|
||||
|
||||
|
||||
def _make_state(
|
||||
*,
|
||||
troops: int = 30,
|
||||
gold: int = 2000,
|
||||
food_days: float = 5.0,
|
||||
morale: float = 80.0,
|
||||
settlements: list | None = None,
|
||||
nearby_parties: list | None = None,
|
||||
) -> CampaignState:
|
||||
state = CampaignState()
|
||||
state.party = PartyState(
|
||||
party_size=troops,
|
||||
food_days=food_days,
|
||||
morale=morale,
|
||||
)
|
||||
state.economy = EconomyState(gold=gold, daily_income=200, daily_expenses=150)
|
||||
state.settlements = settlements or []
|
||||
state.nearby_parties = nearby_parties or []
|
||||
return state
|
||||
|
||||
|
||||
class TestBuildDecisionPrompt:
|
||||
def test_returns_two_messages(self):
|
||||
state = _make_state()
|
||||
messages = build_decision_prompt(state)
|
||||
assert len(messages) == 2
|
||||
assert messages[0]["role"] == "system"
|
||||
assert messages[1]["role"] == "user"
|
||||
|
||||
def test_user_message_includes_party_info(self):
|
||||
state = _make_state(troops=45, gold=3000)
|
||||
messages = build_decision_prompt(state)
|
||||
user_content = messages[1]["content"]
|
||||
assert "45" in user_content
|
||||
assert "3,000" in user_content
|
||||
|
||||
def test_bandits_appear_in_prompt_when_nearby(self):
|
||||
state = _make_state(
|
||||
nearby_parties=[NearbyParty("b1", "Forest Bandits", "bandit", True, 10, 2.0)]
|
||||
)
|
||||
messages = build_decision_prompt(state)
|
||||
user_content = messages[1]["content"]
|
||||
assert "Forest Bandits" in user_content
|
||||
|
||||
def test_settlements_appear_in_prompt(self):
|
||||
state = _make_state(
|
||||
settlements=[Settlement("s1", "Marunath", "aserai", True, 3.0, has_recruits=True)]
|
||||
)
|
||||
messages = build_decision_prompt(state)
|
||||
user_content = messages[1]["content"]
|
||||
assert "Marunath" in user_content
|
||||
|
||||
def test_system_prompt_contains_action_vocabulary(self):
|
||||
state = _make_state()
|
||||
messages = build_decision_prompt(state)
|
||||
system = messages[0]["content"]
|
||||
for action in ("MOVE", "TRADE", "RECRUIT", "ENGAGE", "WAIT"):
|
||||
assert action in system
|
||||
|
||||
|
||||
class TestParseDecision:
|
||||
def test_valid_move_decision(self):
|
||||
raw = json.dumps({
|
||||
"action": "MOVE",
|
||||
"settlement_id": "town_A1",
|
||||
"settlement_name": "Marunath",
|
||||
"item_id": "",
|
||||
"quantity": 1,
|
||||
"party_id": "",
|
||||
"party_name": "",
|
||||
"reasoning": "Moving to recruit troops",
|
||||
})
|
||||
decision = parse_decision(raw)
|
||||
assert decision.action == M2Action.MOVE
|
||||
assert decision.settlement_id == "town_A1"
|
||||
assert decision.settlement_name == "Marunath"
|
||||
|
||||
def test_valid_recruit_decision(self):
|
||||
raw = json.dumps({
|
||||
"action": "RECRUIT",
|
||||
"settlement_id": "town_A1",
|
||||
"settlement_name": "Marunath",
|
||||
"item_id": "",
|
||||
"quantity": 1,
|
||||
"party_id": "",
|
||||
"party_name": "",
|
||||
"reasoning": "Has recruits available",
|
||||
})
|
||||
decision = parse_decision(raw)
|
||||
assert decision.action == M2Action.RECRUIT
|
||||
|
||||
def test_valid_engage_decision(self):
|
||||
raw = json.dumps({
|
||||
"action": "ENGAGE",
|
||||
"settlement_id": "",
|
||||
"settlement_name": "",
|
||||
"item_id": "",
|
||||
"quantity": 1,
|
||||
"party_id": "bandit_1",
|
||||
"party_name": "Forest Bandits",
|
||||
"reasoning": "Weak bandits — easy XP",
|
||||
})
|
||||
decision = parse_decision(raw)
|
||||
assert decision.action == M2Action.ENGAGE
|
||||
assert decision.party_id == "bandit_1"
|
||||
|
||||
def test_wait_on_invalid_json(self):
|
||||
decision = parse_decision("not json at all")
|
||||
assert decision.action == M2Action.WAIT
|
||||
|
||||
def test_wait_on_unknown_action(self):
|
||||
raw = json.dumps({"action": "TELEPORT", "reasoning": "hack"})
|
||||
decision = parse_decision(raw)
|
||||
assert decision.action == M2Action.WAIT
|
||||
|
||||
def test_strips_markdown_fences(self):
|
||||
raw = '```json\n{"action": "WAIT", "reasoning": "low food"}\n```'
|
||||
decision = parse_decision(raw)
|
||||
assert decision.action == M2Action.WAIT
|
||||
|
||||
def test_quantity_minimum_one(self):
|
||||
raw = json.dumps({"action": "TRADE", "item_id": "grain", "quantity": -5, "reasoning": "x"})
|
||||
decision = parse_decision(raw)
|
||||
assert decision.quantity == 1
|
||||
|
||||
def test_missing_optional_fields_default_to_empty(self):
|
||||
raw = json.dumps({"action": "WAIT", "reasoning": "resting"})
|
||||
decision = parse_decision(raw)
|
||||
assert decision.settlement_id == ""
|
||||
assert decision.party_id == ""
|
||||
assert decision.item_id == ""
|
||||
120
tests/bannerlord/test_gabs_client.py
Normal file
120
tests/bannerlord/test_gabs_client.py
Normal file
@@ -0,0 +1,120 @@
|
||||
"""Unit tests for bannerlord.gabs_client."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from bannerlord.gabs_client import GabsClient, GabsError
|
||||
|
||||
|
||||
class TestGabsClientCall:
|
||||
"""Tests for GabsClient.call() using mock StreamReader/Writer."""
|
||||
|
||||
def _make_client(self, response: dict) -> GabsClient:
|
||||
"""Return a pre-connected GabsClient with mocked I/O."""
|
||||
client = GabsClient(host="localhost", port=4825, timeout=5.0)
|
||||
client._connected = True
|
||||
|
||||
writer = MagicMock()
|
||||
writer.write = MagicMock()
|
||||
writer.drain = AsyncMock()
|
||||
|
||||
raw_response = json.dumps(response).encode() + b"\n"
|
||||
reader = MagicMock()
|
||||
reader.readline = AsyncMock(return_value=raw_response)
|
||||
|
||||
client._reader = reader
|
||||
client._writer = writer
|
||||
return client
|
||||
|
||||
async def test_successful_call_returns_result(self):
|
||||
client = self._make_client({"jsonrpc": "2.0", "id": 1, "result": {"status": "ok"}})
|
||||
result = await client.call("game/ping")
|
||||
assert result == {"status": "ok"}
|
||||
|
||||
async def test_error_response_raises_gabs_error(self):
|
||||
client = self._make_client({
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"error": {"code": -32601, "message": "Method not found"},
|
||||
})
|
||||
with pytest.raises(GabsError) as exc_info:
|
||||
await client.call("unknown/method")
|
||||
assert exc_info.value.code == -32601
|
||||
|
||||
async def test_not_connected_raises_runtime_error(self):
|
||||
client = GabsClient()
|
||||
with pytest.raises(RuntimeError, match="not connected"):
|
||||
await client.call("game/ping")
|
||||
|
||||
async def test_request_id_increments(self):
|
||||
client = self._make_client({"jsonrpc": "2.0", "id": 1, "result": {}})
|
||||
await client.call("game/ping")
|
||||
# Reset reader for second call
|
||||
client._reader.readline = AsyncMock(
|
||||
return_value=json.dumps({"jsonrpc": "2.0", "id": 2, "result": {}}).encode() + b"\n"
|
||||
)
|
||||
await client.call("game/ping")
|
||||
assert client._req_id == 2
|
||||
|
||||
async def test_get_game_state_returns_empty_on_error(self):
|
||||
client = GabsClient()
|
||||
client._connected = True
|
||||
|
||||
writer = MagicMock()
|
||||
writer.write = MagicMock()
|
||||
writer.drain = AsyncMock()
|
||||
reader = MagicMock()
|
||||
reader.readline = AsyncMock(side_effect=OSError("connection reset"))
|
||||
|
||||
client._reader = reader
|
||||
client._writer = writer
|
||||
|
||||
result = await client.get_game_state()
|
||||
assert result == {}
|
||||
|
||||
async def test_ping_returns_true_on_success(self):
|
||||
client = self._make_client({"jsonrpc": "2.0", "id": 1, "result": "pong"})
|
||||
result = await client.ping()
|
||||
assert result is True
|
||||
|
||||
async def test_ping_returns_false_on_failure(self):
|
||||
client = GabsClient()
|
||||
result = await client.ping()
|
||||
assert result is False
|
||||
|
||||
|
||||
class TestGabsClientLifecycle:
|
||||
async def test_connect_failure_sets_not_connected(self):
|
||||
client = GabsClient(host="localhost", port=9999, timeout=0.1)
|
||||
with pytest.raises(Exception):
|
||||
await client.connect()
|
||||
assert not client.is_connected
|
||||
|
||||
async def test_context_manager_calls_connect_and_disconnect(self):
|
||||
client = GabsClient()
|
||||
connect_called = False
|
||||
disconnect_called = False
|
||||
|
||||
async def _fake_connect():
|
||||
nonlocal connect_called
|
||||
connect_called = True
|
||||
client._connected = True
|
||||
|
||||
async def _fake_disconnect():
|
||||
nonlocal disconnect_called
|
||||
disconnect_called = True
|
||||
client._connected = False
|
||||
|
||||
client.connect = _fake_connect
|
||||
client.disconnect = _fake_disconnect
|
||||
|
||||
async with client as c:
|
||||
assert c is client
|
||||
assert connect_called
|
||||
|
||||
assert disconnect_called
|
||||
Reference in New Issue
Block a user