Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
7385c46a78 WIP: Claude Code progress on #1095
Automated salvage commit — agent session ended (exit 124).
Work in progress, may need continuation.
2026-03-23 14:25:01 -04:00
34 changed files with 3555 additions and 2088 deletions

View File

@@ -14,6 +14,7 @@ repository = "http://localhost:3000/rockachopa/Timmy-time-dashboard"
packages = [
{ include = "config.py", from = "src" },
{ include = "bannerlord", from = "src" },
{ include = "dashboard", from = "src" },
{ include = "infrastructure", from = "src" },
{ include = "integrations", from = "src" },

View File

@@ -1,11 +1,49 @@
"""Bannerlord campaign agent — M2: Basic Campaign Actions.
"""Bannerlord M3 — Full Campaign Strategy.
Provides GABS integration (TCP JSON-RPC, port 4825) and the observe →
decide → act loop for autonomous campaign play: move, trade, recruit,
and engage bandits.
Timmy runs a complete Bannerlord campaign: economy, diplomacy, kingdom
building, and war decisions — all via sovereign local inference.
Key GABS tools: party/move_to_settlement, inventory/buy_item,
party/recruit_all, party/engage_party.
Key components:
gabs_client — TCP JSON-RPC client for the GABS mod (port 4825)
types — KingSubgoal, GameState, message schemas
session_memory — SQLite-backed multi-day strategic plan persistence
campaign — CampaignOrchestrator tying all agents together
adapter — WorldInterface adapter for use with the benchmark runner
agents/ — King, Vassal, and Companion agent hierarchy
Done-condition: party grows from 20 → 100 troops, gold reaches 10 000 denars.
Quick start::
from bannerlord.campaign import CampaignOrchestrator
orch = CampaignOrchestrator()
summary = await orch.run(max_ticks=100)
Register the world adapter::
from infrastructure.world import register_adapter
from bannerlord.adapter import BannerlordWorldAdapter
register_adapter("bannerlord", BannerlordWorldAdapter)
M3 done-when condition:
Timmy establishes own kingdom with 3+ fiefs and
survives 100 in-game days as ruler.
"""
from bannerlord.adapter import BannerlordWorldAdapter
from bannerlord.campaign import CampaignOrchestrator
from bannerlord.gabs_client import GABSClient
from bannerlord.session_memory import SessionMemory
from bannerlord.types import (
GameState,
KingSubgoal,
SubgoalToken,
)
__all__ = [
"BannerlordWorldAdapter",
"CampaignOrchestrator",
"GABSClient",
"GameState",
"KingSubgoal",
"SessionMemory",
"SubgoalToken",
]

228
src/bannerlord/adapter.py Normal file
View File

@@ -0,0 +1,228 @@
"""Bannerlord M3 — WorldInterface adapter wrapping the GABS TCP client.
Plugs Bannerlord into the engine-agnostic ``WorldInterface`` contract so
the benchmark runner and heartbeat loop can drive the campaign the same way
they would drive any other game world.
Register with::
from infrastructure.world import register_adapter
from bannerlord.adapter import BannerlordWorldAdapter
register_adapter("bannerlord", BannerlordWorldAdapter)
"""
from __future__ import annotations
import asyncio
import logging
from datetime import UTC, datetime
from typing import Any
from bannerlord.gabs_client import GABSClient
from bannerlord.types import GameState
from infrastructure.world.interface import WorldInterface
from infrastructure.world.types import (
ActionResult,
ActionStatus,
CommandInput,
PerceptionOutput,
)
logger = logging.getLogger(__name__)
class BannerlordWorldAdapter(WorldInterface):
"""WorldInterface adapter for Bannerlord via the GABS mod.
``observe()`` — fetches the full GameState from GABS and maps it to a
``PerceptionOutput`` with structured fields.
``act()`` — dispatches ``CommandInput.action`` as a GABS JSON-RPC call,
forwarding ``parameters`` as the call args.
``speak()`` — sends a chat message via GABS (e.g., for companion NPC
conversations or on-screen overlays).
Degrades gracefully when GABS is unavailable.
"""
def __init__(
self,
*,
host: str = "127.0.0.1",
port: int = 4825,
timeout: float = 10.0,
) -> None:
self._client = GABSClient(host=host, port=port, timeout=timeout)
self._last_state: GameState | None = None
# -- lifecycle ---------------------------------------------------------
def connect(self) -> None:
"""Synchronous connect wrapper (runs async in a new event loop)."""
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# Inside async context — caller should use async connect
logger.warning(
"BannerlordWorldAdapter.connect() called from async context; "
"use 'await adapter.async_connect()' instead"
)
return
loop.run_until_complete(self._client.connect())
except Exception as exc:
logger.warning("BannerlordWorldAdapter.connect() failed: %s", exc)
def disconnect(self) -> None:
"""Synchronous disconnect wrapper."""
try:
loop = asyncio.get_event_loop()
if loop.is_running():
return
loop.run_until_complete(self._client.disconnect())
except Exception as exc:
logger.debug("BannerlordWorldAdapter.disconnect() error: %s", exc)
async def async_connect(self) -> bool:
"""Async connect — preferred in async contexts."""
return await self._client.connect()
async def async_disconnect(self) -> None:
"""Async disconnect."""
await self._client.disconnect()
@property
def is_connected(self) -> bool:
return self._client.is_connected
# -- WorldInterface contract ------------------------------------------
def observe(self) -> PerceptionOutput:
"""Return a PerceptionOutput derived from the current GABS GameState.
Falls back to an empty perception if GABS is unreachable.
"""
try:
loop = asyncio.get_event_loop()
if loop.is_running():
logger.warning("observe() called from async context — use async_observe()")
return self._empty_perception()
state = loop.run_until_complete(self._client.get_game_state())
return self._state_to_perception(state)
except Exception as exc:
logger.warning("BannerlordWorldAdapter.observe() error: %s", exc)
return self._empty_perception()
async def async_observe(self) -> PerceptionOutput:
"""Async observe — preferred in async contexts."""
try:
state = await self._client.get_game_state()
self._last_state = state
return self._state_to_perception(state)
except Exception as exc:
logger.warning("async_observe() error: %s", exc)
return self._empty_perception()
def act(self, command: CommandInput) -> ActionResult:
"""Dispatch a command to GABS. Returns success/failure."""
try:
loop = asyncio.get_event_loop()
if loop.is_running():
logger.warning("act() called from async context — use async_act()")
return ActionResult(status=ActionStatus.NOOP)
result = loop.run_until_complete(self.async_act(command))
return result
except Exception as exc:
logger.warning("BannerlordWorldAdapter.act() error: %s", exc)
return ActionResult(
status=ActionStatus.FAILURE,
message=str(exc),
)
async def async_act(self, command: CommandInput) -> ActionResult:
"""Async command dispatch."""
try:
result = await self._client._call(
command.action, command.parameters or {}
)
if result is None:
return ActionResult(
status=ActionStatus.FAILURE,
message=f"GABS returned no result for {command.action}",
)
return ActionResult(
status=ActionStatus.SUCCESS,
message=f"GABS executed: {command.action}",
data=result if isinstance(result, dict) else {"result": result},
)
except Exception as exc:
logger.warning("async_act(%s) error: %s", command.action, exc)
return ActionResult(status=ActionStatus.FAILURE, message=str(exc))
def speak(self, message: str, target: str | None = None) -> None:
"""Send a message via GABS (e.g., companion dialogue or overlay)."""
try:
loop = asyncio.get_event_loop()
if loop.is_running():
return
loop.run_until_complete(
self._client._call("chat/send", {"message": message, "target": target})
)
except Exception as exc:
logger.debug("BannerlordWorldAdapter.speak() error: %s", exc)
# -- helpers -----------------------------------------------------------
def _state_to_perception(self, state: GameState) -> PerceptionOutput:
"""Map a GameState snapshot to a PerceptionOutput."""
entities: list[str] = []
events: list[str] = []
# Party location
if state.party.location:
entities.append(f"location:{state.party.location}")
# Kingdom status
if state.has_kingdom():
entities.append(f"kingdom:{state.kingdom.name}")
for fief in state.kingdom.fiefs:
entities.append(f"fief:{fief}")
# Active wars as events
for war in state.kingdom.active_wars:
events.append(f"at_war_with:{war}")
# Faction snapshot
for faction in state.factions:
entities.append(f"faction:{faction.name}[{faction.army_strength}]")
# Alerts
if state.is_two_front_war():
events.append("alert:two_front_war")
if state.party.wounded_pct > 0.30:
events.append(f"alert:wounded_{state.party.wounded_pct:.0%}")
if state.party.food_days < 3:
events.append("alert:low_food")
return PerceptionOutput(
timestamp=datetime.now(UTC),
location=state.party.location,
entities=entities,
events=events,
raw=state.raw,
)
@staticmethod
def _empty_perception() -> PerceptionOutput:
return PerceptionOutput(
timestamp=datetime.now(UTC),
location="",
entities=[],
events=["gabs:unavailable"],
raw={"adapter": "bannerlord", "connected": False},
)
@property
def last_game_state(self) -> GameState | None:
"""Return the most recently observed GameState."""
return self._last_state

View File

@@ -0,0 +1,4 @@
"""Bannerlord M3 — feudal agent hierarchy.
King → Vassal → Companion, following Ahilan & Dayan (2019).
"""

View File

@@ -0,0 +1 @@
"""Bannerlord M3 — Companion worker agents (lowest tier)."""

View File

@@ -0,0 +1,61 @@
"""Bannerlord M3 — Caravan Companion (trade operations).
Handles trade route assessment, buy/sell goods, caravan deployment.
Triggered by TRADE subgoal or when treasury is below threshold.
Minimum margin threshold: 15% (never buy goods without ≥ 15% resale margin).
"""
from __future__ import annotations
import logging
from bannerlord.types import GameState, KingSubgoal, SubgoalToken
logger = logging.getLogger(__name__)
_MIN_MARGIN_PCT = 0.15 # minimum profitable resale margin
_CARAVAN_DENAR_THRESHOLD = 10_000 # must have 10k denars to deploy a caravan
class CaravanCompanion:
"""Companion worker for trade route management."""
AGENT_ID = "caravan_companion"
def evaluate(
self, state: GameState, subgoal: KingSubgoal
) -> list[dict]:
"""Return trade primitives to execute.
Returns:
List of dicts with 'primitive' and 'args' keys.
"""
if subgoal.token != SubgoalToken.TRADE:
return []
actions: list[dict] = []
party = state.party
# Always assess prices at current location first
actions.append({
"primitive": "assess_prices",
"args": {"town": party.location},
})
# Deploy a caravan if treasury is flush
if party.denars >= _CARAVAN_DENAR_THRESHOLD and party.location:
actions.append({
"primitive": "establish_caravan",
"args": {"town": party.location},
})
return actions
@staticmethod
def is_profitable_trade(buy_price: int, sell_price: int) -> bool:
"""Return True if the trade margin meets the minimum threshold."""
if buy_price <= 0:
return False
margin = (sell_price - buy_price) / buy_price
return margin >= _MIN_MARGIN_PCT

View File

@@ -0,0 +1,78 @@
"""Bannerlord M3 — Logistics Companion (party management).
Handles recruit, supply, rest, prisoner sale, and troop upgrade primitives.
Runs on Qwen3:8b for sub-2-second response times.
Triggered by RECRUIT and HEAL subgoals, or by party condition thresholds.
"""
from __future__ import annotations
import logging
from bannerlord.types import GameState, KingSubgoal, SubgoalToken
logger = logging.getLogger(__name__)
_FOOD_WARN_DAYS = 5
_WOUND_WARN_PCT = 0.20
_PRISONER_CAP = 20
class LogisticsCompanion:
"""Companion worker for party logistics.
Evaluates the current party state and returns a list of primitive
action names + args to dispatch to the GABSClient.
"""
AGENT_ID = "logistics_companion"
def evaluate(
self, state: GameState, subgoal: KingSubgoal
) -> list[dict]:
"""Return primitives to execute given current state and active subgoal.
Returns:
List of dicts with 'primitive' and 'args' keys.
"""
actions: list[dict] = []
party = state.party
# Subgoal-driven recruitment
if subgoal.token == SubgoalToken.RECRUIT:
qty = subgoal.quantity or 20
actions.append({
"primitive": "recruit_troop",
"args": {"troop_type": "infantry", "qty": qty},
})
# Emergency rest on heavy wounds
if subgoal.token == SubgoalToken.HEAL or party.wounded_pct > _WOUND_WARN_PCT:
actions.append({
"primitive": "rest_party",
"args": {"days": 3},
})
# Replenish food if low
if party.food_days < _FOOD_WARN_DAYS:
actions.append({
"primitive": "buy_supplies",
"args": {"qty": max(0, 10 - party.food_days)},
})
# Sell prisoners when near cap
if party.prisoners >= _PRISONER_CAP:
actions.append({
"primitive": "sell_prisoners",
"args": {"location": party.location},
})
# Upgrade troops when stable
if subgoal.token == SubgoalToken.TRAIN:
actions.append({
"primitive": "upgrade_troops",
"args": {},
})
return actions

View File

@@ -0,0 +1,58 @@
"""Bannerlord M3 — Scout Companion (intelligence gathering).
Handles lord tracking, garrison assessment, and patrol route mapping.
Triggered by SPY subgoal or proactively before expansion decisions.
"""
from __future__ import annotations
import logging
from bannerlord.types import GameState, KingSubgoal, SubgoalToken
logger = logging.getLogger(__name__)
class ScoutCompanion:
"""Companion worker for tactical intelligence."""
AGENT_ID = "scout_companion"
def evaluate(
self, state: GameState, subgoal: KingSubgoal
) -> list[dict]:
"""Return scouting primitives to execute.
Returns:
List of dicts with 'primitive' and 'args' keys.
"""
actions: list[dict] = []
if subgoal.token == SubgoalToken.SPY:
target = subgoal.target
if target:
actions.append({
"primitive": "track_lord",
"args": {"lord_name": target},
})
elif subgoal.token == SubgoalToken.EXPAND_TERRITORY:
target = subgoal.target
if target:
actions.append({
"primitive": "assess_garrison",
"args": {"settlement": target},
})
# Proactively map patrols in active war regions
for war_faction in state.kingdom.active_wars:
# Find a fief belonging to the enemy as the region reference
for faction in state.factions:
if faction.name == war_faction and faction.fiefs:
actions.append({
"primitive": "map_patrol_routes",
"args": {"region": faction.fiefs[0]},
})
break # one region per enemy faction per tick
return actions

View File

@@ -0,0 +1,145 @@
"""Bannerlord M3 — Diplomacy Vassal agent.
Handles relations management: alliances, peace deals, tribute, marriage.
Responds to the ALLY subgoal.
Reward function:
R_diplo = w1 * AlliesCount
+ w2 * TruceDurationValue
+ w3 * RelationsScore_weighted
- w4 * ActiveWarsFront
+ w5 * SubgoalBonus
Key strategic rules:
- Never start a new war if already in 2+ wars (2-front war rule)
- Prefer peace with weakest current enemy when overextended
- Time alliances before declaring war to reduce isolation risk
"""
from __future__ import annotations
import logging
from bannerlord.types import (
GameState,
KingSubgoal,
SubgoalToken,
TaskMessage,
VassalReward,
)
logger = logging.getLogger(__name__)
_W1_ALLIES = 0.30
_W2_TRUCE = 0.25
_W3_RELATIONS = 0.25
_W4_WAR_FRONTS = 0.15
_W5_SUBGOAL = 0.05
_SUBGOAL_TRIGGERS = {SubgoalToken.ALLY}
_MAX_WAR_FRONTS = 2 # flag when at 2+ simultaneous wars (two-front war)
class DiplomacyVassal:
"""Mid-tier agent responsible for diplomatic relations."""
AGENT_ID = "diplomacy_vassal"
def is_relevant(self, subgoal: KingSubgoal) -> bool:
return subgoal.token in _SUBGOAL_TRIGGERS
def plan(self, state: GameState, subgoal: KingSubgoal) -> list[TaskMessage]:
"""Return TaskMessages for the current diplomatic subgoal."""
tasks: list[TaskMessage] = []
if subgoal.token == SubgoalToken.ALLY:
tasks.extend(self._plan_alliance(state, subgoal))
return tasks
def _plan_alliance(
self, state: GameState, subgoal: KingSubgoal
) -> list[TaskMessage]:
"""Plan diplomatic outreach to reduce war fronts or build alliances."""
tasks: list[TaskMessage] = []
target = subgoal.target
if not target:
logger.warning("DiplomacyVassal: no target for ALLY subgoal")
return tasks
# If target is already an enemy, propose peace
if target in state.kingdom.active_wars:
tasks.append(
TaskMessage(
from_agent=self.AGENT_ID,
to_agent="gabs",
primitive="propose_peace",
args={"faction": target, "tribute": 0},
priority=subgoal.priority * 1.5,
)
)
else:
# Otherwise pursue alliance
tasks.append(
TaskMessage(
from_agent=self.AGENT_ID,
to_agent="gabs",
primitive="send_envoy",
args={
"faction": target,
"message": "We seek a lasting alliance for mutual defence.",
},
priority=subgoal.priority,
)
)
tasks.append(
TaskMessage(
from_agent=self.AGENT_ID,
to_agent="gabs",
primitive="request_alliance",
args={"faction": target},
priority=subgoal.priority,
)
)
return tasks
def should_avoid_war(self, state: GameState) -> bool:
"""Return True if starting a new war would be strategically unsound."""
return state.active_war_count() >= _MAX_WAR_FRONTS # 2-front war check
def compute_reward(
self,
prev_state: GameState,
curr_state: GameState,
active_subgoal: KingSubgoal,
) -> VassalReward:
"""Compute Diplomacy Vassal reward."""
allies_count = len(curr_state.kingdom.active_alliances)
truce_value = 50.0 # placeholder — days of truce remaining
relations_avg = 30.0 # placeholder — weighted relations score
war_fronts = curr_state.active_war_count()
subgoal_bonus = 1.0 if active_subgoal.token in _SUBGOAL_TRIGGERS else 0.0
total = (
_W1_ALLIES * allies_count * 10
+ _W2_TRUCE * truce_value / 100
+ _W3_RELATIONS * relations_avg / 100
- _W4_WAR_FRONTS * war_fronts * 10
+ _W5_SUBGOAL * subgoal_bonus * 10
)
return VassalReward(
agent_id=self.AGENT_ID,
component_scores={
"allies_count": allies_count,
"truce_value": truce_value,
"relations_avg": relations_avg,
"war_fronts": -war_fronts,
"subgoal_bonus": subgoal_bonus,
},
subgoal_bonus=subgoal_bonus,
total=total,
)

View File

@@ -0,0 +1,151 @@
"""Bannerlord M3 — Economy Vassal agent.
Handles settlement management, tax collection, construction, and food supply.
Responds to FORTIFY and CONSOLIDATE subgoals.
Reward function:
R_econ = w1 * DailyDenarsIncome
+ w2 * FoodStockBuffer
+ w3 * LoyaltyAverage
- w4 * ConstructionQueueLength
+ w5 * SubgoalBonus
"""
from __future__ import annotations
import logging
from bannerlord.types import (
GameState,
KingSubgoal,
SubgoalToken,
TaskMessage,
VassalReward,
)
logger = logging.getLogger(__name__)
_W1_INCOME = 0.35
_W2_FOOD = 0.25
_W3_LOYALTY = 0.20
_W4_CONSTRUCTION = 0.15
_W5_SUBGOAL = 0.05
_SUBGOAL_TRIGGERS = {SubgoalToken.FORTIFY, SubgoalToken.CONSOLIDATE}
_LOW_FOOD_THRESHOLD = 3 # days of food remaining
_INCOME_TARGET = 200 # daily net income target (denars)
class EconomyVassal:
"""Mid-tier agent responsible for settlement economy."""
AGENT_ID = "economy_vassal"
def is_relevant(self, subgoal: KingSubgoal) -> bool:
return subgoal.token in _SUBGOAL_TRIGGERS
def plan(self, state: GameState, subgoal: KingSubgoal) -> list[TaskMessage]:
"""Return TaskMessages for the current economic subgoal."""
tasks: list[TaskMessage] = []
# Always maintain food supply
if state.party.food_days < _LOW_FOOD_THRESHOLD:
tasks.append(
TaskMessage(
from_agent=self.AGENT_ID,
to_agent="logistics_companion",
primitive="buy_supplies",
args={"qty": 10},
priority=2.0,
)
)
if subgoal.token == SubgoalToken.FORTIFY:
tasks.extend(self._plan_fortify(state, subgoal))
elif subgoal.token == SubgoalToken.CONSOLIDATE:
tasks.extend(self._plan_consolidate(state))
return tasks
def _plan_fortify(
self, state: GameState, subgoal: KingSubgoal
) -> list[TaskMessage]:
"""Queue construction projects in owned settlements."""
tasks: list[TaskMessage] = []
target = subgoal.target or (state.kingdom.fiefs[0] if state.kingdom.fiefs else None)
if not target:
return tasks
tasks.append(
TaskMessage(
from_agent=self.AGENT_ID,
to_agent="gabs",
primitive="build_project",
args={"settlement": target, "project": "granary"},
priority=1.2,
)
)
tasks.append(
TaskMessage(
from_agent=self.AGENT_ID,
to_agent="gabs",
primitive="set_tax_policy",
args={"settlement": target, "policy": "normal"},
priority=1.0,
)
)
return tasks
def _plan_consolidate(self, state: GameState) -> list[TaskMessage]:
"""Stabilise: optimise tax and food across all fiefs."""
tasks: list[TaskMessage] = []
net_income = state.kingdom.daily_income - state.kingdom.daily_expenses
for fief in state.kingdom.fiefs:
policy = "normal" if net_income >= _INCOME_TARGET else "low"
tasks.append(
TaskMessage(
from_agent=self.AGENT_ID,
to_agent="gabs",
primitive="set_tax_policy",
args={"settlement": fief, "policy": policy},
priority=0.8,
)
)
return tasks
def compute_reward(
self,
prev_state: GameState,
curr_state: GameState,
active_subgoal: KingSubgoal,
) -> VassalReward:
"""Compute Economy Vassal reward."""
income_delta = (
curr_state.kingdom.daily_income - prev_state.kingdom.daily_income
)
food_buffer = curr_state.party.food_days
loyalty_avg = 70.0 # placeholder — real value from GABS raw data
queue_len = 0 # placeholder
subgoal_bonus = 1.0 if active_subgoal.token in _SUBGOAL_TRIGGERS else 0.0
total = (
_W1_INCOME * income_delta
+ _W2_FOOD * food_buffer
+ _W3_LOYALTY * loyalty_avg / 100
- _W4_CONSTRUCTION * queue_len
+ _W5_SUBGOAL * subgoal_bonus * 10
)
return VassalReward(
agent_id=self.AGENT_ID,
component_scores={
"income_delta": income_delta,
"food_buffer": food_buffer,
"loyalty_avg": loyalty_avg,
"queue_len": -queue_len,
"subgoal_bonus": subgoal_bonus,
},
subgoal_bonus=subgoal_bonus,
total=total,
)

View File

@@ -0,0 +1,266 @@
"""Bannerlord M3 — King agent (Timmy, strategic tier).
The King operates on the campaign-map timescale (1 decision per in-game day).
He reads the full GameState and emits a single KingSubgoal token that vassals
interpret. He uses Qwen3:32b via the LLM router.
Decision rules baked in (no LLM required for simple cases):
- Never initiate a second war while already fighting one (avoid 2-front wars)
- Prioritise HEAL when party wounds > 30 %
- Prioritise RECRUIT when troops < 80
- Prioritise TRADE when denars < 5,000
- If kingdom has < 3 fiefs and no active war, prioritise EXPAND_TERRITORY
- Default to CONSOLIDATE when conditions are stable
"""
from __future__ import annotations
import logging
from datetime import UTC, datetime
from typing import Any
from bannerlord.types import (
GameState,
KingSubgoal,
SubgoalToken,
)
logger = logging.getLogger(__name__)
# Hard thresholds for rule-based fallback decisions
_MIN_TROOPS = 80
_MIN_DENARS = 5_000
_MAX_WOUND_PCT = 0.30
_TARGET_FIEFS = 3
_SURVIVAL_DAYS = 100
class KingAgent:
"""Strategic decision-maker for the Bannerlord campaign.
The King agent is sovereign — it cannot be terminated by vassals.
It decides the active subgoal at most once per campaign tick.
Usage::
king = KingAgent(model="qwen3:32b")
subgoal = king.decide(game_state)
"""
def __init__(
self,
*,
model: str = "qwen3:32b",
temperature: float = 0.1,
) -> None:
self._model = model
self._temperature = temperature
self._last_subgoal: KingSubgoal | None = None
self._tick = 0
self._session_id: str | None = None
def set_session(self, session_id: str) -> None:
self._session_id = session_id
# -- primary decision interface ----------------------------------------
def decide(self, state: GameState) -> KingSubgoal:
"""Return the King's subgoal for the current campaign tick.
Uses rule-based heuristics as the primary decision engine.
LLM override can be wired in via ``_llm_decide`` in a future PR.
Args:
state: Full game state snapshot from GABS.
Returns:
A KingSubgoal to be broadcast to vassals.
"""
self._tick += 1
subgoal = self._rule_based_decide(state)
self._last_subgoal = subgoal
logger.info(
"King[tick=%d, day=%d] → %s (target=%s)",
self._tick,
state.in_game_day,
subgoal.token,
subgoal.target,
)
return subgoal
# -- rule-based strategy engine ----------------------------------------
def _rule_based_decide(self, state: GameState) -> KingSubgoal:
"""Encode campaign strategy as prioritised decision rules.
Priority order (highest to lowest):
1. Emergency: heal if heavily wounded
2. Survival: recruit if dangerously low on troops
3. Economy: earn income if broke
4. Diplomacy: seek peace if in a 2-front war
5. Expansion: take fiefs if not at war and need more territory
6. Alliance: seek allies when preparing for war
7. Default: consolidate and stabilise
"""
party = state.party
kingdom = state.kingdom
# 1. Emergency heal
if party.wounded_pct > _MAX_WOUND_PCT:
return KingSubgoal(
token=SubgoalToken.HEAL,
context=f"{party.wounded_pct:.0%} of party is wounded — rest required",
)
# 2. Critical recruitment
if party.troops < _MIN_TROOPS:
return KingSubgoal(
token=SubgoalToken.RECRUIT,
quantity=_MIN_TROOPS - party.troops,
context=f"Party at {party.troops} troops — must reach {_MIN_TROOPS}",
)
# 3. Destitute treasury
if party.denars < _MIN_DENARS:
return KingSubgoal(
token=SubgoalToken.TRADE,
context=f"Treasury at {party.denars:,} denars — run trade routes",
)
# 4. Avoid 2-front war: seek peace when fighting 2+ enemies
if state.is_two_front_war():
# Pick the weakest enemy to negotiate peace with first
weakest = self._weakest_enemy(state)
return KingSubgoal(
token=SubgoalToken.ALLY,
target=weakest,
context="2-front war detected — de-escalate with weakest enemy",
)
# 5. Kingdom not yet established: work toward first fief
if not state.has_kingdom():
if party.troops >= 120 and state.active_war_count() == 0:
target_fief = self._select_expansion_target(state)
return KingSubgoal(
token=SubgoalToken.EXPAND_TERRITORY,
target=target_fief,
context="No kingdom yet — capture a fief to establish one",
)
elif state.active_war_count() == 0:
# Not ready to fight; train up first
return KingSubgoal(
token=SubgoalToken.TRAIN,
context="Building army before first expansion",
)
# 6. Expand if below target fief count and no active war
if state.fief_count() < _TARGET_FIEFS and state.active_war_count() == 0:
target_fief = self._select_expansion_target(state)
return KingSubgoal(
token=SubgoalToken.EXPAND_TERRITORY,
target=target_fief,
priority=1.5,
context=f"Only {state.fief_count()} fiefs — need {_TARGET_FIEFS}",
)
# 7. Seek allies when stable and below fief target
if not kingdom.active_alliances and state.active_war_count() == 0:
ally_candidate = self._best_alliance_candidate(state)
if ally_candidate:
return KingSubgoal(
token=SubgoalToken.ALLY,
target=ally_candidate,
context="Stable moment — pursue defensive alliance",
)
# 8. Fortify if kingdom exists and there are fiefs to improve
if state.has_kingdom() and state.fief_count() > 0:
if kingdom.daily_income - kingdom.daily_expenses < 100:
return KingSubgoal(
token=SubgoalToken.FORTIFY,
context="Low net income — invest in settlements",
)
# 9. Default: consolidate
return KingSubgoal(
token=SubgoalToken.CONSOLIDATE,
context="Stable — hold territory and recover strength",
)
# -- helper methods ----------------------------------------------------
def _weakest_enemy(self, state: GameState) -> str | None:
"""Return the name of the weakest faction currently at war with us."""
enemy_names = set(state.kingdom.active_wars)
enemies = [f for f in state.factions if f.name in enemy_names]
if not enemies:
return None
return min(enemies, key=lambda f: f.army_strength).name
def _select_expansion_target(self, state: GameState) -> str | None:
"""Select the most vulnerable enemy settlement to target."""
# Prefer factions already at war with others (distracted)
for faction in state.factions:
if len(faction.is_at_war_with) >= 2 and faction.fiefs:
return faction.fiefs[0]
# Fallback: weakest faction with fiefs
candidates = [f for f in state.factions if f.fiefs]
if candidates:
weakest = min(candidates, key=lambda f: f.army_strength)
return weakest.fiefs[0]
return None
def _best_alliance_candidate(self, state: GameState) -> str | None:
"""Return the best faction to approach for an alliance."""
# Prefer factions with good relations and no active war with us
enemy_names = set(state.kingdom.active_wars)
candidates = [
f
for f in state.factions
if f.name not in enemy_names
and f.name != state.kingdom.name
]
if not candidates:
return None
# Pick the strongest candidate (most useful ally)
return max(candidates, key=lambda f: f.army_strength).name
# -- accessors ---------------------------------------------------------
@property
def last_subgoal(self) -> KingSubgoal | None:
return self._last_subgoal
@property
def tick(self) -> int:
return self._tick
def campaign_summary(self, state: GameState) -> dict[str, Any]:
"""Return a brief summary of the campaign status."""
return {
"tick": self._tick,
"in_game_day": state.in_game_day,
"has_kingdom": state.has_kingdom(),
"fief_count": state.fief_count(),
"active_wars": state.active_war_count(),
"two_front_war": state.is_two_front_war(),
"troops": state.party.troops,
"denars": state.party.denars,
"survival_goal_met": (
state.has_kingdom()
and state.fief_count() >= _TARGET_FIEFS
and state.in_game_day >= _SURVIVAL_DAYS
),
}
def is_done_condition_met(self, state: GameState) -> bool:
"""Return True when the M3 done-when condition is satisfied.
Done when: Timmy establishes own kingdom with 3+ fiefs and
survives 100 in-game days as ruler.
"""
return (
state.has_kingdom()
and state.fief_count() >= _TARGET_FIEFS
and state.in_game_day >= _SURVIVAL_DAYS
)

View File

@@ -0,0 +1,236 @@
"""Bannerlord M3 — War Vassal agent.
Handles military operations: sieges, field battles, raids, defensive
maneuvers. Responds to EXPAND_TERRITORY, RAID_ECONOMY, and TRAIN subgoals.
Reward function (from feudal hierarchy design):
R_war = w1 * ΔTerritoryValue
+ w2 * ΔArmyStrength_ratio
- w3 * CasualtyCost
- w4 * SupplyCost
+ w5 * SubgoalBonus
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from bannerlord.types import (
GameState,
KingSubgoal,
SubgoalToken,
TaskMessage,
VassalReward,
)
if TYPE_CHECKING:
pass
logger = logging.getLogger(__name__)
# Reward weights
_W1_TERRITORY = 0.40
_W2_ARMY_RATIO = 0.25
_W3_CASUALTY = 0.20
_W4_SUPPLY = 0.10
_W5_SUBGOAL = 0.05
_SUBGOAL_TRIGGERS = {
SubgoalToken.EXPAND_TERRITORY,
SubgoalToken.RAID_ECONOMY,
SubgoalToken.TRAIN,
}
@dataclass
class WarContext:
"""Mutable state tracked across War Vassal decisions."""
active_siege: str | None = None
last_auto_resolve_result: dict = field(default_factory=dict)
territory_gained: int = 0
casualties_taken: int = 0
class WarVassal:
"""Mid-tier agent responsible for military operations.
Runs at 4× the King's decision frequency. Translates KingSubgoals
into concrete TaskMessages for the Logistics Companion (troop management)
and issues direct GABS calls for combat actions.
"""
AGENT_ID = "war_vassal"
def __init__(self) -> None:
self._ctx = WarContext()
self._prev_army_ratio: float = 1.0
def is_relevant(self, subgoal: KingSubgoal) -> bool:
"""Return True if this vassal should act on *subgoal*."""
return subgoal.token in _SUBGOAL_TRIGGERS
def plan(self, state: GameState, subgoal: KingSubgoal) -> list[TaskMessage]:
"""Return a list of TaskMessages for the current subgoal.
Args:
state: Current game state.
subgoal: Active King subgoal.
Returns:
Ordered list of TaskMessages to dispatch.
"""
tasks: list[TaskMessage] = []
if subgoal.token == SubgoalToken.EXPAND_TERRITORY:
tasks.extend(self._plan_expansion(state, subgoal))
elif subgoal.token == SubgoalToken.RAID_ECONOMY:
tasks.extend(self._plan_raid(state, subgoal))
elif subgoal.token == SubgoalToken.TRAIN:
tasks.extend(self._plan_training(state))
return tasks
def _plan_expansion(
self, state: GameState, subgoal: KingSubgoal
) -> list[TaskMessage]:
"""Plan territory expansion toward subgoal.target."""
tasks: list[TaskMessage] = []
target = subgoal.target
if not target:
logger.warning("WarVassal.EXPAND_TERRITORY: no target in subgoal")
return tasks
# Scout garrison before sieging
tasks.append(
TaskMessage(
from_agent=self.AGENT_ID,
to_agent="scout_companion",
primitive="assess_garrison",
args={"settlement": target},
priority=1.5,
)
)
# Ensure troops are sufficient (delegate to logistics if thin)
if state.party.troops < 100:
tasks.append(
TaskMessage(
from_agent=self.AGENT_ID,
to_agent="logistics_companion",
primitive="recruit_troop",
args={"troop_type": "infantry", "qty": 100 - state.party.troops},
priority=1.8,
)
)
# Issue the siege order
tasks.append(
TaskMessage(
from_agent=self.AGENT_ID,
to_agent="gabs",
primitive="siege_settlement",
args={"settlement": target},
priority=subgoal.priority,
)
)
# Follow up with auto-resolve
tasks.append(
TaskMessage(
from_agent=self.AGENT_ID,
to_agent="gabs",
primitive="auto_resolve_battle",
args={},
priority=subgoal.priority,
)
)
return tasks
def _plan_raid(
self, state: GameState, subgoal: KingSubgoal
) -> list[TaskMessage]:
"""Plan economy raid for denars and food."""
tasks: list[TaskMessage] = []
target = subgoal.target or self._nearest_enemy_village(state)
if not target:
return tasks
tasks.append(
TaskMessage(
from_agent=self.AGENT_ID,
to_agent="gabs",
primitive="raid_village",
args={"village": target},
priority=subgoal.priority,
)
)
return tasks
def _plan_training(self, state: GameState) -> list[TaskMessage]:
"""Plan troop training via auto-resolve bandit fights."""
return [
TaskMessage(
from_agent=self.AGENT_ID,
to_agent="logistics_companion",
primitive="upgrade_troops",
args={},
priority=0.8,
)
]
# -- reward computation ------------------------------------------------
def compute_reward(
self,
prev_state: GameState,
curr_state: GameState,
active_subgoal: KingSubgoal,
) -> VassalReward:
"""Compute the War Vassal reward signal for the last decision cycle."""
territory_delta = (
curr_state.fief_count() - prev_state.fief_count()
) * 100.0
prev_strength = max(prev_state.party.troops, 1)
curr_strength = curr_state.party.troops
army_delta = (curr_strength - prev_strength) / prev_strength
casualties = max(0, prev_state.party.troops - curr_strength)
supply_burn = max(0, prev_state.party.food_days - curr_state.party.food_days)
subgoal_bonus = (
1.0 if active_subgoal.token in _SUBGOAL_TRIGGERS else 0.0
)
total = (
_W1_TERRITORY * territory_delta
+ _W2_ARMY_RATIO * army_delta * 10
- _W3_CASUALTY * casualties
- _W4_SUPPLY * supply_burn
+ _W5_SUBGOAL * subgoal_bonus * 10
)
return VassalReward(
agent_id=self.AGENT_ID,
component_scores={
"territory": territory_delta,
"army_ratio": army_delta,
"casualties": -casualties,
"supply_burn": -supply_burn,
"subgoal_bonus": subgoal_bonus,
},
subgoal_bonus=subgoal_bonus,
total=total,
)
# -- helpers -----------------------------------------------------------
@staticmethod
def _nearest_enemy_village(state: GameState) -> str | None:
enemy_names = set(state.kingdom.active_wars)
for faction in state.factions:
if faction.name in enemy_names and faction.fiefs:
return faction.fiefs[0]
return None

270
src/bannerlord/campaign.py Normal file
View File

@@ -0,0 +1,270 @@
"""Bannerlord M3 — Campaign orchestrator.
Ties together the King agent, vassals, companions, GABS client, and session
memory into a single async campaign loop.
Architecture::
CampaignOrchestrator.run()
├── GABSClient.get_game_state() → GameState
├── KingAgent.decide(state) → KingSubgoal
├── SessionMemory.log_subgoal(...)
├── WarVassal.plan(state, subgoal) → [TaskMessage]
├── EconomyVassal.plan(state, subgoal) → [TaskMessage]
├── DiplomacyVassal.plan(state, subgoal)→ [TaskMessage]
├── [Companions].evaluate(state, subgoal) → [primitives]
└── _dispatch_tasks([...]) → GABS calls
Usage::
from bannerlord.campaign import CampaignOrchestrator
orch = CampaignOrchestrator()
await orch.run(max_ticks=100)
"""
from __future__ import annotations
import asyncio
import logging
from pathlib import Path
from typing import Any
from bannerlord.agents.companions.caravan import CaravanCompanion
from bannerlord.agents.companions.logistics import LogisticsCompanion
from bannerlord.agents.companions.scout import ScoutCompanion
from bannerlord.agents.diplomacy_vassal import DiplomacyVassal
from bannerlord.agents.economy_vassal import EconomyVassal
from bannerlord.agents.king import KingAgent
from bannerlord.agents.war_vassal import WarVassal
from bannerlord.gabs_client import GABSClient
from bannerlord.session_memory import SessionMemory
from bannerlord.types import GameState, KingSubgoal, TaskMessage
logger = logging.getLogger(__name__)
_DEFAULT_TICK_INTERVAL = 1.0 # seconds between campaign ticks (real time)
_DEFAULT_DB_PATH = Path("data/bannerlord/campaign.db")
_KINGDOM_NAME = "House Timmerson"
class CampaignOrchestrator:
"""Full-campaign strategy orchestrator for Bannerlord M3.
Runs the King → Vassal → Companion decision loop on each campaign tick.
Persists progress to SQLite via SessionMemory.
Args:
gabs_host: Hostname where GABS mod is listening.
gabs_port: TCP port for GABS JSON-RPC (default 4825).
tick_interval: Real-time seconds between campaign ticks.
db_path: Path to the SQLite session memory database.
session_id: Existing session to resume (None = new session).
"""
def __init__(
self,
*,
gabs_host: str = "127.0.0.1",
gabs_port: int = 4825,
tick_interval: float = _DEFAULT_TICK_INTERVAL,
db_path: Path = _DEFAULT_DB_PATH,
session_id: str | None = None,
) -> None:
self._gabs = GABSClient(host=gabs_host, port=gabs_port)
self._king = KingAgent()
self._war = WarVassal()
self._economy = EconomyVassal()
self._diplomacy = DiplomacyVassal()
self._logistics = LogisticsCompanion()
self._caravan = CaravanCompanion()
self._scout = ScoutCompanion()
self._memory = SessionMemory(db_path)
self._tick_interval = tick_interval
self._session_id = session_id
self._running = False
self._prev_state: GameState | None = None
# -- lifecycle ---------------------------------------------------------
async def start(self) -> str:
"""Connect to GABS and initialise a campaign session.
Returns the active session_id.
"""
connected = await self._gabs.connect()
if not connected:
logger.warning(
"CampaignOrchestrator: GABS unavailable — campaign will run "
"in degraded mode (no game state updates)"
)
if self._session_id is None:
self._session_id = self._memory.start_session()
else:
# Resume existing session
existing = self._memory.get_session(self._session_id)
if not existing:
self._session_id = self._memory.start_session(self._session_id)
self._king.set_session(self._session_id)
logger.info("CampaignOrchestrator: session=%s", self._session_id)
return self._session_id
async def stop(self) -> None:
"""Gracefully stop the campaign and disconnect from GABS."""
self._running = False
await self._gabs.disconnect()
logger.info("CampaignOrchestrator: stopped")
# -- main campaign loop ------------------------------------------------
async def run(self, max_ticks: int = 0) -> dict[str, Any]:
"""Run the campaign loop.
Args:
max_ticks: Stop after this many ticks. 0 = run indefinitely.
Returns:
Campaign summary dict.
"""
if not self._session_id:
await self.start()
self._running = True
tick = 0
logger.info(
"CampaignOrchestrator: starting campaign loop (max_ticks=%d)",
max_ticks,
)
try:
while self._running:
if max_ticks > 0 and tick >= max_ticks:
break
await self._tick(tick)
tick += 1
# Check done condition
if self._prev_state and self._king.is_done_condition_met(
self._prev_state
):
logger.info(
"CampaignOrchestrator: M3 DONE condition met on tick %d!", tick
)
self._memory.add_note(
self._session_id or "",
self._prev_state.in_game_day,
"milestone",
"M3 done condition met — kingdom with 3+ fiefs, 100 days survived",
)
break
await asyncio.sleep(self._tick_interval)
except asyncio.CancelledError:
logger.info("CampaignOrchestrator: loop cancelled")
finally:
await self.stop()
return self._summary(tick)
async def _tick(self, tick: int) -> None:
"""Execute one campaign tick."""
# 1. Observe
state = await self._gabs.get_game_state()
state.tick = tick
# 2. King decides
subgoal = self._king.decide(state)
# 3. Log subgoal to session memory
if self._session_id:
row_id = self._memory.log_subgoal(
self._session_id, tick, state.in_game_day, subgoal
)
self._memory.update_session(
self._session_id,
in_game_day=state.in_game_day,
fief_count=state.fief_count(),
kingdom_name=state.kingdom.name or None,
)
# 4. Vassal planning
tasks: list[TaskMessage] = []
if self._war.is_relevant(subgoal):
tasks.extend(self._war.plan(state, subgoal))
if self._economy.is_relevant(subgoal):
tasks.extend(self._economy.plan(state, subgoal))
if self._diplomacy.is_relevant(subgoal):
tasks.extend(self._diplomacy.plan(state, subgoal))
# 5. Companion evaluation
companion_actions = (
self._logistics.evaluate(state, subgoal)
+ self._caravan.evaluate(state, subgoal)
+ self._scout.evaluate(state, subgoal)
)
# 6. Dispatch tasks + companion primitives to GABS
await self._dispatch_tasks(tasks, state)
await self._dispatch_primitives(companion_actions)
# 7. Kingdom establishment check
if not state.has_kingdom() and state.fief_count() > 0:
# We have a fief but no kingdom yet — establish one
ok = await self._gabs.establish_kingdom(_KINGDOM_NAME)
if ok and self._session_id:
self._memory.record_kingdom_established(
self._session_id, state.in_game_day, _KINGDOM_NAME
)
self._prev_state = state
logger.debug(
"Tick %d: day=%d, subgoal=%s, tasks=%d, companions=%d",
tick,
state.in_game_day,
subgoal.token,
len(tasks),
len(companion_actions),
)
# -- task dispatch -----------------------------------------------------
async def _dispatch_tasks(
self, tasks: list[TaskMessage], state: GameState
) -> None:
"""Dispatch vassal TaskMessages to GABS."""
for task in sorted(tasks, key=lambda t: t.priority, reverse=True):
if task.to_agent != "gabs":
# Companion-directed tasks are handled via companion.evaluate()
continue
await self._gabs._call(task.primitive, task.args)
async def _dispatch_primitives(self, actions: list[dict]) -> None:
"""Dispatch companion primitive actions to GABS."""
for action in actions:
primitive = action.get("primitive", "")
args = action.get("args", {})
if primitive:
await self._gabs._call(primitive, args)
# -- summary -----------------------------------------------------------
def _summary(self, ticks_run: int) -> dict[str, Any]:
state = self._prev_state or GameState()
summary = self._king.campaign_summary(state)
summary["ticks_run"] = ticks_run
summary["session_id"] = self._session_id
return summary
# -- accessors ---------------------------------------------------------
@property
def session_id(self) -> str | None:
return self._session_id
@property
def memory(self) -> SessionMemory:
return self._memory

View File

@@ -1,200 +0,0 @@
"""Bannerlord M2 campaign action primitives.
Wraps the four key GABS tools for the M2 milestone:
- party/move_to_settlement → move the party to a named settlement
- inventory/buy_item → purchase trade goods
- party/recruit_all → hire all available recruits
- party/engage_party → engage a nearby bandit party
All functions are async and return an ``ActionResult`` that is compatible
with the ``WorldInterface`` contract.
Error handling follows Pattern 3 (Feature Disable): if GABS rejects an
action, log a warning and return a FAILURE result — never raise.
"""
from __future__ import annotations
import logging
from enum import StrEnum
from typing import TYPE_CHECKING
from infrastructure.world.types import ActionResult, ActionStatus
if TYPE_CHECKING:
from bannerlord.gabs_client import GabsClient
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# GABS method names — canonical reference
# ---------------------------------------------------------------------------
class GabsTool(StrEnum):
"""GABS JSON-RPC method names for the M2 action set."""
MOVE_TO_SETTLEMENT = "party/move_to_settlement"
BUY_ITEM = "inventory/buy_item"
RECRUIT_ALL = "party/recruit_all"
ENGAGE_PARTY = "party/engage_party"
# ---------------------------------------------------------------------------
# Action functions
# ---------------------------------------------------------------------------
async def move_to_settlement(
client: "GabsClient",
settlement_id: str,
*,
settlement_name: str = "",
) -> ActionResult:
"""Move the party to a target settlement.
Parameters
----------
client:
Connected ``GabsClient`` instance.
settlement_id:
GABS settlement identifier (e.g. ``"town_A1"``).
settlement_name:
Human-readable name for logging only.
"""
label = settlement_name or settlement_id
try:
result = await client.call(
GabsTool.MOVE_TO_SETTLEMENT,
{"settlement_id": settlement_id},
)
logger.info("MOVE → %s: %s", label, result)
return ActionResult(
status=ActionStatus.SUCCESS,
message=f"Moving to {label}",
data=result or {},
)
except Exception as exc: # noqa: BLE001
logger.warning("MOVE → %s failed: %s", label, exc)
return ActionResult(
status=ActionStatus.FAILURE,
message=f"Move to {label} failed: {exc}",
data={},
)
async def buy_item(
client: "GabsClient",
item_id: str,
quantity: int,
*,
settlement_id: str = "",
) -> ActionResult:
"""Purchase trade goods from the current or specified settlement.
Parameters
----------
client:
Connected ``GabsClient`` instance.
item_id:
Item identifier (e.g. ``"grain"``, ``"iron"``, ``"wool"``).
quantity:
Number of units to purchase.
settlement_id:
Optional target settlement; empty means current location.
"""
try:
params: dict = {"item_id": item_id, "quantity": quantity}
if settlement_id:
params["settlement_id"] = settlement_id
result = await client.call(GabsTool.BUY_ITEM, params)
logger.info("BUY %dx %s: %s", quantity, item_id, result)
return ActionResult(
status=ActionStatus.SUCCESS,
message=f"Purchased {quantity}x {item_id}",
data=result or {},
)
except Exception as exc: # noqa: BLE001
logger.warning("BUY %dx %s failed: %s", quantity, item_id, exc)
return ActionResult(
status=ActionStatus.FAILURE,
message=f"Buy {item_id} failed: {exc}",
data={},
)
async def recruit_all(
client: "GabsClient",
*,
settlement_id: str = "",
) -> ActionResult:
"""Recruit all available troops at the current or specified settlement.
Parameters
----------
client:
Connected ``GabsClient`` instance.
settlement_id:
Optional settlement to recruit from; empty means current.
"""
try:
params: dict = {}
if settlement_id:
params["settlement_id"] = settlement_id
result = await client.call(GabsTool.RECRUIT_ALL, params)
recruited = (result or {}).get("recruited", "?")
logger.info("RECRUIT_ALL: recruited %s troops", recruited)
return ActionResult(
status=ActionStatus.SUCCESS,
message=f"Recruited {recruited} troops",
data=result or {},
)
except Exception as exc: # noqa: BLE001
logger.warning("RECRUIT_ALL failed: %s", exc)
return ActionResult(
status=ActionStatus.FAILURE,
message=f"Recruit all failed: {exc}",
data={},
)
async def engage_party(
client: "GabsClient",
party_id: str,
*,
party_name: str = "",
) -> ActionResult:
"""Engage a nearby party (typically a bandit gang) in combat.
Auto-resolve is expected at high Tactics skill — the agent relies
on GABS to handle the battle outcome.
Parameters
----------
client:
Connected ``GabsClient`` instance.
party_id:
GABS party identifier of the target.
party_name:
Human-readable name for logging only.
"""
label = party_name or party_id
try:
result = await client.call(GabsTool.ENGAGE_PARTY, {"party_id": party_id})
outcome = (result or {}).get("outcome", "unknown")
logger.info("ENGAGE %s: %s", label, outcome)
return ActionResult(
status=ActionStatus.SUCCESS,
message=f"Engaged {label}: {outcome}",
data=result or {},
)
except Exception as exc: # noqa: BLE001
logger.warning("ENGAGE %s failed: %s", label, exc)
return ActionResult(
status=ActionStatus.FAILURE,
message=f"Engage {label} failed: {exc}",
data={},
)

View File

@@ -1,316 +0,0 @@
"""Bannerlord M2 campaign action loop.
Implements the observe → decide → act → wait pipeline described in
issue #1094. The loop runs until the M2 victory conditions are met
(100 troops + 10 000 gold) or until stopped externally.
Architecture:
CampaignLoop.run()
while not m2_complete:
state = gabs.get_game_state() # observe
decision = decide(state) # decide (local Qwen3)
result = dispatch(decision, gabs) # act (GABS)
await asyncio.sleep(tick_seconds) # wait
Error handling:
- GABS connection failures → log + retry with backoff (max 3 attempts)
- LLM failures → WAIT action (graceful degradation)
- Action failures → log + continue to next tick
Progress tracking:
Loop publishes heartbeat events via the event bus so the dashboard
can display live party size and gold.
"""
from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime
from bannerlord.campaign_actions import buy_item, engage_party, move_to_settlement, recruit_all
from bannerlord.campaign_state import parse_campaign_state
from bannerlord.decision import M2Action, decide
from bannerlord.gabs_client import GabsClient
from config import settings
from infrastructure.world.types import ActionResult, ActionStatus
logger = logging.getLogger(__name__)
_MAX_RECONNECT_ATTEMPTS = 3
_RECONNECT_DELAY = 5.0 # seconds
# ---------------------------------------------------------------------------
# Progress snapshot (emitted each tick)
# ---------------------------------------------------------------------------
@dataclass
class TickResult:
"""Summary of one campaign tick."""
tick: int
timestamp: str
party_size: int
gold: int
action: str
action_status: str
reasoning: str
duration_ms: int
m2_complete: bool = False
error: str = ""
# ---------------------------------------------------------------------------
# Campaign loop
# ---------------------------------------------------------------------------
class CampaignLoop:
"""Runs the Bannerlord M2 autonomous campaign.
Parameters
----------
gabs_host:
Override GABS server host.
gabs_port:
Override GABS server port.
tick_seconds:
Real-time pause between in-game ticks.
on_tick:
Optional async callback invoked after each tick with the
``TickResult``. Used by the dashboard for live updates.
max_ticks:
Hard cap for testing / benchmarking. 0 = unlimited.
"""
def __init__(
self,
*,
gabs_host: str | None = None,
gabs_port: int | None = None,
tick_seconds: float | None = None,
on_tick=None,
max_ticks: int = 0,
) -> None:
self._host = gabs_host or settings.gabs_host
self._port = gabs_port or settings.gabs_port
self._tick_seconds = tick_seconds if tick_seconds is not None else settings.bannerlord_tick_seconds
self._on_tick = on_tick
self._max_ticks = max_ticks
self._running = False
self.history: list[TickResult] = []
# -- public API --------------------------------------------------------
@property
def is_running(self) -> bool:
return self._running
def stop(self) -> None:
"""Signal the loop to stop after the current tick."""
self._running = False
logger.info("CampaignLoop stop requested")
async def run(self) -> list[TickResult]:
"""Start the campaign loop.
Returns the list of tick results (for testing / benchmarking).
Runs until M2 complete, externally stopped, or max_ticks reached.
"""
self._running = True
logger.info(
"CampaignLoop starting — gabs=%s:%d tick=%.1fs",
self._host,
self._port,
self._tick_seconds,
)
client = GabsClient(host=self._host, port=self._port)
try:
await self._connect_with_retry(client)
except RuntimeError as exc: # noqa: BLE001
logger.error("CampaignLoop: could not connect to GABS — aborting: %s", exc)
self._running = False
return self.history
tick_num = 0
try:
while self._running:
tick_num += 1
if self._max_ticks > 0 and tick_num > self._max_ticks:
logger.info("CampaignLoop: max_ticks=%d reached", self._max_ticks)
break
result = await self._run_tick(client, tick_num)
self.history.append(result)
await self._emit(result)
if result.m2_complete:
logger.info(
"M2 COMPLETE! Party=%d troops, Gold=%d denars",
result.party_size,
result.gold,
)
break
if result.error and not self._running:
break
await asyncio.sleep(self._tick_seconds)
finally:
await client.disconnect()
self._running = False
logger.info("CampaignLoop stopped after %d ticks", tick_num)
return self.history
# -- internal: single tick ---------------------------------------------
async def _run_tick(self, client: "Any", tick_num: int) -> TickResult:
"""Execute one observe → decide → act cycle."""
start = time.monotonic()
# 1. Observe
raw_state = await client.get_game_state()
state = parse_campaign_state(raw_state)
state = _override_tick(state, tick_num)
# 2. Decide
decision = await decide(state)
# 3. Act
action_result = await self._dispatch(decision, client)
duration_ms = int((time.monotonic() - start) * 1000)
return TickResult(
tick=tick_num,
timestamp=datetime.now(UTC).isoformat(),
party_size=state.party.party_size,
gold=state.economy.gold,
action=decision.action,
action_status=action_result.status.value,
reasoning=decision.reasoning,
duration_ms=duration_ms,
m2_complete=state.m2_complete,
)
async def _dispatch(self, decision: "Any", client: "Any") -> "Any":
"""Route the decision to the correct GABS action function."""
action = decision.action
if action == M2Action.MOVE:
if not decision.settlement_id:
logger.warning("MOVE decision has no settlement_id — skipping")
return ActionResult(
status=ActionStatus.FAILURE,
message="MOVE missing settlement_id",
)
return await move_to_settlement(
client,
decision.settlement_id,
settlement_name=decision.settlement_name,
)
elif action == M2Action.TRADE:
if not decision.item_id:
logger.warning("TRADE decision has no item_id — skipping")
return ActionResult(
status=ActionStatus.FAILURE,
message="TRADE missing item_id",
)
return await buy_item(
client,
decision.item_id,
decision.quantity,
settlement_id=decision.settlement_id,
)
elif action == M2Action.RECRUIT:
return await recruit_all(
client,
settlement_id=decision.settlement_id,
)
elif action == M2Action.ENGAGE:
if not decision.party_id:
logger.warning("ENGAGE decision has no party_id — skipping")
return ActionResult(
status=ActionStatus.FAILURE,
message="ENGAGE missing party_id",
)
return await engage_party(
client,
decision.party_id,
party_name=decision.party_name,
)
else: # WAIT or unknown
logger.debug("Tick %s: WAIT — %s", decision.action, decision.reasoning)
return ActionResult(
status=ActionStatus.NOOP,
message=f"WAIT: {decision.reasoning}",
)
# -- internal: connectivity --------------------------------------------
async def _connect_with_retry(self, client: "Any") -> None:
"""Try to connect, retrying up to _MAX_RECONNECT_ATTEMPTS times."""
for attempt in range(1, _MAX_RECONNECT_ATTEMPTS + 1):
try:
await client.connect()
return
except Exception as exc: # noqa: BLE001
logger.warning(
"GABS connect attempt %d/%d failed: %s",
attempt,
_MAX_RECONNECT_ATTEMPTS,
exc,
)
if attempt < _MAX_RECONNECT_ATTEMPTS:
await asyncio.sleep(_RECONNECT_DELAY)
raise RuntimeError(
f"Could not connect to GABS at {self._host}:{self._port} "
f"after {_MAX_RECONNECT_ATTEMPTS} attempts"
)
# -- internal: event emission ------------------------------------------
async def _emit(self, result: TickResult) -> None:
"""Emit tick data to the event bus (best-effort)."""
try:
from infrastructure.events.bus import event_bus # noqa: PLC0415
await event_bus.publish(
"bannerlord.tick",
{
"tick": result.tick,
"party_size": result.party_size,
"gold": result.gold,
"action": result.action,
"action_status": result.action_status,
"m2_complete": result.m2_complete,
"duration_ms": result.duration_ms,
},
)
except Exception as exc: # noqa: BLE001
logger.debug("CampaignLoop emit skipped: %s", exc)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _override_tick(state: "Any", tick_num: int) -> "Any":
"""Set the tick counter from the loop (GABS may not provide it)."""
if state.tick == 0:
state.tick = tick_num
return state

View File

@@ -1,213 +0,0 @@
"""Bannerlord campaign state models.
Parses the raw GABS ``game/get_state`` payload into typed models and
tracks the M2 progress counters: party size and gold accumulation.
Done-condition (from issue #1094):
party_size >= 100 AND gold >= 10_000
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
logger = logging.getLogger(__name__)
# M2 victory conditions
M2_TROOP_GOAL = 100
M2_GOLD_GOAL = 10_000
@dataclass
class PartyState:
"""Current party composition and position."""
party_size: int = 0
wounded: int = 0
prisoners: int = 0
food_days: float = 0.0
morale: float = 100.0
current_settlement: str = ""
speed: float = 0.0
@dataclass
class EconomyState:
"""Current gold and trade state."""
gold: int = 0
daily_income: int = 0
daily_expenses: int = 0
@property
def net_income(self) -> int:
return self.daily_income - self.daily_expenses
@dataclass
class NearbyParty:
"""A nearby lord/bandit party visible on the map."""
party_id: str
name: str
faction: str
is_hostile: bool
troop_count: int
distance: float
@dataclass
class Settlement:
"""A settlement visible or reachable from the current position."""
settlement_id: str
name: str
faction: str
is_friendly: bool
distance: float
has_recruits: bool = False
has_trade_goods: bool = False
@dataclass
class CampaignState:
"""Full parsed snapshot of the GABS game state.
Built from the raw ``dict`` returned by ``GabsClient.get_game_state()``.
"""
tick: int = 0
timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))
party: PartyState = field(default_factory=PartyState)
economy: EconomyState = field(default_factory=EconomyState)
nearby_parties: list[NearbyParty] = field(default_factory=list)
settlements: list[Settlement] = field(default_factory=list)
raw: dict[str, Any] = field(default_factory=dict)
# -- M2 progress -------------------------------------------------------
@property
def troops_progress(self) -> str:
"""Human-readable M2 troop progress."""
return f"{self.party.party_size}/{M2_TROOP_GOAL}"
@property
def gold_progress(self) -> str:
"""Human-readable M2 gold progress."""
return f"{self.economy.gold:,}/{M2_GOLD_GOAL:,}"
@property
def m2_complete(self) -> bool:
"""True when both M2 victory conditions are met."""
return self.party.party_size >= M2_TROOP_GOAL and self.economy.gold >= M2_GOLD_GOAL
# -- hostile detection -------------------------------------------------
def hostile_bandits_nearby(self, max_distance: float = 5.0) -> list[NearbyParty]:
"""Return hostile bandit parties within *max_distance* map units."""
return [
p
for p in self.nearby_parties
if p.is_hostile and "bandit" in p.faction.lower() and p.distance <= max_distance
]
def nearest_settlement(self, *, friendly_only: bool = False) -> Settlement | None:
"""Return the closest (optionally friendly) settlement."""
candidates = [s for s in self.settlements if not friendly_only or s.is_friendly]
if not candidates:
return None
return min(candidates, key=lambda s: s.distance)
def nearest_recruit_settlement(self) -> Settlement | None:
"""Return the nearest settlement that has recruits available."""
candidates = [s for s in self.settlements if s.has_recruits]
if not candidates:
return None
return min(candidates, key=lambda s: s.distance)
# ---------------------------------------------------------------------------
# Parser
# ---------------------------------------------------------------------------
def parse_campaign_state(raw: dict[str, Any]) -> CampaignState:
"""Build a ``CampaignState`` from the raw GABS state dict.
Unknown / missing fields are silently defaulted so the parser never
crashes when GABS returns partial data.
"""
if not raw:
logger.debug("parse_campaign_state: empty payload — returning default state")
return CampaignState(raw=raw)
# -- party -------------------------------------------------------------
party_raw = raw.get("party", {})
party = PartyState(
party_size=int(party_raw.get("size", 0)),
wounded=int(party_raw.get("wounded", 0)),
prisoners=int(party_raw.get("prisoners", 0)),
food_days=float(party_raw.get("food_days", 0.0)),
morale=float(party_raw.get("morale", 100.0)),
current_settlement=str(party_raw.get("current_settlement", "")),
speed=float(party_raw.get("speed", 0.0)),
)
# -- economy -----------------------------------------------------------
economy_raw = raw.get("economy", {})
economy = EconomyState(
gold=int(economy_raw.get("gold", 0)),
daily_income=int(economy_raw.get("daily_income", 0)),
daily_expenses=int(economy_raw.get("daily_expenses", 0)),
)
# -- nearby parties ----------------------------------------------------
nearby_parties = []
for p in raw.get("nearby_parties", []):
try:
if not isinstance(p, dict) or not p.get("id"):
logger.debug("Skipping malformed nearby_party entry: missing id")
continue
nearby_parties.append(
NearbyParty(
party_id=str(p.get("id", "")),
name=str(p.get("name", "")),
faction=str(p.get("faction", "")),
is_hostile=bool(p.get("is_hostile", False)),
troop_count=int(p.get("troop_count", 0)),
distance=float(p.get("distance", 999.0)),
)
)
except (KeyError, ValueError, TypeError, AttributeError) as exc:
logger.debug("Skipping malformed nearby_party entry: %s", exc)
# -- settlements -------------------------------------------------------
settlements = []
for s in raw.get("settlements", []):
try:
settlements.append(
Settlement(
settlement_id=str(s.get("id", "")),
name=str(s.get("name", "")),
faction=str(s.get("faction", "")),
is_friendly=bool(s.get("is_friendly", False)),
distance=float(s.get("distance", 999.0)),
has_recruits=bool(s.get("has_recruits", False)),
has_trade_goods=bool(s.get("has_trade_goods", False)),
)
)
except (KeyError, ValueError, TypeError, AttributeError) as exc:
logger.debug("Skipping malformed settlement entry: %s", exc)
return CampaignState(
tick=int(raw.get("tick", 0)),
timestamp=datetime.now(UTC),
party=party,
economy=economy,
nearby_parties=nearby_parties,
settlements=settlements,
raw=raw,
)

View File

@@ -1,284 +0,0 @@
"""LLM-powered campaign decision engine for Bannerlord M2.
Builds a structured prompt from the current ``CampaignState`` and asks
the local Qwen3 model (via Ollama) to choose one action from the M2
action vocabulary. Returns a ``CampaignDecision`` pydantic model with
the chosen action and its parameters.
The decision model is intentionally simple for M2:
MOVE → move to a named settlement
TRADE → buy a trade item
RECRUIT → hire troops at current/nearby settlement
ENGAGE → fight a nearby bandit party
WAIT → idle (e.g. low food, waiting for morale to recover)
Qwen3 responds in JSON mode with temperature=0.1 for deterministic play.
"""
from __future__ import annotations
import json
import logging
from enum import StrEnum
from typing import Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Decision schema
# ---------------------------------------------------------------------------
class M2Action(StrEnum):
"""Vocabulary of actions available in the M2 milestone."""
MOVE = "MOVE"
TRADE = "TRADE"
RECRUIT = "RECRUIT"
ENGAGE = "ENGAGE"
WAIT = "WAIT"
class CampaignDecision:
"""Parsed LLM decision for one campaign tick.
Attributes
----------
action:
One of the ``M2Action`` values.
settlement_id:
Target settlement ID (for MOVE / RECRUIT / TRADE).
settlement_name:
Human-readable settlement name (for logging).
item_id:
Trade item to buy (for TRADE).
quantity:
Trade quantity (for TRADE).
party_id:
Target party ID (for ENGAGE).
party_name:
Human-readable party name (for ENGAGE / logging).
reasoning:
LLM's brief explanation of the choice.
"""
def __init__(
self,
action: M2Action = M2Action.WAIT,
*,
settlement_id: str = "",
settlement_name: str = "",
item_id: str = "",
quantity: int = 1,
party_id: str = "",
party_name: str = "",
reasoning: str = "",
) -> None:
self.action = action
self.settlement_id = settlement_id
self.settlement_name = settlement_name
self.item_id = item_id
self.quantity = quantity
self.party_id = party_id
self.party_name = party_name
self.reasoning = reasoning
def __repr__(self) -> str:
return (
f"CampaignDecision(action={self.action!r}, "
f"reasoning={self.reasoning[:60]!r})"
)
# ---------------------------------------------------------------------------
# Prompt builder
# ---------------------------------------------------------------------------
def build_decision_prompt(state: "Any") -> list[dict[str, str]]:
"""Return an OpenAI-style message list for the decision LLM.
Parameters
----------
state:
A ``CampaignState`` instance.
"""
# Build a compact context block
party = state.party
econ = state.economy
ctx_lines = [
f"Campaign tick: {state.tick}",
f"Party size: {party.party_size} troops ({party.wounded} wounded)",
f"Food: {party.food_days:.1f} days remaining",
f"Morale: {party.morale:.0f}/100",
f"Gold: {econ.gold:,} denars (net {econ.net_income:+d}/day)",
f"Current location: {party.current_settlement or 'travelling'}",
"",
"== M2 GOALS ==",
f"Troops: {state.troops_progress} (need 100)",
f"Gold: {state.gold_progress} (need 10,000)",
"",
]
# Nearby bandits
bandits = state.hostile_bandits_nearby()
if bandits:
ctx_lines.append("== NEARBY HOSTILE BANDITS ==")
for b in bandits[:3]:
ctx_lines.append(
f" - {b.name} (id={b.party_id}, {b.troop_count} troops, "
f"{b.distance:.1f} away)"
)
ctx_lines.append("")
# Settlements
settlements = state.settlements[:5]
if settlements:
ctx_lines.append("== REACHABLE SETTLEMENTS ==")
for s in settlements:
flags = []
if s.has_recruits:
flags.append("recruits")
if s.has_trade_goods:
flags.append("trade")
if not s.is_friendly:
flags.append("hostile-faction")
flag_str = f" [{', '.join(flags)}]" if flags else ""
ctx_lines.append(
f" - {s.name} (id={s.settlement_id}, "
f"{s.distance:.1f} away{flag_str})"
)
ctx_lines.append("")
context = "\n".join(ctx_lines)
system_prompt = (
"You are the campaign manager for Timmy, an autonomous Bannerlord agent. "
"Your job is to choose the single best action for this campaign tick. "
"Respond ONLY with a JSON object — no prose, no markdown fences.\n\n"
"JSON schema:\n"
'{\n'
' "action": "MOVE|TRADE|RECRUIT|ENGAGE|WAIT",\n'
' "settlement_id": "<id or empty>",\n'
' "settlement_name": "<name or empty>",\n'
' "item_id": "<item or empty>",\n'
' "quantity": <int>,\n'
' "party_id": "<id or empty>",\n'
' "party_name": "<name or empty>",\n'
' "reasoning": "<one sentence>"\n'
"}\n\n"
"Priority rules:\n"
"1. ENGAGE bandits only if they are weak (< 15 troops) and we have > 25 troops.\n"
"2. RECRUIT when a nearby settlement has recruits and party < 80 troops.\n"
"3. TRADE when gold < 5000 and a settlement has trade goods.\n"
"4. MOVE toward the nearest settlement with recruits or trade goods.\n"
"5. WAIT only if food < 1 day or morale < 40."
)
user_prompt = f"Current game state:\n\n{context}\nChoose the best action."
return [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
# ---------------------------------------------------------------------------
# Response parser
# ---------------------------------------------------------------------------
def parse_decision(raw_response: str) -> CampaignDecision:
"""Parse the LLM JSON response into a ``CampaignDecision``.
Falls back to ``WAIT`` on any parse error so the loop never crashes.
"""
# Strip accidental markdown code fences
text = raw_response.strip()
if text.startswith("```"):
lines = text.splitlines()
text = "\n".join(
line for line in lines if not line.startswith("```")
).strip()
try:
data = json.loads(text)
except json.JSONDecodeError as exc:
logger.warning("Decision parse error (bad JSON): %s | raw=%r", exc, raw_response[:200])
return CampaignDecision(action=M2Action.WAIT, reasoning="parse error")
try:
action_str = str(data.get("action", "WAIT")).upper()
try:
action = M2Action(action_str)
except ValueError:
logger.warning("Unknown action %r — defaulting to WAIT", action_str)
action = M2Action.WAIT
return CampaignDecision(
action=action,
settlement_id=str(data.get("settlement_id", "")),
settlement_name=str(data.get("settlement_name", "")),
item_id=str(data.get("item_id", "")),
quantity=max(1, int(data.get("quantity", 1))),
party_id=str(data.get("party_id", "")),
party_name=str(data.get("party_name", "")),
reasoning=str(data.get("reasoning", "")),
)
except (KeyError, ValueError, TypeError) as exc:
logger.warning("Decision parse error (bad fields): %s", exc)
return CampaignDecision(action=M2Action.WAIT, reasoning=f"field error: {exc}")
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
async def decide(state: "Any") -> CampaignDecision:
"""Ask the local LLM to choose a campaign action.
Uses the cascade router (Ollama → Claude fallback) configured in
``config/providers.yaml``. Gracefully returns WAIT on any LLM failure.
Parameters
----------
state:
A ``CampaignState`` instance.
Returns
-------
CampaignDecision
The chosen action and its parameters.
"""
from config import settings
messages = build_decision_prompt(state)
model = settings.bannerlord_model
try:
from infrastructure.router import get_router
router = get_router()
response = await router.complete(
messages=messages,
model=model,
temperature=0.1,
)
raw_text: str = response.get("content", "")
decision = parse_decision(raw_text)
logger.info(
"Decision [tick=%d]: %s%s",
state.tick,
decision.action,
decision.reasoning,
)
return decision
except Exception as exc: # noqa: BLE001
logger.warning("Decision LLM call failed: %s — defaulting to WAIT", exc)
return CampaignDecision(
action=M2Action.WAIT,
reasoning=f"LLM unavailable: {exc}",
)

View File

@@ -1,18 +1,18 @@
"""GABS TCP/JSON-RPC client for Bannerlord.
"""GABS TCP JSON-RPC client — connects to the Bannerlord.GABS mod.
Connects to the GABS C# mod (Bannerlord.GABS) over TCP on port 4825
and dispatches JSON-RPC 2.0 requests. All I/O is async; synchronous
callers must wrap in ``asyncio.to_thread()``.
The GABS (Game Agent Behavior System) mod exposes 90+ tools via a
TCP JSON-RPC 2.0 server on port 4825. This client wraps the transport
into a clean async interface used by all Bannerlord agents.
Architecture:
Bannerlord (Windows VM) ← GABS C# mod ← TCP:4825 ← this client
Degrades gracefully: if GABS is unreachable, methods return sensible
fallbacks and log a warning (never crash).
Usage::
async with GabsClient() as client:
state = await client.get_game_state()
result = await client.call("party/move_to_settlement",
{"settlement_id": "town_A1"})
Architecture reference:
Timmy (Qwen3 on Ollama, M3 Max)
→ GABSClient (this module, TCP JSON-RPC, port 4825)
→ Bannerlord.GABS C# mod
→ Game API + Harmony
→ Bannerlord (Windows VM)
"""
from __future__ import annotations
@@ -20,70 +20,92 @@ from __future__ import annotations
import asyncio
import json
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from config import settings
from bannerlord.types import (
FactionState,
GameState,
KingdomState,
PartyState,
)
logger = logging.getLogger(__name__)
# JSON-RPC framing: each message is newline-delimited UTF-8 JSON.
_ENCODING = "utf-8"
_NEWLINE = b"\n"
_DEFAULT_TIMEOUT = 30.0
_DEFAULT_HOST = "127.0.0.1"
_DEFAULT_PORT = 4825
_DEFAULT_TIMEOUT = 10.0 # seconds
_RECONNECT_DELAY = 5.0 # seconds between reconnect attempts
class GabsError(Exception):
"""Raised when GABS returns a JSON-RPC error response."""
@dataclass
class GABSTool:
"""Metadata for a single GABS tool."""
def __init__(self, code: int, message: str, data: Any = None) -> None:
super().__init__(f"GABS error {code}: {message}")
self.code = code
self.data = data
name: str
description: str
parameters: dict[str, Any] = field(default_factory=dict)
class GabsClient:
"""Async TCP JSON-RPC 2.0 client for the GABS Bannerlord mod.
class GABSConnectionError(Exception):
"""Raised when GABS is unreachable and a fallback is not possible."""
Parameters
----------
host:
GABS server host (Windows VM IP or ``localhost`` for port-forwarded).
port:
GABS server port (default 4825).
timeout:
Per-call timeout in seconds.
class GABSClient:
"""Async TCP JSON-RPC 2.0 client for the Bannerlord.GABS mod.
Usage::
async with GABSClient() as client:
state = await client.get_game_state()
await client.move_party("Vlandia")
All public methods degrade gracefully — they return ``None`` or an
empty structure when GABS is unavailable.
"""
def __init__(
self,
*,
host: str | None = None,
port: int | None = None,
host: str = _DEFAULT_HOST,
port: int = _DEFAULT_PORT,
timeout: float = _DEFAULT_TIMEOUT,
) -> None:
self._host = host or settings.gabs_host
self._port = port or settings.gabs_port
self._host = host
self._port = port
self._timeout = timeout
self._reader: asyncio.StreamReader | None = None
self._writer: asyncio.StreamWriter | None = None
self._req_id = 0
self._connected = False
self._call_id = 0
self._available_tools: list[GABSTool] = []
# -- lifecycle ---------------------------------------------------------
async def connect(self) -> None:
"""Open the TCP connection to GABS."""
async def connect(self) -> bool:
"""Open a TCP connection to GABS.
Returns:
True if connected successfully, False if GABS is unavailable.
"""
try:
self._reader, self._writer = await asyncio.wait_for(
asyncio.open_connection(self._host, self._port),
timeout=self._timeout,
)
self._connected = True
logger.info("GabsClient connected to %s:%d", self._host, self._port)
except (OSError, asyncio.TimeoutError) as exc:
logger.warning("GabsClient could not connect to GABS: %s", exc)
logger.info("GABSClient connected to %s:%d", self._host, self._port)
await self._discover_tools()
return True
except (ConnectionRefusedError, OSError, TimeoutError, asyncio.TimeoutError) as exc:
logger.warning(
"GABSClient could not connect to %s:%d%s",
self._host,
self._port,
exc,
)
self._connected = False
raise
return False
async def disconnect(self) -> None:
"""Close the TCP connection."""
@@ -91,105 +113,322 @@ class GabsClient:
try:
self._writer.close()
await self._writer.wait_closed()
except Exception as exc: # noqa: BLE001
logger.debug("GabsClient disconnect error (ignored): %s", exc)
except Exception as exc:
logger.debug("GABSClient disconnect error: %s", exc)
self._connected = False
self._reader = None
self._writer = None
logger.info("GabsClient disconnected")
logger.info("GABSClient disconnected")
async def __aenter__(self) -> GABSClient:
await self.connect()
return self
async def __aexit__(self, *_: object) -> None:
await self.disconnect()
@property
def is_connected(self) -> bool:
return self._connected
# -- context manager ---------------------------------------------------
# -- raw JSON-RPC transport --------------------------------------------
async def __aenter__(self) -> "GabsClient":
await self.connect()
return self
def _next_id(self) -> int:
self._call_id += 1
return self._call_id
async def __aexit__(self, *_: Any) -> None:
await self.disconnect()
async def _call(self, method: str, params: dict[str, Any] | None = None) -> Any:
"""Send a JSON-RPC 2.0 request and return the result.
# -- public API --------------------------------------------------------
async def call(self, method: str, params: dict[str, Any] | None = None) -> Any:
"""Call a GABS tool and return the result.
Parameters
----------
method:
GABS tool name, e.g. ``"party/move_to_settlement"``.
params:
Tool parameters dict.
Returns
-------
Any
The ``result`` field from the JSON-RPC response.
Raises
------
GabsError
If GABS returns an error response.
RuntimeError
If not connected.
Returns ``None`` and logs a warning on any error.
"""
if not self._connected or self._writer is None or self._reader is None:
raise RuntimeError("GabsClient is not connected — call connect() first")
if not self._connected:
logger.warning("GABSClient._call(%s): not connected", method)
return None
self._req_id += 1
request = {
"jsonrpc": "2.0",
"id": self._req_id,
"id": self._next_id(),
"method": method,
"params": params or {},
}
raw = json.dumps(request).encode(_ENCODING) + _NEWLINE
try:
self._writer.write(raw)
payload = json.dumps(request) + "\n"
assert self._writer is not None
self._writer.write(payload.encode())
await asyncio.wait_for(self._writer.drain(), timeout=self._timeout)
line = await asyncio.wait_for(
assert self._reader is not None
raw = await asyncio.wait_for(
self._reader.readline(), timeout=self._timeout
)
except asyncio.TimeoutError as exc:
raise RuntimeError(f"GABS call '{method}' timed out after {self._timeout}s") from exc
except (OSError, ConnectionResetError) as exc:
response = json.loads(raw.decode().strip())
if "error" in response:
logger.warning(
"GABS error for %s: %s", method, response["error"].get("message")
)
return None
return response.get("result")
except (asyncio.TimeoutError, json.JSONDecodeError, AssertionError, OSError) as exc:
logger.warning("GABSClient._call(%s) failed: %s", method, exc)
self._connected = False
raise RuntimeError(f"GABS connection lost during '{method}': {exc}") from exc
return None
response = json.loads(line.decode(_ENCODING))
# -- tool discovery ----------------------------------------------------
if "error" in response:
err = response["error"]
raise GabsError(
code=err.get("code", -1),
message=err.get("message", "unknown error"),
data=err.get("data"),
async def _discover_tools(self) -> None:
"""Populate self._available_tools via GABS tools/list."""
result = await self._call("tools/list")
if not result:
return
self._available_tools = [
GABSTool(
name=t.get("name", ""),
description=t.get("description", ""),
parameters=t.get("parameters", {}),
)
for t in (result if isinstance(result, list) else [])
]
logger.info("GABS: discovered %d tools", len(self._available_tools))
return response.get("result")
@property
def available_tools(self) -> list[GABSTool]:
"""Return the list of tools discovered from GABS."""
return list(self._available_tools)
async def get_game_state(self) -> dict[str, Any]:
"""Return the full game state snapshot from GABS.
def tool_count(self) -> int:
return len(self._available_tools)
Returns an empty dict and logs a warning if GABS is unreachable.
# -- game state --------------------------------------------------------
async def get_game_state(self) -> GameState:
"""Return the full campaign state snapshot.
Falls back to an empty GameState if GABS is unavailable.
"""
try:
result = await self.call("game/get_state")
return result if isinstance(result, dict) else {}
except (GabsError, RuntimeError) as exc:
logger.warning("GABS get_game_state failed: %s", exc)
return {}
raw = await self._call("game/get_state")
if raw is None:
return GameState()
async def ping(self) -> bool:
"""Return True if GABS responds to a ping."""
try:
await self.call("game/ping")
return True
except Exception as exc: # noqa: BLE001
logger.debug("GABS ping failed: %s", exc)
return False
return self._parse_game_state(raw)
except Exception as exc:
logger.warning("Failed to parse GABS game state: %s", exc)
return GameState()
def _parse_game_state(self, raw: dict[str, Any]) -> GameState:
"""Convert raw GABS state dict into a typed GameState."""
party_raw = raw.get("party", {})
kingdom_raw = raw.get("kingdom", {})
factions_raw = raw.get("factions", [])
party = PartyState(
location=party_raw.get("location", ""),
troops=party_raw.get("troops", 0),
food_days=party_raw.get("food_days", 0),
wounded_pct=party_raw.get("wounded_pct", 0.0),
denars=party_raw.get("denars", 0),
morale=party_raw.get("morale", 100.0),
prisoners=party_raw.get("prisoners", 0),
)
kingdom = KingdomState(
name=kingdom_raw.get("name", ""),
fiefs=kingdom_raw.get("fiefs", []),
daily_income=kingdom_raw.get("daily_income", 0),
daily_expenses=kingdom_raw.get("daily_expenses", 0),
vassal_lords=kingdom_raw.get("vassal_lords", []),
active_wars=kingdom_raw.get("active_wars", []),
active_alliances=kingdom_raw.get("active_alliances", []),
in_game_day=raw.get("in_game_day", 0),
)
factions = [
FactionState(
name=f.get("name", ""),
leader=f.get("leader", ""),
fiefs=f.get("fiefs", []),
army_strength=f.get("army_strength", 0),
treasury=f.get("treasury", 0),
is_at_war_with=f.get("is_at_war_with", []),
relations=f.get("relations", {}),
)
for f in (factions_raw if isinstance(factions_raw, list) else [])
]
return GameState(
tick=raw.get("tick", 0),
in_game_day=raw.get("in_game_day", 0),
timestamp=datetime.now(UTC),
party=party,
kingdom=kingdom,
factions=factions,
raw=raw,
)
# -- party actions -----------------------------------------------------
async def move_party(self, destination: str) -> bool:
"""Command Timmy's party to move toward *destination*."""
result = await self._call("party/move", {"destination": destination})
return result is not None
async def recruit_troops(self, troop_type: str, quantity: int) -> bool:
"""Recruit *quantity* troops of *troop_type* at current location."""
result = await self._call(
"party/recruit",
{"troop_type": troop_type, "quantity": quantity},
)
return result is not None
async def buy_supplies(self, quantity: int) -> bool:
"""Purchase food supplies for *quantity* days of march."""
result = await self._call("party/buy_supplies", {"quantity": quantity})
return result is not None
async def rest_party(self, days: int) -> bool:
"""Rest the party in current location for *days* in-game days."""
result = await self._call("party/rest", {"days": days})
return result is not None
async def auto_resolve_battle(self) -> dict[str, Any]:
"""Trigger auto-resolve for the current battle.
Returns the battle outcome dict, or empty dict on failure.
"""
result = await self._call("battle/auto_resolve")
return result if isinstance(result, dict) else {}
async def upgrade_troops(self) -> bool:
"""Spend accumulated XP on troop tier upgrades."""
result = await self._call("party/upgrade_troops")
return result is not None
async def sell_prisoners(self, location: str) -> int:
"""Sell prisoners at *location*. Returns denars gained."""
result = await self._call("party/sell_prisoners", {"location": location})
if isinstance(result, dict):
return result.get("denars_gained", 0)
return 0
# -- trade actions -----------------------------------------------------
async def assess_prices(self, town: str) -> dict[str, Any]:
"""Query buy/sell prices at *town*."""
result = await self._call("trade/assess_prices", {"town": town})
return result if isinstance(result, dict) else {}
async def buy_goods(self, item: str, quantity: int) -> bool:
"""Purchase *quantity* of *item* at current location."""
result = await self._call("trade/buy", {"item": item, "quantity": quantity})
return result is not None
async def sell_goods(self, item: str, quantity: int, location: str) -> bool:
"""Sell *quantity* of *item* at *location*."""
result = await self._call(
"trade/sell",
{"item": item, "quantity": quantity, "location": location},
)
return result is not None
async def establish_caravan(self, town: str) -> bool:
"""Deploy a caravan NPC at *town*."""
result = await self._call("trade/establish_caravan", {"town": town})
return result is not None
# -- diplomacy actions -------------------------------------------------
async def send_envoy(self, faction: str, message: str) -> bool:
"""Send a diplomatic message to *faction*."""
result = await self._call(
"diplomacy/send_envoy",
{"faction": faction, "message": message},
)
return result is not None
async def propose_peace(self, faction: str, tribute: int = 0) -> bool:
"""Propose peace with *faction*, optionally offering *tribute* denars."""
result = await self._call(
"diplomacy/propose_peace",
{"faction": faction, "tribute": tribute},
)
return result is not None
async def request_alliance(self, faction: str) -> bool:
"""Request a military alliance with *faction*."""
result = await self._call(
"diplomacy/request_alliance",
{"faction": faction},
)
return result is not None
async def request_military_access(self, faction: str) -> bool:
"""Request military access through *faction*'s territory."""
result = await self._call(
"diplomacy/military_access",
{"faction": faction},
)
return result is not None
# -- settlement / kingdom actions --------------------------------------
async def siege_settlement(self, settlement: str) -> bool:
"""Begin siege of *settlement*."""
result = await self._call("military/siege", {"settlement": settlement})
return result is not None
async def raid_village(self, village: str) -> bool:
"""Raid *village* for food and denars."""
result = await self._call("military/raid_village", {"village": village})
return result is not None
async def build_project(self, settlement: str, project: str) -> bool:
"""Queue a construction *project* in *settlement*."""
result = await self._call(
"settlement/build",
{"settlement": settlement, "project": project},
)
return result is not None
async def set_tax_policy(self, settlement: str, policy: str) -> bool:
"""Set tax *policy* for *settlement* (e.g. 'low', 'normal', 'high')."""
result = await self._call(
"settlement/set_tax",
{"settlement": settlement, "policy": policy},
)
return result is not None
async def appoint_governor(self, settlement: str, lord: str) -> bool:
"""Appoint *lord* as governor of *settlement*."""
result = await self._call(
"settlement/appoint_governor",
{"settlement": settlement, "lord": lord},
)
return result is not None
async def establish_kingdom(self, name: str) -> bool:
"""Declare a new kingdom with *name* (requires a captured fief)."""
result = await self._call("kingdom/establish", {"name": name})
ok = result is not None
if ok:
logger.info("Kingdom '%s' established!", name)
return ok
# -- scouting ----------------------------------------------------------
async def track_lord(self, lord_name: str) -> dict[str, Any]:
"""Shadow *lord_name* and return their last-known position."""
result = await self._call("scout/track_lord", {"lord": lord_name})
return result if isinstance(result, dict) else {}
async def assess_garrison(self, settlement: str) -> dict[str, Any]:
"""Estimate defender count for *settlement*."""
result = await self._call("scout/assess_garrison", {"settlement": settlement})
return result if isinstance(result, dict) else {}
async def map_patrol_routes(self, region: str) -> list[dict[str, Any]]:
"""Log enemy patrol routes in *region*."""
result = await self._call("scout/patrol_routes", {"region": region})
return result if isinstance(result, list) else []

View File

@@ -0,0 +1,347 @@
"""Bannerlord M3 — Session memory for multi-day strategic plans.
Persists the King's strategic plans, completed subgoals, and kingdom
milestones to SQLite so the campaign can be interrupted and resumed
across multiple sessions.
Pattern follows the existing EventBus persistence model.
"""
from __future__ import annotations
import json
import logging
import sqlite3
from contextlib import closing
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from bannerlord.types import KingSubgoal, SubgoalToken
logger = logging.getLogger(__name__)
_SCHEMA = """
CREATE TABLE IF NOT EXISTS campaign_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
started_at TEXT NOT NULL,
last_updated TEXT NOT NULL,
kingdom_name TEXT DEFAULT '',
in_game_day INTEGER DEFAULT 0,
fief_count INTEGER DEFAULT 0,
meta TEXT DEFAULT '{}'
);
CREATE TABLE IF NOT EXISTS subgoal_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
tick INTEGER NOT NULL,
in_game_day INTEGER NOT NULL,
token TEXT NOT NULL,
target TEXT,
quantity INTEGER,
priority REAL DEFAULT 1.0,
deadline_days INTEGER,
context TEXT,
issued_at TEXT NOT NULL,
completed_at TEXT,
outcome TEXT DEFAULT 'pending'
);
CREATE TABLE IF NOT EXISTS strategy_notes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
in_game_day INTEGER NOT NULL,
note_type TEXT NOT NULL,
content TEXT NOT NULL,
recorded_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_subgoal_session ON subgoal_log(session_id);
CREATE INDEX IF NOT EXISTS idx_subgoal_tick ON subgoal_log(tick);
CREATE INDEX IF NOT EXISTS idx_notes_session ON strategy_notes(session_id);
"""
@dataclass
class CampaignMilestone:
"""A notable campaign achievement recorded in session memory."""
in_game_day: int
event: str
detail: str = ""
recorded_at: datetime = field(default_factory=lambda: datetime.now(UTC))
class SessionMemory:
"""SQLite-backed session memory for the Bannerlord campaign.
Stores:
- Active session metadata (kingdom name, in-game day, fief count)
- Full subgoal history (every KingSubgoal issued and its outcome)
- Strategy notes / milestones for campaign reflection
Usage::
mem = SessionMemory(Path("data/bannerlord/campaign.db"))
session_id = mem.start_session()
mem.log_subgoal(session_id, tick=1, day=42, subgoal)
mem.complete_subgoal(subgoal_id, outcome="success")
mem.add_note(session_id, day=42, note_type="milestone",
content="Kingdom established: House Timmerson")
history = mem.get_recent_subgoals(session_id, limit=10)
"""
def __init__(self, db_path: Path) -> None:
self._db_path = db_path
self._init_db()
def _init_db(self) -> None:
self._db_path.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(self._db_path))) as conn:
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=5000")
conn.executescript(_SCHEMA)
conn.commit()
def _conn(self) -> sqlite3.Connection:
conn = sqlite3.connect(str(self._db_path))
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA busy_timeout=5000")
return conn
# -- session lifecycle -------------------------------------------------
def start_session(self, session_id: str | None = None) -> str:
"""Create a new campaign session. Returns the session_id."""
if session_id is None:
session_id = f"session_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}"
now = datetime.now(UTC).isoformat()
with closing(self._conn()) as conn:
conn.execute(
"INSERT OR IGNORE INTO campaign_sessions "
"(session_id, started_at, last_updated) VALUES (?, ?, ?)",
(session_id, now, now),
)
conn.commit()
logger.info("SessionMemory: started campaign session %s", session_id)
return session_id
def update_session(
self,
session_id: str,
*,
kingdom_name: str | None = None,
in_game_day: int | None = None,
fief_count: int | None = None,
meta: dict[str, Any] | None = None,
) -> None:
"""Update the campaign session state."""
now = datetime.now(UTC).isoformat()
with closing(self._conn()) as conn:
row = conn.execute(
"SELECT * FROM campaign_sessions WHERE session_id = ?",
(session_id,),
).fetchone()
if not row:
return
current_meta = json.loads(row["meta"] or "{}")
if meta:
current_meta.update(meta)
conn.execute(
"""UPDATE campaign_sessions SET
last_updated = ?,
kingdom_name = COALESCE(?, kingdom_name),
in_game_day = COALESCE(?, in_game_day),
fief_count = COALESCE(?, fief_count),
meta = ?
WHERE session_id = ?""",
(
now,
kingdom_name,
in_game_day,
fief_count,
json.dumps(current_meta),
session_id,
),
)
conn.commit()
def get_session(self, session_id: str) -> dict[str, Any] | None:
"""Return session metadata dict or None if not found."""
with closing(self._conn()) as conn:
row = conn.execute(
"SELECT * FROM campaign_sessions WHERE session_id = ?",
(session_id,),
).fetchone()
if not row:
return None
return dict(row)
def list_sessions(self) -> list[dict[str, Any]]:
"""Return all campaign sessions, most recent first."""
with closing(self._conn()) as conn:
rows = conn.execute(
"SELECT * FROM campaign_sessions ORDER BY last_updated DESC"
).fetchall()
return [dict(r) for r in rows]
# -- subgoal log -------------------------------------------------------
def log_subgoal(
self,
session_id: str,
tick: int,
in_game_day: int,
subgoal: KingSubgoal,
) -> int:
"""Record a subgoal emission. Returns the row id."""
with closing(self._conn()) as conn:
cursor = conn.execute(
"""INSERT INTO subgoal_log
(session_id, tick, in_game_day, token, target, quantity,
priority, deadline_days, context, issued_at, outcome)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending')""",
(
session_id,
tick,
in_game_day,
str(subgoal.token),
subgoal.target,
subgoal.quantity,
subgoal.priority,
subgoal.deadline_days,
subgoal.context,
subgoal.issued_at.isoformat(),
),
)
conn.commit()
return cursor.lastrowid or 0
def complete_subgoal(self, row_id: int, outcome: str = "success") -> None:
"""Mark a subgoal log entry as completed."""
with closing(self._conn()) as conn:
conn.execute(
"UPDATE subgoal_log SET completed_at = ?, outcome = ? WHERE id = ?",
(datetime.now(UTC).isoformat(), outcome, row_id),
)
conn.commit()
def get_recent_subgoals(
self, session_id: str, limit: int = 20
) -> list[dict[str, Any]]:
"""Return the *limit* most recent subgoal log entries."""
with closing(self._conn()) as conn:
rows = conn.execute(
"SELECT * FROM subgoal_log WHERE session_id = ? "
"ORDER BY tick DESC LIMIT ?",
(session_id, limit),
).fetchall()
return [dict(r) for r in rows]
def count_token(self, session_id: str, token: SubgoalToken) -> int:
"""Count how many times a subgoal token has been issued in a session."""
with closing(self._conn()) as conn:
row = conn.execute(
"SELECT COUNT(*) as n FROM subgoal_log "
"WHERE session_id = ? AND token = ?",
(session_id, str(token)),
).fetchone()
return row["n"] if row else 0
# -- strategy notes ----------------------------------------------------
def add_note(
self,
session_id: str,
in_game_day: int,
note_type: str,
content: str,
) -> None:
"""Record a strategy note or milestone."""
with closing(self._conn()) as conn:
conn.execute(
"INSERT INTO strategy_notes "
"(session_id, in_game_day, note_type, content, recorded_at) "
"VALUES (?, ?, ?, ?, ?)",
(
session_id,
in_game_day,
note_type,
content,
datetime.now(UTC).isoformat(),
),
)
conn.commit()
def get_notes(
self,
session_id: str,
note_type: str | None = None,
limit: int = 50,
) -> list[dict[str, Any]]:
"""Return strategy notes, optionally filtered by type."""
with closing(self._conn()) as conn:
if note_type:
rows = conn.execute(
"SELECT * FROM strategy_notes "
"WHERE session_id = ? AND note_type = ? "
"ORDER BY in_game_day DESC LIMIT ?",
(session_id, note_type, limit),
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM strategy_notes WHERE session_id = ? "
"ORDER BY in_game_day DESC LIMIT ?",
(session_id, limit),
).fetchall()
return [dict(r) for r in rows]
def get_milestones(self, session_id: str) -> list[dict[str, Any]]:
"""Return all milestone notes for a session."""
return self.get_notes(session_id, note_type="milestone", limit=200)
# -- diplomatic memory -------------------------------------------------
def record_war_declared(
self, session_id: str, in_game_day: int, faction: str
) -> None:
"""Log that Timmy declared war on *faction*."""
self.add_note(
session_id,
in_game_day,
"war_declared",
f"Declared war on {faction}",
)
def record_peace_agreed(
self, session_id: str, in_game_day: int, faction: str
) -> None:
"""Log that Timmy agreed to peace with *faction*."""
self.add_note(
session_id,
in_game_day,
"peace_agreed",
f"Peace agreed with {faction}",
)
def record_kingdom_established(
self, session_id: str, in_game_day: int, kingdom_name: str
) -> None:
"""Record the kingdom establishment milestone."""
self.add_note(
session_id,
in_game_day,
"milestone",
f"Kingdom established: {kingdom_name}",
)
self.update_session(session_id, kingdom_name=kingdom_name, in_game_day=in_game_day)
logger.info(
"SessionMemory: milestone — kingdom '%s' established on day %d",
kingdom_name,
in_game_day,
)

226
src/bannerlord/types.py Normal file
View File

@@ -0,0 +1,226 @@
"""Bannerlord M3 — core data types for the campaign strategy system.
KingSubgoal schema and all message types used by the feudal agent hierarchy.
Design follows the Feudal Multi-Agent Hierarchies (Ahilan & Dayan, 2019) model
specified in docs/research/bannerlord-feudal-hierarchy-design.md.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import StrEnum
from typing import Any, Literal
# ---------------------------------------------------------------------------
# Subgoal vocabulary
# ---------------------------------------------------------------------------
class SubgoalToken(StrEnum):
"""Fixed vocabulary of strategic intents the King can emit."""
EXPAND_TERRITORY = "EXPAND_TERRITORY"
RAID_ECONOMY = "RAID_ECONOMY"
FORTIFY = "FORTIFY"
RECRUIT = "RECRUIT"
TRADE = "TRADE"
ALLY = "ALLY"
SPY = "SPY"
HEAL = "HEAL"
CONSOLIDATE = "CONSOLIDATE"
TRAIN = "TRAIN"
@dataclass
class KingSubgoal:
"""A strategic directive issued by the King agent.
The King emits at most one subgoal per campaign tick. Vassals interpret
the token and prioritise actions accordingly.
Attributes:
token: Intent from the SubgoalToken vocabulary.
target: Named target (settlement, lord, or faction).
quantity: Scalar for RECRUIT / TRADE operations.
priority: 0.02.0 weighting that scales vassal reward.
deadline_days: Campaign-map days to complete (None = open-ended).
context: Free-text hint passed verbatim to vassals (not parsed).
issued_at: Timestamp when the King emitted this subgoal.
"""
token: SubgoalToken
target: str | None = None
quantity: int | None = None
priority: float = 1.0
deadline_days: int | None = None
context: str | None = None
issued_at: datetime = field(default_factory=lambda: datetime.now(UTC))
def to_dict(self) -> dict[str, Any]:
return {
"token": str(self.token),
"target": self.target,
"quantity": self.quantity,
"priority": self.priority,
"deadline_days": self.deadline_days,
"context": self.context,
"issued_at": self.issued_at.isoformat(),
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> KingSubgoal:
return cls(
token=SubgoalToken(data["token"]),
target=data.get("target"),
quantity=data.get("quantity"),
priority=data.get("priority", 1.0),
deadline_days=data.get("deadline_days"),
context=data.get("context"),
issued_at=datetime.fromisoformat(data["issued_at"])
if "issued_at" in data
else datetime.now(UTC),
)
# ---------------------------------------------------------------------------
# Game state snapshot
# ---------------------------------------------------------------------------
@dataclass
class FactionState:
"""Snapshot of a faction's status on the campaign map."""
name: str
leader: str
fiefs: list[str] = field(default_factory=list)
army_strength: int = 0
treasury: int = 0
is_at_war_with: list[str] = field(default_factory=list)
relations: dict[str, int] = field(default_factory=dict)
@dataclass
class PartyState:
"""Timmy's party snapshot."""
location: str = ""
troops: int = 0
food_days: int = 0
wounded_pct: float = 0.0
denars: int = 0
morale: float = 100.0
prisoners: int = 0
@dataclass
class KingdomState:
"""Timmy's kingdom snapshot (only populated after kingdom is established)."""
name: str = ""
fiefs: list[str] = field(default_factory=list)
daily_income: int = 0
daily_expenses: int = 0
vassal_lords: list[str] = field(default_factory=list)
active_wars: list[str] = field(default_factory=list)
active_alliances: list[str] = field(default_factory=list)
in_game_day: int = 0
@dataclass
class GameState:
"""Full campaign state snapshot delivered by GABS on each tick.
This is the primary input to the King agent's decision loop.
"""
tick: int = 0
in_game_day: int = 0
timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))
party: PartyState = field(default_factory=PartyState)
kingdom: KingdomState = field(default_factory=KingdomState)
factions: list[FactionState] = field(default_factory=list)
raw: dict[str, Any] = field(default_factory=dict)
def has_kingdom(self) -> bool:
return bool(self.kingdom.name)
def fief_count(self) -> int:
return len(self.kingdom.fiefs)
def active_war_count(self) -> int:
return len(self.kingdom.active_wars)
def is_two_front_war(self) -> bool:
"""Return True if Timmy is engaged in 2+ simultaneous wars."""
return self.active_war_count() >= 2
# ---------------------------------------------------------------------------
# Inter-agent message schemas
# ---------------------------------------------------------------------------
@dataclass
class SubgoalMessage:
"""King → Vassal directive."""
msg_type: Literal["subgoal"] = "subgoal"
from_agent: Literal["king"] = "king"
to_agent: str = ""
subgoal: KingSubgoal | None = None
issued_at: datetime = field(default_factory=lambda: datetime.now(UTC))
@dataclass
class TaskMessage:
"""Vassal → Companion work order."""
msg_type: Literal["task"] = "task"
from_agent: str = ""
to_agent: str = ""
primitive: str = ""
args: dict[str, Any] = field(default_factory=dict)
priority: float = 1.0
issued_at: datetime = field(default_factory=lambda: datetime.now(UTC))
@dataclass
class ResultMessage:
"""Companion/Vassal → Parent outcome report."""
msg_type: Literal["result"] = "result"
from_agent: str = ""
to_agent: str = ""
success: bool = True
outcome: dict[str, Any] = field(default_factory=dict)
reward_delta: float = 0.0
completed_at: datetime = field(default_factory=lambda: datetime.now(UTC))
@dataclass
class StateUpdateMessage:
"""GABS → All agents broadcast."""
msg_type: Literal["state"] = "state"
game_state: GameState = field(default_factory=GameState)
tick: int = 0
timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))
# ---------------------------------------------------------------------------
# Reward signals
# ---------------------------------------------------------------------------
@dataclass
class VassalReward:
"""Computed reward signal for a vassal agent after one decision cycle."""
agent_id: str
component_scores: dict[str, float] = field(default_factory=dict)
subgoal_bonus: float = 0.0
total: float = 0.0
computed_at: datetime = field(default_factory=lambda: datetime.now(UTC))

View File

@@ -147,6 +147,15 @@ class Settings(BaseSettings):
l402_macaroon_secret: str = ""
lightning_backend: Literal["mock", "lnd"] = "mock"
# ── Bannerlord / GABS ────────────────────────────────────────────────
# TCP JSON-RPC connection to the Bannerlord.GABS mod running on the
# Windows VM. Override with GABS_HOST / GABS_PORT env vars.
gabs_host: str = "127.0.0.1"
gabs_port: int = 4825
gabs_timeout: float = 10.0 # seconds per GABS call
bannerlord_tick_interval: float = 1.0 # real-time seconds between campaign ticks
bannerlord_db_path: str = "data/bannerlord/campaign.db"
# ── Privacy / Sovereignty ────────────────────────────────────────────
# Disable Agno telemetry for air-gapped/sovereign deployments.
# Default is False (telemetry disabled) to align with sovereign AI vision.
@@ -374,17 +383,6 @@ class Settings(BaseSettings):
error_feedback_enabled: bool = True # Auto-create bug report tasks
error_dedup_window_seconds: int = 300 # 5-min dedup window
# ── Bannerlord / GABS ─────────────────────────────────────────────
# GABS (Bannerlord Agent Bridge System) TCP/JSON-RPC server.
# Runs inside the Windows VM hosting Bannerlord.
# Override with GABS_HOST / GABS_PORT env vars.
gabs_host: str = "localhost"
gabs_port: int = 4825
# Decision model for the Bannerlord campaign agent (Qwen3 preferred).
bannerlord_model: str = "qwen3:14b"
# Campaign-tick interval in seconds (real-time pause between in-game days).
bannerlord_tick_seconds: float = 5.0
# ── Scripture / Biblical Integration ──────────────────────────────
# Enable the biblical text module.
scripture_enabled: bool = True

View File

@@ -12,6 +12,11 @@ Quick start::
register_adapter("mock", MockWorldAdapter)
world = get_adapter("mock")
perception = world.observe()
Registered adapters:
"mock" — in-memory stub for testing
"tes3mp" — Morrowind multiplayer (stub, pending PR #864)
"bannerlord" — Bannerlord via GABS mod (M3 campaign strategy)
"""
from infrastructure.world.registry import AdapterRegistry
@@ -22,6 +27,27 @@ register_adapter = _registry.register
get_adapter = _registry.get
list_adapters = _registry.list_adapters
# -- Built-in adapter registration -----------------------------------------
# Adapters are registered lazily to avoid import errors when their
# optional dependencies (e.g., GABS TCP connection) are unavailable.
def _register_builtin_adapters() -> None:
from infrastructure.world.adapters.mock import MockWorldAdapter
from infrastructure.world.adapters.tes3mp import TES3MPWorldAdapter
_registry.register("mock", MockWorldAdapter)
_registry.register("tes3mp", TES3MPWorldAdapter)
try:
from bannerlord.adapter import BannerlordWorldAdapter
_registry.register("bannerlord", BannerlordWorldAdapter)
except Exception:
# bannerlord package not installed or import error — skip silently
pass
_register_builtin_adapters()
__all__ = [
"register_adapter",
"get_adapter",

View File

@@ -1,234 +0,0 @@
"""Bannerlord world adapter — bridges GABS to the WorldInterface contract.
Allows the existing ``Heartbeat`` loop to drive the Bannerlord campaign
by treating it as just another game world. Wraps the async ``GabsClient``
for synchronous use (the ``Heartbeat`` calls ``observe()`` and ``act()``
synchronously).
Async callers should use ``CampaignLoop`` directly — it is more efficient
and handles the full M2 logic natively.
Usage::
adapter = BannerlordWorldAdapter()
adapter.connect()
heartbeat = Heartbeat(world=adapter, interval=5.0)
await heartbeat.run_once()
adapter.disconnect()
"""
from __future__ import annotations
import asyncio
import logging
from infrastructure.world.interface import WorldInterface
from infrastructure.world.types import (
ActionResult,
ActionStatus,
CommandInput,
PerceptionOutput,
)
logger = logging.getLogger(__name__)
class BannerlordWorldAdapter(WorldInterface):
"""WorldInterface adapter for Bannerlord via GABS.
Wraps ``GabsClient`` and ``CampaignState`` to present the Bannerlord
campaign map as a ``WorldInterface``-compatible world.
Parameters
----------
host:
Override GABS server host (defaults to ``settings.gabs_host``).
port:
Override GABS server port (defaults to ``settings.gabs_port``).
"""
def __init__(
self,
*,
host: str | None = None,
port: int | None = None,
) -> None:
from config import settings
self._host = host or settings.gabs_host
self._port = port or settings.gabs_port
self._connected = False
self._client = None
self._loop: asyncio.AbstractEventLoop | None = None
# -- lifecycle ---------------------------------------------------------
def connect(self) -> None:
"""Open the GABS TCP connection (synchronous wrapper)."""
from bannerlord.gabs_client import GabsClient
self._client = GabsClient(host=self._host, port=self._port)
try:
self._loop = asyncio.get_event_loop()
except RuntimeError:
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
try:
self._loop.run_until_complete(self._client.connect())
self._connected = True
logger.info("BannerlordWorldAdapter connected to GABS")
except Exception as exc: # noqa: BLE001
logger.warning("BannerlordWorldAdapter: GABS connect failed: %s", exc)
self._connected = False
def disconnect(self) -> None:
"""Close the GABS TCP connection (synchronous wrapper)."""
if self._client is not None and self._loop is not None:
try:
self._loop.run_until_complete(self._client.disconnect())
except Exception as exc: # noqa: BLE001
logger.debug("BannerlordWorldAdapter disconnect error: %s", exc)
self._connected = False
@property
def is_connected(self) -> bool:
return self._connected
# -- core contract -----------------------------------------------------
def observe(self) -> PerceptionOutput:
"""Poll GABS for current game state and return structured perception."""
from bannerlord.campaign_state import parse_campaign_state
if not self._connected or self._client is None or self._loop is None:
return PerceptionOutput(
location="disconnected",
entities=[],
events=["gabs_disconnected"],
raw={"error": "GABS not connected"},
)
try:
raw = self._loop.run_until_complete(self._client.get_game_state())
state = parse_campaign_state(raw)
# Build entities list from settlements and nearby parties
entities: list[str] = []
for s in state.settlements[:5]:
entities.append(f"settlement:{s.name}")
for p in state.nearby_parties[:3]:
prefix = "hostile" if p.is_hostile else "friendly"
entities.append(f"{prefix}_party:{p.name}")
# Build events list
events: list[str] = []
if state.party.food_days < 2.0:
events.append("low_food")
if state.party.morale < 40:
events.append("low_morale")
if state.hostile_bandits_nearby():
events.append("bandits_nearby")
if state.m2_complete:
events.append("m2_complete")
location = state.party.current_settlement or "campaign_map"
return PerceptionOutput(
location=location,
entities=entities,
events=events,
raw=raw,
)
except Exception as exc: # noqa: BLE001
logger.warning("BannerlordWorldAdapter.observe() failed: %s", exc)
return PerceptionOutput(
location="unknown",
entities=[],
events=[f"observe_error:{exc}"],
raw={"error": str(exc)},
)
def act(self, command: CommandInput) -> ActionResult:
"""Dispatch a campaign command to GABS.
Recognized ``command.action`` values:
- ``"move"`` → party/move_to_settlement (target = settlement_id)
- ``"trade"`` → inventory/buy_item (target = item_id)
- ``"recruit"`` → party/recruit_all
- ``"engage"`` → party/engage_party (target = party_id)
Parameters
----------
command:
WorldInterface ``CommandInput`` with action, target, parameters.
"""
if not self._connected or self._client is None or self._loop is None:
return ActionResult(
status=ActionStatus.FAILURE,
message="GABS not connected",
)
try:
return self._loop.run_until_complete(self._async_act(command))
except Exception as exc: # noqa: BLE001
logger.warning("BannerlordWorldAdapter.act() failed: %s", exc)
return ActionResult(
status=ActionStatus.FAILURE,
message=f"act failed: {exc}",
)
async def _async_act(self, command: CommandInput) -> ActionResult:
"""Async implementation of act()."""
from bannerlord.campaign_actions import (
buy_item,
engage_party,
move_to_settlement,
recruit_all,
)
action = command.action.lower()
params = command.parameters
if action == "move":
settlement_id = command.target or params.get("settlement_id", "")
return await move_to_settlement(
self._client,
settlement_id,
settlement_name=params.get("settlement_name", ""),
)
elif action == "trade":
item_id = command.target or params.get("item_id", "")
quantity = int(params.get("quantity", 1))
return await buy_item(
self._client,
item_id,
quantity,
settlement_id=params.get("settlement_id", ""),
)
elif action == "recruit":
return await recruit_all(
self._client,
settlement_id=params.get("settlement_id", ""),
)
elif action == "engage":
party_id = command.target or params.get("party_id", "")
return await engage_party(
self._client,
party_id,
party_name=params.get("party_name", ""),
)
else:
return ActionResult(
status=ActionStatus.NOOP,
message=f"Unknown action: {command.action}",
)
def speak(self, message: str, target: str | None = None) -> None:
"""Log the message — GABS has no chat mechanism in M2."""
logger.info("BannerlordWorldAdapter.speak: %r (target=%r)", message, target)

View File

@@ -0,0 +1,146 @@
"""Tests for the CampaignOrchestrator — mocked GABS client."""
from __future__ import annotations
import asyncio
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from bannerlord.campaign import CampaignOrchestrator
from bannerlord.types import (
FactionState,
GameState,
KingdomState,
PartyState,
)
def _make_state(
*,
in_game_day: int = 50,
troops: int = 150,
denars: int = 10_000,
food_days: int = 10,
kingdom_name: str = "House Timmerson",
fiefs: list | None = None,
active_wars: list | None = None,
) -> GameState:
return GameState(
in_game_day=in_game_day,
party=PartyState(
troops=troops,
denars=denars,
food_days=food_days,
location="Epicrotea",
),
kingdom=KingdomState(
name=kingdom_name,
fiefs=fiefs or ["Epicrotea"],
active_wars=active_wars or [],
daily_income=500,
daily_expenses=300,
),
factions=[
FactionState(
name="Vlandia",
leader="Derthert",
fiefs=["Pravend"],
army_strength=200,
)
],
)
@pytest.fixture
def tmp_db(tmp_path):
return tmp_path / "test_campaign.db"
@pytest.fixture
def orch(tmp_db):
return CampaignOrchestrator(
db_path=tmp_db,
tick_interval=0.0,
)
class TestCampaignOrchestratorLifecycle:
@pytest.mark.asyncio
async def test_start_creates_session(self, orch):
with patch.object(orch._gabs, "connect", return_value=False):
sid = await orch.start()
assert sid is not None
assert orch.session_id == sid
@pytest.mark.asyncio
async def test_start_resumes_existing_session(self, orch):
existing_sid = orch._memory.start_session("existing_run")
orch._session_id = existing_sid
with patch.object(orch._gabs, "connect", return_value=False):
sid = await orch.start()
assert sid == existing_sid
@pytest.mark.asyncio
async def test_stop_disconnects_gabs(self, orch):
disconnect_mock = AsyncMock()
orch._gabs.disconnect = disconnect_mock
await orch.stop()
disconnect_mock.assert_awaited_once()
class TestCampaignTick:
@pytest.mark.asyncio
async def test_single_tick_logs_subgoal(self, orch, tmp_db):
state = _make_state()
orch._gabs.get_game_state = AsyncMock(return_value=state)
orch._gabs._call = AsyncMock(return_value=None)
orch._session_id = orch._memory.start_session()
await orch._tick(1)
entries = orch.memory.get_recent_subgoals(orch.session_id, limit=5)
assert len(entries) == 1
@pytest.mark.asyncio
async def test_run_stops_at_max_ticks(self, orch):
state = _make_state()
orch._gabs.connect = AsyncMock(return_value=False)
orch._gabs.get_game_state = AsyncMock(return_value=state)
orch._gabs._call = AsyncMock(return_value=None)
orch._gabs.disconnect = AsyncMock()
summary = await orch.run(max_ticks=3)
assert summary["ticks_run"] == 3
@pytest.mark.asyncio
async def test_run_detects_done_condition(self, orch):
"""Campaign stops early when M3 done condition is met."""
state = _make_state(
in_game_day=110,
fiefs=["A", "B", "C"],
)
orch._gabs.connect = AsyncMock(return_value=False)
orch._gabs.get_game_state = AsyncMock(return_value=state)
orch._gabs._call = AsyncMock(return_value=None)
orch._gabs.disconnect = AsyncMock()
summary = await orch.run(max_ticks=100)
# Should stop at tick 1 because done condition is met immediately
assert summary["ticks_run"] <= 2
assert summary["survival_goal_met"]
@pytest.mark.asyncio
async def test_summary_shape(self, orch):
state = _make_state(in_game_day=110, fiefs=["A", "B", "C"])
orch._gabs.connect = AsyncMock(return_value=False)
orch._gabs.get_game_state = AsyncMock(return_value=state)
orch._gabs._call = AsyncMock(return_value=None)
orch._gabs.disconnect = AsyncMock()
summary = await orch.run(max_ticks=1)
assert "ticks_run" in summary
assert "session_id" in summary
assert "has_kingdom" in summary
assert "fief_count" in summary

View File

@@ -1,102 +0,0 @@
"""Unit tests for bannerlord.campaign_actions."""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock
import pytest
from bannerlord.campaign_actions import (
GabsTool,
buy_item,
engage_party,
move_to_settlement,
recruit_all,
)
from infrastructure.world.types import ActionStatus
def _mock_client(return_value=None, raise_exc=None):
"""Build a mock GabsClient."""
client = MagicMock()
if raise_exc is not None:
client.call = AsyncMock(side_effect=raise_exc)
else:
client.call = AsyncMock(return_value=return_value)
return client
class TestMoveToSettlement:
async def test_success(self):
client = _mock_client({"eta_days": 2})
result = await move_to_settlement(client, "town_A1", settlement_name="Marunath")
assert result.status == ActionStatus.SUCCESS
client.call.assert_called_once_with(
GabsTool.MOVE_TO_SETTLEMENT, {"settlement_id": "town_A1"}
)
async def test_failure_on_gabs_error(self):
client = _mock_client(raise_exc=RuntimeError("GABS timeout"))
result = await move_to_settlement(client, "town_A1")
assert result.status == ActionStatus.FAILURE
assert "GABS timeout" in result.message
async def test_uses_settlement_id_as_label_when_no_name(self):
client = _mock_client({})
result = await move_to_settlement(client, "town_B2")
assert result.status == ActionStatus.SUCCESS
assert "town_B2" in result.message
class TestBuyItem:
async def test_success(self):
client = _mock_client({"cost": 100})
result = await buy_item(client, "grain", 5)
assert result.status == ActionStatus.SUCCESS
assert "grain" in result.message
client.call.assert_called_once_with(
GabsTool.BUY_ITEM, {"item_id": "grain", "quantity": 5}
)
async def test_includes_settlement_id_when_given(self):
client = _mock_client({})
await buy_item(client, "iron", 2, settlement_id="town_A1")
call_params = client.call.call_args[0][1]
assert call_params["settlement_id"] == "town_A1"
async def test_failure_logged_gracefully(self):
client = _mock_client(raise_exc=Exception("inventory full"))
result = await buy_item(client, "wool", 10)
assert result.status == ActionStatus.FAILURE
class TestRecruitAll:
async def test_success(self):
client = _mock_client({"recruited": 15})
result = await recruit_all(client)
assert result.status == ActionStatus.SUCCESS
assert "15" in result.message
async def test_success_with_settlement(self):
client = _mock_client({"recruited": 8})
result = await recruit_all(client, settlement_id="town_A1")
call_params = client.call.call_args[0][1]
assert call_params["settlement_id"] == "town_A1"
async def test_failure_graceful(self):
client = _mock_client(raise_exc=RuntimeError("no recruits"))
result = await recruit_all(client)
assert result.status == ActionStatus.FAILURE
class TestEngageParty:
async def test_success(self):
client = _mock_client({"outcome": "victory", "loot": 200})
result = await engage_party(client, "bandit_1", party_name="Forest Bandits")
assert result.status == ActionStatus.SUCCESS
assert "victory" in result.message
async def test_failure_graceful(self):
client = _mock_client(raise_exc=RuntimeError("party not found"))
result = await engage_party(client, "bandit_1")
assert result.status == ActionStatus.FAILURE

View File

@@ -1,200 +0,0 @@
"""Unit tests for bannerlord.campaign_loop."""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from bannerlord.campaign_loop import CampaignLoop, TickResult
from bannerlord.decision import CampaignDecision, M2Action
from infrastructure.world.types import ActionResult, ActionStatus
def _make_game_state(*, troops: int = 30, gold: int = 2000) -> dict:
return {
"tick": 0,
"party": {
"size": troops,
"wounded": 0,
"food_days": 5.0,
"morale": 80.0,
"current_settlement": "town_A1",
},
"economy": {"gold": gold, "daily_income": 200, "daily_expenses": 150},
"nearby_parties": [],
"settlements": [
{
"id": "town_A1",
"name": "Marunath",
"faction": "aserai",
"is_friendly": True,
"distance": 0.0,
"has_recruits": True,
"has_trade_goods": False,
}
],
}
class TestCampaignLoopDispatch:
"""Tests for the internal _dispatch() routing."""
def _loop(self) -> CampaignLoop:
return CampaignLoop(tick_seconds=0.0, max_ticks=1)
async def test_dispatch_move(self):
loop = self._loop()
client = MagicMock()
decision = CampaignDecision(
action=M2Action.MOVE,
settlement_id="town_A1",
settlement_name="Marunath",
)
with patch("bannerlord.campaign_loop.move_to_settlement", new_callable=AsyncMock) as mock_move:
mock_move.return_value = ActionResult(status=ActionStatus.SUCCESS, message="ok")
await loop._dispatch(decision, client)
mock_move.assert_called_once_with(client, "town_A1", settlement_name="Marunath")
async def test_dispatch_recruit(self):
loop = self._loop()
client = MagicMock()
decision = CampaignDecision(
action=M2Action.RECRUIT,
settlement_id="town_A1",
)
with patch("bannerlord.campaign_loop.recruit_all", new_callable=AsyncMock) as mock_recruit:
mock_recruit.return_value = ActionResult(status=ActionStatus.SUCCESS, message="15 recruited")
await loop._dispatch(decision, client)
mock_recruit.assert_called_once()
async def test_dispatch_engage(self):
loop = self._loop()
client = MagicMock()
decision = CampaignDecision(
action=M2Action.ENGAGE,
party_id="bandit_1",
party_name="Forest Bandits",
)
with patch("bannerlord.campaign_loop.engage_party", new_callable=AsyncMock) as mock_engage:
mock_engage.return_value = ActionResult(status=ActionStatus.SUCCESS, message="victory")
await loop._dispatch(decision, client)
mock_engage.assert_called_once_with(client, "bandit_1", party_name="Forest Bandits")
async def test_dispatch_trade(self):
loop = self._loop()
client = MagicMock()
decision = CampaignDecision(
action=M2Action.TRADE,
item_id="grain",
quantity=5,
)
with patch("bannerlord.campaign_loop.buy_item", new_callable=AsyncMock) as mock_buy:
mock_buy.return_value = ActionResult(status=ActionStatus.SUCCESS, message="bought")
await loop._dispatch(decision, client)
mock_buy.assert_called_once_with(client, "grain", 5, settlement_id="")
async def test_dispatch_wait_returns_noop(self):
loop = self._loop()
client = MagicMock()
decision = CampaignDecision(action=M2Action.WAIT, reasoning="low food")
result = await loop._dispatch(decision, client)
assert result.status == ActionStatus.NOOP
async def test_dispatch_move_missing_settlement_id(self):
loop = self._loop()
client = MagicMock()
decision = CampaignDecision(action=M2Action.MOVE, settlement_id="")
result = await loop._dispatch(decision, client)
assert result.status == ActionStatus.FAILURE
async def test_dispatch_engage_missing_party_id(self):
loop = self._loop()
client = MagicMock()
decision = CampaignDecision(action=M2Action.ENGAGE, party_id="")
result = await loop._dispatch(decision, client)
assert result.status == ActionStatus.FAILURE
class TestCampaignLoopRun:
"""Integration-level tests for the full run() loop (mocked GABS)."""
async def test_run_stops_at_max_ticks(self):
"""Loop respects max_ticks and returns correct number of results."""
game_state = _make_game_state()
with (
patch("bannerlord.campaign_loop.GabsClient") as MockClient,
patch("bannerlord.campaign_loop.decide", new_callable=AsyncMock) as mock_decide,
patch("bannerlord.campaign_loop.move_to_settlement", new_callable=AsyncMock) as mock_move,
):
# Setup fake client
fake_client = AsyncMock()
fake_client.get_game_state = AsyncMock(return_value=game_state)
fake_client.connect = AsyncMock()
fake_client.disconnect = AsyncMock()
MockClient.return_value = fake_client
mock_decide.return_value = CampaignDecision(
action=M2Action.MOVE,
settlement_id="town_B1",
settlement_name="Epicrotea",
reasoning="moving",
)
mock_move.return_value = ActionResult(status=ActionStatus.SUCCESS, message="ok")
loop = CampaignLoop(tick_seconds=0.0, max_ticks=3)
results = await loop.run()
assert len(results) == 3
assert all(isinstance(r, TickResult) for r in results)
async def test_run_stops_when_m2_complete(self):
"""Loop exits early when M2 conditions are met."""
# State with M2 already complete
game_state = _make_game_state(troops=100, gold=10000)
with (
patch("bannerlord.campaign_loop.GabsClient") as MockClient,
patch("bannerlord.campaign_loop.decide", new_callable=AsyncMock) as mock_decide,
):
fake_client = AsyncMock()
fake_client.get_game_state = AsyncMock(return_value=game_state)
fake_client.connect = AsyncMock()
fake_client.disconnect = AsyncMock()
MockClient.return_value = fake_client
mock_decide.return_value = CampaignDecision(
action=M2Action.WAIT,
reasoning="done",
)
loop = CampaignLoop(tick_seconds=0.0, max_ticks=10)
results = await loop.run()
# Should exit after first tick (m2_complete = True)
assert len(results) == 1
assert results[0].m2_complete is True
async def test_run_aborts_on_connect_failure(self):
"""Loop returns empty history if GABS cannot be reached."""
with patch("bannerlord.campaign_loop.GabsClient") as MockClient:
fake_client = AsyncMock()
fake_client.connect = AsyncMock(side_effect=OSError("refused"))
fake_client.disconnect = AsyncMock()
MockClient.return_value = fake_client
loop = CampaignLoop(tick_seconds=0.0, max_ticks=5)
results = await loop.run()
assert results == []
def test_stop_sets_running_false(self):
loop = CampaignLoop()
loop._running = True
loop.stop()
assert not loop.is_running

View File

@@ -1,150 +0,0 @@
"""Unit tests for bannerlord.campaign_state."""
from __future__ import annotations
import pytest
from bannerlord.campaign_state import (
M2_GOLD_GOAL,
M2_TROOP_GOAL,
CampaignState,
NearbyParty,
Settlement,
parse_campaign_state,
)
class TestParseCampaignState:
def test_empty_dict_returns_defaults(self):
state = parse_campaign_state({})
assert state.party.party_size == 0
assert state.economy.gold == 0
assert state.nearby_parties == []
assert state.settlements == []
def test_full_payload_parsed(self):
raw = {
"tick": 5,
"party": {
"size": 30,
"wounded": 2,
"prisoners": 1,
"food_days": 3.5,
"morale": 75.0,
"current_settlement": "town_A1",
"speed": 5.2,
},
"economy": {
"gold": 4500,
"daily_income": 200,
"daily_expenses": 150,
},
"nearby_parties": [
{
"id": "bandit_1",
"name": "Forest Bandits",
"faction": "bandit",
"is_hostile": True,
"troop_count": 10,
"distance": 3.0,
}
],
"settlements": [
{
"id": "town_A1",
"name": "Marunath",
"faction": "aserai",
"is_friendly": True,
"distance": 0.0,
"has_recruits": True,
"has_trade_goods": False,
}
],
}
state = parse_campaign_state(raw)
assert state.tick == 5
assert state.party.party_size == 30
assert state.party.wounded == 2
assert state.economy.gold == 4500
assert state.economy.net_income == 50
assert len(state.nearby_parties) == 1
assert state.nearby_parties[0].name == "Forest Bandits"
assert len(state.settlements) == 1
assert state.settlements[0].name == "Marunath"
def test_malformed_entries_skipped(self):
raw = {
"nearby_parties": [{"id": "ok", "name": "Good", "faction": "bandit",
"is_hostile": True, "troop_count": 5, "distance": 2.0},
{"bad": "data"}],
"settlements": [None, "not_a_dict"],
}
state = parse_campaign_state(raw)
assert len(state.nearby_parties) == 1
assert state.settlements == []
class TestCampaignStateProperties:
def _make_state(self, *, troops: int, gold: int) -> CampaignState:
state = CampaignState()
state.party.party_size = troops
state.economy.gold = gold
return state
def test_m2_not_complete_by_default(self):
state = self._make_state(troops=20, gold=0)
assert not state.m2_complete
def test_m2_complete_when_both_goals_met(self):
state = self._make_state(troops=M2_TROOP_GOAL, gold=M2_GOLD_GOAL)
assert state.m2_complete
def test_m2_not_complete_if_only_troops_met(self):
state = self._make_state(troops=M2_TROOP_GOAL, gold=M2_GOLD_GOAL - 1)
assert not state.m2_complete
def test_m2_not_complete_if_only_gold_met(self):
state = self._make_state(troops=M2_TROOP_GOAL - 1, gold=M2_GOLD_GOAL)
assert not state.m2_complete
def test_troops_progress_string(self):
state = self._make_state(troops=45, gold=0)
assert state.troops_progress == f"45/{M2_TROOP_GOAL}"
def test_gold_progress_string(self):
state = self._make_state(troops=0, gold=3000)
assert "3,000" in state.gold_progress
def test_hostile_bandits_nearby_filter(self):
state = CampaignState()
state.nearby_parties = [
NearbyParty("b1", "Bandits", "bandit", True, 10, 2.0),
NearbyParty("l1", "Lord", "empire", False, 50, 1.0),
NearbyParty("b2", "Far Bandits", "bandit", True, 5, 10.0),
]
nearby = state.hostile_bandits_nearby(max_distance=5.0)
assert len(nearby) == 1
assert nearby[0].party_id == "b1"
def test_nearest_settlement_returns_closest(self):
state = CampaignState()
state.settlements = [
Settlement("s1", "Far Town", "empire", True, 10.0),
Settlement("s2", "Near Town", "empire", True, 2.0),
]
nearest = state.nearest_settlement()
assert nearest.settlement_id == "s2"
def test_nearest_recruit_settlement(self):
state = CampaignState()
state.settlements = [
Settlement("s1", "Town A", "empire", True, 5.0, has_recruits=False),
Settlement("s2", "Town B", "empire", True, 8.0, has_recruits=True),
]
recruit = state.nearest_recruit_settlement()
assert recruit.settlement_id == "s2"
def test_nearest_settlement_none_when_empty(self):
state = CampaignState()
assert state.nearest_settlement() is None

View File

@@ -0,0 +1,161 @@
"""Tests for Bannerlord companion worker agents."""
from bannerlord.agents.companions.caravan import CaravanCompanion
from bannerlord.agents.companions.logistics import LogisticsCompanion
from bannerlord.agents.companions.scout import ScoutCompanion
from bannerlord.types import (
FactionState,
GameState,
KingSubgoal,
KingdomState,
PartyState,
SubgoalToken,
)
def _state(
*,
troops: int = 150,
denars: int = 10_000,
food_days: int = 10,
wounded_pct: float = 0.0,
prisoners: int = 0,
location: str = "Epicrotea",
active_wars: list | None = None,
factions: list | None = None,
) -> GameState:
return GameState(
party=PartyState(
troops=troops,
denars=denars,
food_days=food_days,
wounded_pct=wounded_pct,
prisoners=prisoners,
location=location,
),
kingdom=KingdomState(
name="House Timmerson",
active_wars=active_wars or [],
),
factions=factions or [],
)
class TestLogisticsCompanion:
def setup_method(self):
self.companion = LogisticsCompanion()
def test_recruits_on_recruit_subgoal(self):
state = _state()
sg = KingSubgoal(token=SubgoalToken.RECRUIT, quantity=30)
actions = self.companion.evaluate(state, sg)
assert any(a["primitive"] == "recruit_troop" for a in actions)
def test_rests_on_heal_subgoal(self):
state = _state()
sg = KingSubgoal(token=SubgoalToken.HEAL)
actions = self.companion.evaluate(state, sg)
assert any(a["primitive"] == "rest_party" for a in actions)
def test_rests_when_heavily_wounded(self):
state = _state(wounded_pct=0.25)
sg = KingSubgoal(token=SubgoalToken.CONSOLIDATE)
actions = self.companion.evaluate(state, sg)
assert any(a["primitive"] == "rest_party" for a in actions)
def test_buys_food_when_low(self):
state = _state(food_days=2)
sg = KingSubgoal(token=SubgoalToken.CONSOLIDATE)
actions = self.companion.evaluate(state, sg)
assert any(a["primitive"] == "buy_supplies" for a in actions)
def test_no_food_purchase_when_stocked(self):
state = _state(food_days=10)
sg = KingSubgoal(token=SubgoalToken.CONSOLIDATE)
actions = self.companion.evaluate(state, sg)
assert not any(a["primitive"] == "buy_supplies" for a in actions)
def test_sells_prisoners_at_cap(self):
state = _state(prisoners=20)
sg = KingSubgoal(token=SubgoalToken.CONSOLIDATE)
actions = self.companion.evaluate(state, sg)
assert any(a["primitive"] == "sell_prisoners" for a in actions)
def test_upgrades_troops_on_train(self):
state = _state()
sg = KingSubgoal(token=SubgoalToken.TRAIN)
actions = self.companion.evaluate(state, sg)
assert any(a["primitive"] == "upgrade_troops" for a in actions)
class TestCaravanCompanion:
def setup_method(self):
self.companion = CaravanCompanion()
def test_no_actions_when_not_trade_subgoal(self):
state = _state()
sg = KingSubgoal(token=SubgoalToken.RAID_ECONOMY)
actions = self.companion.evaluate(state, sg)
assert actions == []
def test_assesses_prices_on_trade(self):
state = _state()
sg = KingSubgoal(token=SubgoalToken.TRADE)
actions = self.companion.evaluate(state, sg)
assert any(a["primitive"] == "assess_prices" for a in actions)
def test_deploys_caravan_when_flush(self):
state = _state(denars=15_000)
sg = KingSubgoal(token=SubgoalToken.TRADE)
actions = self.companion.evaluate(state, sg)
assert any(a["primitive"] == "establish_caravan" for a in actions)
def test_no_caravan_when_broke(self):
state = _state(denars=5_000)
sg = KingSubgoal(token=SubgoalToken.TRADE)
actions = self.companion.evaluate(state, sg)
assert not any(a["primitive"] == "establish_caravan" for a in actions)
def test_profitable_trade_threshold(self):
assert CaravanCompanion.is_profitable_trade(100, 116) # 16% margin = ok
assert not CaravanCompanion.is_profitable_trade(100, 114) # 14% = below threshold
assert not CaravanCompanion.is_profitable_trade(0, 100) # zero buy price = no
class TestScoutCompanion:
def setup_method(self):
self.companion = ScoutCompanion()
def test_tracks_lord_on_spy_subgoal(self):
state = _state()
sg = KingSubgoal(token=SubgoalToken.SPY, target="Derthert")
actions = self.companion.evaluate(state, sg)
assert any(a["primitive"] == "track_lord" for a in actions)
def test_assesses_garrison_on_expand(self):
state = _state()
sg = KingSubgoal(token=SubgoalToken.EXPAND_TERRITORY, target="Pravend")
actions = self.companion.evaluate(state, sg)
assert any(a["primitive"] == "assess_garrison" for a in actions)
def test_maps_patrols_in_war_regions(self):
state = _state(
active_wars=["Vlandia"],
factions=[
FactionState(
name="Vlandia",
leader="Derthert",
fiefs=["Pravend"],
army_strength=300,
)
],
)
sg = KingSubgoal(token=SubgoalToken.CONSOLIDATE)
actions = self.companion.evaluate(state, sg)
assert any(a["primitive"] == "map_patrol_routes" for a in actions)
def test_no_patrol_map_when_no_wars(self):
state = _state(active_wars=[])
sg = KingSubgoal(token=SubgoalToken.CONSOLIDATE)
actions = self.companion.evaluate(state, sg)
assert not any(a["primitive"] == "map_patrol_routes" for a in actions)

View File

@@ -1,154 +0,0 @@
"""Unit tests for bannerlord.decision."""
from __future__ import annotations
import json
import pytest
from bannerlord.campaign_state import (
CampaignState,
EconomyState,
NearbyParty,
PartyState,
Settlement,
)
from bannerlord.decision import (
M2Action,
CampaignDecision,
build_decision_prompt,
parse_decision,
)
def _make_state(
*,
troops: int = 30,
gold: int = 2000,
food_days: float = 5.0,
morale: float = 80.0,
settlements: list | None = None,
nearby_parties: list | None = None,
) -> CampaignState:
state = CampaignState()
state.party = PartyState(
party_size=troops,
food_days=food_days,
morale=morale,
)
state.economy = EconomyState(gold=gold, daily_income=200, daily_expenses=150)
state.settlements = settlements or []
state.nearby_parties = nearby_parties or []
return state
class TestBuildDecisionPrompt:
def test_returns_two_messages(self):
state = _make_state()
messages = build_decision_prompt(state)
assert len(messages) == 2
assert messages[0]["role"] == "system"
assert messages[1]["role"] == "user"
def test_user_message_includes_party_info(self):
state = _make_state(troops=45, gold=3000)
messages = build_decision_prompt(state)
user_content = messages[1]["content"]
assert "45" in user_content
assert "3,000" in user_content
def test_bandits_appear_in_prompt_when_nearby(self):
state = _make_state(
nearby_parties=[NearbyParty("b1", "Forest Bandits", "bandit", True, 10, 2.0)]
)
messages = build_decision_prompt(state)
user_content = messages[1]["content"]
assert "Forest Bandits" in user_content
def test_settlements_appear_in_prompt(self):
state = _make_state(
settlements=[Settlement("s1", "Marunath", "aserai", True, 3.0, has_recruits=True)]
)
messages = build_decision_prompt(state)
user_content = messages[1]["content"]
assert "Marunath" in user_content
def test_system_prompt_contains_action_vocabulary(self):
state = _make_state()
messages = build_decision_prompt(state)
system = messages[0]["content"]
for action in ("MOVE", "TRADE", "RECRUIT", "ENGAGE", "WAIT"):
assert action in system
class TestParseDecision:
def test_valid_move_decision(self):
raw = json.dumps({
"action": "MOVE",
"settlement_id": "town_A1",
"settlement_name": "Marunath",
"item_id": "",
"quantity": 1,
"party_id": "",
"party_name": "",
"reasoning": "Moving to recruit troops",
})
decision = parse_decision(raw)
assert decision.action == M2Action.MOVE
assert decision.settlement_id == "town_A1"
assert decision.settlement_name == "Marunath"
def test_valid_recruit_decision(self):
raw = json.dumps({
"action": "RECRUIT",
"settlement_id": "town_A1",
"settlement_name": "Marunath",
"item_id": "",
"quantity": 1,
"party_id": "",
"party_name": "",
"reasoning": "Has recruits available",
})
decision = parse_decision(raw)
assert decision.action == M2Action.RECRUIT
def test_valid_engage_decision(self):
raw = json.dumps({
"action": "ENGAGE",
"settlement_id": "",
"settlement_name": "",
"item_id": "",
"quantity": 1,
"party_id": "bandit_1",
"party_name": "Forest Bandits",
"reasoning": "Weak bandits — easy XP",
})
decision = parse_decision(raw)
assert decision.action == M2Action.ENGAGE
assert decision.party_id == "bandit_1"
def test_wait_on_invalid_json(self):
decision = parse_decision("not json at all")
assert decision.action == M2Action.WAIT
def test_wait_on_unknown_action(self):
raw = json.dumps({"action": "TELEPORT", "reasoning": "hack"})
decision = parse_decision(raw)
assert decision.action == M2Action.WAIT
def test_strips_markdown_fences(self):
raw = '```json\n{"action": "WAIT", "reasoning": "low food"}\n```'
decision = parse_decision(raw)
assert decision.action == M2Action.WAIT
def test_quantity_minimum_one(self):
raw = json.dumps({"action": "TRADE", "item_id": "grain", "quantity": -5, "reasoning": "x"})
decision = parse_decision(raw)
assert decision.quantity == 1
def test_missing_optional_fields_default_to_empty(self):
raw = json.dumps({"action": "WAIT", "reasoning": "resting"})
decision = parse_decision(raw)
assert decision.settlement_id == ""
assert decision.party_id == ""
assert decision.item_id == ""

View File

@@ -1,120 +1,162 @@
"""Unit tests for bannerlord.gabs_client."""
"""Tests for the GABSClient — uses mocked asyncio streams, no real TCP."""
from __future__ import annotations
import asyncio
import json
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from bannerlord.gabs_client import GabsClient, GabsError
from bannerlord.gabs_client import GABSClient
from bannerlord.types import GameState
class TestGabsClientCall:
"""Tests for GabsClient.call() using mock StreamReader/Writer."""
@pytest.fixture
def client():
return GABSClient(host="127.0.0.1", port=4825, timeout=2.0)
def _make_client(self, response: dict) -> GabsClient:
"""Return a pre-connected GabsClient with mocked I/O."""
client = GabsClient(host="localhost", port=4825, timeout=5.0)
client._connected = True
writer = MagicMock()
writer.write = MagicMock()
writer.drain = AsyncMock()
raw_response = json.dumps(response).encode() + b"\n"
reader = MagicMock()
reader.readline = AsyncMock(return_value=raw_response)
client._reader = reader
client._writer = writer
return client
async def test_successful_call_returns_result(self):
client = self._make_client({"jsonrpc": "2.0", "id": 1, "result": {"status": "ok"}})
result = await client.call("game/ping")
assert result == {"status": "ok"}
async def test_error_response_raises_gabs_error(self):
client = self._make_client({
"jsonrpc": "2.0",
"id": 1,
"error": {"code": -32601, "message": "Method not found"},
})
with pytest.raises(GabsError) as exc_info:
await client.call("unknown/method")
assert exc_info.value.code == -32601
async def test_not_connected_raises_runtime_error(self):
client = GabsClient()
with pytest.raises(RuntimeError, match="not connected"):
await client.call("game/ping")
async def test_request_id_increments(self):
client = self._make_client({"jsonrpc": "2.0", "id": 1, "result": {}})
await client.call("game/ping")
# Reset reader for second call
client._reader.readline = AsyncMock(
return_value=json.dumps({"jsonrpc": "2.0", "id": 2, "result": {}}).encode() + b"\n"
)
await client.call("game/ping")
assert client._req_id == 2
async def test_get_game_state_returns_empty_on_error(self):
client = GabsClient()
client._connected = True
writer = MagicMock()
writer.write = MagicMock()
writer.drain = AsyncMock()
reader = MagicMock()
reader.readline = AsyncMock(side_effect=OSError("connection reset"))
client._reader = reader
client._writer = writer
result = await client.get_game_state()
assert result == {}
async def test_ping_returns_true_on_success(self):
client = self._make_client({"jsonrpc": "2.0", "id": 1, "result": "pong"})
result = await client.ping()
assert result is True
async def test_ping_returns_false_on_failure(self):
client = GabsClient()
result = await client.ping()
class TestGABSClientConnection:
@pytest.mark.asyncio
async def test_connect_returns_false_when_refused(self, client):
with patch(
"asyncio.open_connection",
side_effect=ConnectionRefusedError("refused"),
):
result = await client.connect()
assert result is False
class TestGabsClientLifecycle:
async def test_connect_failure_sets_not_connected(self):
client = GabsClient(host="localhost", port=9999, timeout=0.1)
with pytest.raises(Exception):
await client.connect()
assert not client.is_connected
async def test_context_manager_calls_connect_and_disconnect(self):
client = GabsClient()
connect_called = False
disconnect_called = False
@pytest.mark.asyncio
async def test_connect_success(self, client):
mock_reader = AsyncMock()
mock_writer = MagicMock()
mock_writer.drain = AsyncMock()
mock_writer.close = MagicMock()
mock_writer.wait_closed = AsyncMock()
async def _fake_connect():
nonlocal connect_called
connect_called = True
client._connected = True
# Simulate tools/list response on connect
tools_response = json.dumps({"jsonrpc": "2.0", "id": 1, "result": []}) + "\n"
mock_reader.readline = AsyncMock(return_value=tools_response.encode())
async def _fake_disconnect():
nonlocal disconnect_called
disconnect_called = True
client._connected = False
with patch("asyncio.open_connection", return_value=(mock_reader, mock_writer)):
with patch("asyncio.wait_for", side_effect=_passthrough_wait_for):
result = await client.connect()
client.connect = _fake_connect
client.disconnect = _fake_disconnect
assert result is True
assert client.is_connected
async with client as c:
assert c is client
assert connect_called
@pytest.mark.asyncio
async def test_disconnect_when_not_connected(self, client):
# Should not raise
await client.disconnect()
assert not client.is_connected
assert disconnect_called
class TestGABSClientCall:
@pytest.mark.asyncio
async def test_call_returns_none_when_disconnected(self, client):
result = await client._call("game/get_state")
assert result is None
@pytest.mark.asyncio
async def test_call_id_increments(self, client):
assert client._next_id() == 1
assert client._next_id() == 2
assert client._next_id() == 3
@pytest.mark.asyncio
async def test_get_game_state_returns_empty_when_disconnected(self, client):
state = await client.get_game_state()
assert isinstance(state, GameState)
assert state.tick == 0
assert not state.has_kingdom()
@pytest.mark.asyncio
async def test_move_party_returns_false_when_disconnected(self, client):
result = await client.move_party("Vlandia")
assert result is False
@pytest.mark.asyncio
async def test_propose_peace_returns_false_when_disconnected(self, client):
result = await client.propose_peace("Vlandia")
assert result is False
@pytest.mark.asyncio
async def test_assess_prices_returns_empty_dict_when_disconnected(self, client):
result = await client.assess_prices("Pravend")
assert result == {}
@pytest.mark.asyncio
async def test_map_patrol_routes_returns_empty_list_when_disconnected(self, client):
result = await client.map_patrol_routes("Vlandia")
assert result == []
class TestGameStateParsing:
def test_parse_game_state_full(self):
client = GABSClient()
raw = {
"tick": 5,
"in_game_day": 42,
"party": {
"location": "Pravend",
"troops": 200,
"food_days": 8,
"wounded_pct": 0.1,
"denars": 15000,
"morale": 85.0,
"prisoners": 3,
},
"kingdom": {
"name": "House Timmerson",
"fiefs": ["Pravend", "Epicrotea"],
"daily_income": 500,
"daily_expenses": 300,
"vassal_lords": ["Lord A"],
"active_wars": ["Sturgia"],
"active_alliances": ["Battania"],
},
"factions": [
{
"name": "Sturgia",
"leader": "Raganvad",
"fiefs": ["Varcheg"],
"army_strength": 250,
"treasury": 5000,
"is_at_war_with": ["House Timmerson"],
"relations": {"House Timmerson": -50},
}
],
}
state = client._parse_game_state(raw)
assert state.tick == 5
assert state.in_game_day == 42
assert state.party.location == "Pravend"
assert state.party.troops == 200
assert state.kingdom.name == "House Timmerson"
assert state.fief_count() == 2
assert len(state.factions) == 1
assert state.factions[0].name == "Sturgia"
assert state.has_kingdom()
assert not state.is_two_front_war()
def test_parse_game_state_minimal(self):
client = GABSClient()
state = client._parse_game_state({})
assert isinstance(state, GameState)
assert not state.has_kingdom()
def test_tool_count_zero_before_connect(self):
client = GABSClient()
assert client.tool_count() == 0
assert client.available_tools == []
# ---------------------------------------------------------------------------
# Helper
# ---------------------------------------------------------------------------
async def _passthrough_wait_for(coro, timeout=None):
"""Stand-in for asyncio.wait_for that just awaits the coroutine."""
import asyncio
return await coro

View File

@@ -0,0 +1,176 @@
"""Tests for the King agent decision rules."""
import pytest
from bannerlord.agents.king import KingAgent, _MIN_DENARS, _MIN_TROOPS, _TARGET_FIEFS
from bannerlord.types import (
FactionState,
GameState,
KingdomState,
PartyState,
SubgoalToken,
)
def _make_state(
*,
troops: int = 150,
denars: int = 10_000,
wounded_pct: float = 0.0,
food_days: int = 10,
kingdom_name: str = "",
fiefs: list | None = None,
active_wars: list | None = None,
active_alliances: list | None = None,
factions: list | None = None,
in_game_day: int = 50,
) -> GameState:
return GameState(
in_game_day=in_game_day,
party=PartyState(
troops=troops,
denars=denars,
wounded_pct=wounded_pct,
food_days=food_days,
location="Epicrotea",
),
kingdom=KingdomState(
name=kingdom_name,
fiefs=fiefs or [],
active_wars=active_wars or [],
active_alliances=active_alliances or [],
),
factions=factions or [],
)
class TestKingAgentRules:
def setup_method(self):
self.king = KingAgent()
def test_heal_when_heavily_wounded(self):
state = _make_state(wounded_pct=0.35)
sg = self.king.decide(state)
assert sg.token == SubgoalToken.HEAL
def test_recruit_when_low_troops(self):
state = _make_state(troops=30, wounded_pct=0.0)
sg = self.king.decide(state)
assert sg.token == SubgoalToken.RECRUIT
assert sg.quantity == _MIN_TROOPS - 30
def test_trade_when_broke(self):
state = _make_state(troops=150, denars=2_000)
sg = self.king.decide(state)
assert sg.token == SubgoalToken.TRADE
def test_no_two_front_war_rule(self):
"""King must avoid 2-front wars by seeking peace."""
state = _make_state(
active_wars=["Vlandia", "Sturgia"],
kingdom_name="House Timmerson",
factions=[
FactionState(name="Vlandia", leader="Derthert", army_strength=500),
FactionState(name="Sturgia", leader="Raganvad", army_strength=200),
],
)
sg = self.king.decide(state)
assert sg.token == SubgoalToken.ALLY
# Should target weakest enemy (Sturgia at 200 strength)
assert sg.target == "Sturgia"
def test_expand_territory_when_no_kingdom(self):
state = _make_state(
troops=150,
denars=10_000,
kingdom_name="",
factions=[
FactionState(
name="Vlandia",
leader="Derthert",
fiefs=["Pravend"],
army_strength=100,
is_at_war_with=["Battania", "Aserai"],
)
],
)
sg = self.king.decide(state)
# Distracted faction should be the expansion target
assert sg.token == SubgoalToken.EXPAND_TERRITORY
def test_train_when_troops_insufficient_for_expansion(self):
state = _make_state(
troops=90,
denars=10_000,
kingdom_name="",
factions=[
FactionState(
name="Vlandia",
leader="Derthert",
fiefs=["Pravend"],
army_strength=100,
)
],
)
sg = self.king.decide(state)
assert sg.token == SubgoalToken.TRAIN
def test_expand_when_below_target_fiefs(self):
state = _make_state(
kingdom_name="House Timmerson",
fiefs=["Epicrotea"],
factions=[
FactionState(
name="Vlandia",
leader="Derthert",
fiefs=["Pravend", "Sargot"],
army_strength=100,
)
],
)
sg = self.king.decide(state)
assert sg.token == SubgoalToken.EXPAND_TERRITORY
def test_consolidate_when_stable(self):
state = _make_state(
kingdom_name="House Timmerson",
fiefs=["Epicrotea", "Pravend", "Sargot"],
active_alliances=["Battania"],
)
sg = self.king.decide(state)
assert sg.token in {SubgoalToken.CONSOLIDATE, SubgoalToken.FORTIFY}
def test_tick_increments(self):
king = KingAgent()
state = _make_state()
king.decide(state)
king.decide(state)
assert king.tick == 2
def test_done_condition_not_met_without_kingdom(self):
state = _make_state(in_game_day=200)
assert not self.king.is_done_condition_met(state)
def test_done_condition_met(self):
state = _make_state(
kingdom_name="House Timmerson",
fiefs=["A", "B", "C"],
in_game_day=110,
)
assert self.king.is_done_condition_met(state)
def test_done_condition_not_met_insufficient_days(self):
state = _make_state(
kingdom_name="House Timmerson",
fiefs=["A", "B", "C"],
in_game_day=50,
)
assert not self.king.is_done_condition_met(state)
def test_campaign_summary_shape(self):
state = _make_state(kingdom_name="House Timmerson", fiefs=["A"])
summary = self.king.campaign_summary(state)
assert "tick" in summary
assert "has_kingdom" in summary
assert "fief_count" in summary
assert "survival_goal_met" in summary

View File

@@ -0,0 +1,140 @@
"""Tests for SessionMemory — SQLite-backed campaign persistence."""
import tempfile
from pathlib import Path
import pytest
from bannerlord.session_memory import SessionMemory
from bannerlord.types import KingSubgoal, SubgoalToken
@pytest.fixture
def memory(tmp_path):
return SessionMemory(tmp_path / "test_campaign.db")
class TestSessionLifecycle:
def test_start_session_returns_id(self, memory):
sid = memory.start_session()
assert sid.startswith("session_")
def test_start_session_with_explicit_id(self, memory):
sid = memory.start_session("my_run_001")
assert sid == "my_run_001"
def test_start_idempotent(self, memory):
sid1 = memory.start_session("run")
sid2 = memory.start_session("run")
assert sid1 == sid2
def test_get_session_returns_dict(self, memory):
sid = memory.start_session()
row = memory.get_session(sid)
assert row is not None
assert row["session_id"] == sid
def test_get_unknown_session_returns_none(self, memory):
assert memory.get_session("does_not_exist") is None
def test_list_sessions(self, memory):
memory.start_session("s1")
memory.start_session("s2")
sessions = memory.list_sessions()
ids = [s["session_id"] for s in sessions]
assert "s1" in ids
assert "s2" in ids
def test_update_session(self, memory):
sid = memory.start_session()
memory.update_session(sid, kingdom_name="House Timmerson", fief_count=2, in_game_day=45)
row = memory.get_session(sid)
assert row["kingdom_name"] == "House Timmerson"
assert row["fief_count"] == 2
assert row["in_game_day"] == 45
class TestSubgoalLog:
def test_log_and_retrieve_subgoal(self, memory):
sid = memory.start_session()
sg = KingSubgoal(token=SubgoalToken.EXPAND_TERRITORY, target="Epicrotea")
row_id = memory.log_subgoal(sid, tick=1, in_game_day=10, subgoal=sg)
assert row_id > 0
entries = memory.get_recent_subgoals(sid, limit=5)
assert len(entries) == 1
assert entries[0]["token"] == "EXPAND_TERRITORY"
assert entries[0]["target"] == "Epicrotea"
assert entries[0]["outcome"] == "pending"
def test_complete_subgoal(self, memory):
sid = memory.start_session()
sg = KingSubgoal(token=SubgoalToken.TRADE)
row_id = memory.log_subgoal(sid, tick=2, in_game_day=11, subgoal=sg)
memory.complete_subgoal(row_id, outcome="success")
entries = memory.get_recent_subgoals(sid, limit=5)
assert entries[0]["outcome"] == "success"
assert entries[0]["completed_at"] is not None
def test_count_token(self, memory):
sid = memory.start_session()
for i in range(3):
memory.log_subgoal(
sid, tick=i, in_game_day=i,
subgoal=KingSubgoal(token=SubgoalToken.RECRUIT)
)
memory.log_subgoal(
sid, tick=10, in_game_day=10,
subgoal=KingSubgoal(token=SubgoalToken.TRADE)
)
assert memory.count_token(sid, SubgoalToken.RECRUIT) == 3
assert memory.count_token(sid, SubgoalToken.TRADE) == 1
assert memory.count_token(sid, SubgoalToken.ALLY) == 0
def test_recent_subgoals_respects_limit(self, memory):
sid = memory.start_session()
for i in range(10):
memory.log_subgoal(
sid, tick=i, in_game_day=i,
subgoal=KingSubgoal(token=SubgoalToken.CONSOLIDATE)
)
entries = memory.get_recent_subgoals(sid, limit=3)
assert len(entries) == 3
class TestStrategyNotes:
def test_add_and_get_notes(self, memory):
sid = memory.start_session()
memory.add_note(sid, in_game_day=5, note_type="intel", content="Vlandia weakened")
notes = memory.get_notes(sid)
assert len(notes) == 1
assert notes[0]["content"] == "Vlandia weakened"
def test_filter_notes_by_type(self, memory):
sid = memory.start_session()
memory.add_note(sid, 1, "milestone", "Kingdom established")
memory.add_note(sid, 2, "intel", "Enemy sighted")
milestones = memory.get_notes(sid, note_type="milestone")
assert len(milestones) == 1
assert milestones[0]["content"] == "Kingdom established"
def test_record_kingdom_established(self, memory):
sid = memory.start_session()
memory.record_kingdom_established(sid, in_game_day=42, kingdom_name="House Timmerson")
milestones = memory.get_milestones(sid)
assert any("House Timmerson" in m["content"] for m in milestones)
row = memory.get_session(sid)
assert row["kingdom_name"] == "House Timmerson"
def test_record_war_declared(self, memory):
sid = memory.start_session()
memory.record_war_declared(sid, in_game_day=20, faction="Vlandia")
notes = memory.get_notes(sid, note_type="war_declared")
assert len(notes) == 1
assert "Vlandia" in notes[0]["content"]
def test_record_peace_agreed(self, memory):
sid = memory.start_session()
memory.record_peace_agreed(sid, in_game_day=30, faction="Sturgia")
notes = memory.get_notes(sid, note_type="peace_agreed")
assert len(notes) == 1
assert "Sturgia" in notes[0]["content"]

View File

@@ -0,0 +1,103 @@
"""Tests for Bannerlord M3 core data types."""
from datetime import UTC, datetime
from bannerlord.types import (
FactionState,
GameState,
KingSubgoal,
KingdomState,
PartyState,
SubgoalToken,
VassalReward,
)
class TestSubgoalToken:
def test_all_tokens_are_strings(self):
for token in SubgoalToken:
assert isinstance(str(token), str)
def test_round_trip_from_string(self):
assert SubgoalToken("EXPAND_TERRITORY") == SubgoalToken.EXPAND_TERRITORY
assert SubgoalToken("ALLY") == SubgoalToken.ALLY
class TestKingSubgoal:
def test_defaults(self):
sg = KingSubgoal(token=SubgoalToken.CONSOLIDATE)
assert sg.priority == 1.0
assert sg.target is None
assert sg.quantity is None
assert isinstance(sg.issued_at, datetime)
def test_to_dict_round_trip(self):
sg = KingSubgoal(
token=SubgoalToken.EXPAND_TERRITORY,
target="Epicrotea",
quantity=None,
priority=1.5,
deadline_days=10,
context="capture castle",
)
d = sg.to_dict()
assert d["token"] == "EXPAND_TERRITORY"
assert d["target"] == "Epicrotea"
assert d["priority"] == 1.5
restored = KingSubgoal.from_dict(d)
assert restored.token == SubgoalToken.EXPAND_TERRITORY
assert restored.target == "Epicrotea"
assert restored.priority == 1.5
assert restored.deadline_days == 10
def test_from_dict_without_issued_at(self):
d = {"token": "TRADE"}
sg = KingSubgoal.from_dict(d)
assert sg.token == SubgoalToken.TRADE
assert isinstance(sg.issued_at, datetime)
class TestGameState:
def test_empty_state(self):
state = GameState()
assert not state.has_kingdom()
assert state.fief_count() == 0
assert state.active_war_count() == 0
assert not state.is_two_front_war()
def test_has_kingdom(self):
state = GameState(kingdom=KingdomState(name="House Timmerson"))
assert state.has_kingdom()
def test_fief_count(self):
state = GameState(
kingdom=KingdomState(name="House Timmerson", fiefs=["Pravend", "Epicrotea"])
)
assert state.fief_count() == 2
def test_two_front_war(self):
state = GameState(
kingdom=KingdomState(
name="House Timmerson",
active_wars=["Vlandia", "Sturgia"],
)
)
assert state.is_two_front_war()
def test_single_war_not_two_front(self):
state = GameState(
kingdom=KingdomState(
name="House Timmerson",
active_wars=["Vlandia"],
)
)
assert not state.is_two_front_war()
class TestVassalReward:
def test_defaults(self):
reward = VassalReward(agent_id="war_vassal")
assert reward.total == 0.0
assert reward.subgoal_bonus == 0.0
assert isinstance(reward.computed_at, datetime)

View File

@@ -0,0 +1,179 @@
"""Tests for Bannerlord vassal agents (War, Economy, Diplomacy)."""
from bannerlord.agents.diplomacy_vassal import DiplomacyVassal
from bannerlord.agents.economy_vassal import EconomyVassal
from bannerlord.agents.war_vassal import WarVassal
from bannerlord.types import (
FactionState,
GameState,
KingSubgoal,
KingdomState,
PartyState,
SubgoalToken,
)
def _state(
*,
troops: int = 150,
denars: int = 10_000,
food_days: int = 10,
kingdom_name: str = "House Timmerson",
fiefs: list | None = None,
active_wars: list | None = None,
active_alliances: list | None = None,
factions: list | None = None,
) -> GameState:
return GameState(
party=PartyState(
troops=troops,
denars=denars,
food_days=food_days,
location="Epicrotea",
),
kingdom=KingdomState(
name=kingdom_name,
fiefs=fiefs or ["Epicrotea"],
active_wars=active_wars or [],
active_alliances=active_alliances or [],
),
factions=factions or [],
)
class TestWarVassal:
def setup_method(self):
self.vassal = WarVassal()
def test_is_relevant_expand(self):
sg = KingSubgoal(token=SubgoalToken.EXPAND_TERRITORY)
assert self.vassal.is_relevant(sg)
def test_is_relevant_raid(self):
sg = KingSubgoal(token=SubgoalToken.RAID_ECONOMY)
assert self.vassal.is_relevant(sg)
def test_not_relevant_for_trade(self):
sg = KingSubgoal(token=SubgoalToken.TRADE)
assert not self.vassal.is_relevant(sg)
def test_plan_expansion_with_target(self):
state = _state()
sg = KingSubgoal(token=SubgoalToken.EXPAND_TERRITORY, target="Pravend")
tasks = self.vassal.plan(state, sg)
primitives = [t.primitive for t in tasks]
assert "siege_settlement" in primitives
assert "auto_resolve_battle" in primitives
def test_plan_expansion_scouts_garrison_first(self):
state = _state()
sg = KingSubgoal(token=SubgoalToken.EXPAND_TERRITORY, target="Pravend")
tasks = self.vassal.plan(state, sg)
# Scout garrison should be the first task (highest priority)
assert tasks[0].primitive == "assess_garrison"
def test_plan_expansion_recruits_when_low(self):
state = _state(troops=80)
sg = KingSubgoal(token=SubgoalToken.EXPAND_TERRITORY, target="Pravend")
tasks = self.vassal.plan(state, sg)
primitives = [t.primitive for t in tasks]
assert "recruit_troop" in primitives
def test_plan_expansion_no_target(self):
state = _state()
sg = KingSubgoal(token=SubgoalToken.EXPAND_TERRITORY, target=None)
tasks = self.vassal.plan(state, sg)
assert tasks == []
def test_plan_raid_with_target(self):
state = _state()
sg = KingSubgoal(token=SubgoalToken.RAID_ECONOMY, target="Enemy Village")
tasks = self.vassal.plan(state, sg)
assert any(t.primitive == "raid_village" for t in tasks)
def test_compute_reward_territory_gain(self):
prev = _state(fiefs=["A"])
curr = _state(fiefs=["A", "B"])
sg = KingSubgoal(token=SubgoalToken.EXPAND_TERRITORY)
reward = self.vassal.compute_reward(prev, curr, sg)
assert reward.total > 0
assert reward.component_scores["territory"] > 0
class TestEconomyVassal:
def setup_method(self):
self.vassal = EconomyVassal()
def test_is_relevant_fortify(self):
sg = KingSubgoal(token=SubgoalToken.FORTIFY)
assert self.vassal.is_relevant(sg)
def test_is_relevant_consolidate(self):
sg = KingSubgoal(token=SubgoalToken.CONSOLIDATE)
assert self.vassal.is_relevant(sg)
def test_not_relevant_for_raid(self):
sg = KingSubgoal(token=SubgoalToken.RAID_ECONOMY)
assert not self.vassal.is_relevant(sg)
def test_plan_fortify_queues_build(self):
state = _state()
sg = KingSubgoal(token=SubgoalToken.FORTIFY, target="Epicrotea")
tasks = self.vassal.plan(state, sg)
primitives = [t.primitive for t in tasks]
assert "build_project" in primitives
assert "set_tax_policy" in primitives
def test_plan_buys_food_when_low(self):
state = _state(food_days=2)
sg = KingSubgoal(token=SubgoalToken.CONSOLIDATE)
tasks = self.vassal.plan(state, sg)
primitives = [t.primitive for t in tasks]
assert "buy_supplies" in primitives
def test_plan_consolidate_sets_tax(self):
state = _state()
sg = KingSubgoal(token=SubgoalToken.CONSOLIDATE)
tasks = self.vassal.plan(state, sg)
assert any(t.primitive == "set_tax_policy" for t in tasks)
class TestDiplomacyVassal:
def setup_method(self):
self.vassal = DiplomacyVassal()
def test_is_relevant_ally(self):
sg = KingSubgoal(token=SubgoalToken.ALLY)
assert self.vassal.is_relevant(sg)
def test_not_relevant_for_train(self):
sg = KingSubgoal(token=SubgoalToken.TRAIN)
assert not self.vassal.is_relevant(sg)
def test_plan_proposes_peace_with_enemy(self):
state = _state(active_wars=["Vlandia"])
sg = KingSubgoal(token=SubgoalToken.ALLY, target="Vlandia")
tasks = self.vassal.plan(state, sg)
assert any(t.primitive == "propose_peace" for t in tasks)
def test_plan_requests_alliance_with_non_enemy(self):
state = _state()
sg = KingSubgoal(token=SubgoalToken.ALLY, target="Battania")
tasks = self.vassal.plan(state, sg)
primitives = [t.primitive for t in tasks]
assert "send_envoy" in primitives
assert "request_alliance" in primitives
def test_plan_no_target_returns_empty(self):
state = _state()
sg = KingSubgoal(token=SubgoalToken.ALLY, target=None)
tasks = self.vassal.plan(state, sg)
assert tasks == []
def test_should_avoid_war_when_two_fronts(self):
state = _state(active_wars=["A", "B"])
assert self.vassal.should_avoid_war(state)
def test_should_not_avoid_war_when_one_front(self):
state = _state(active_wars=["A"])
assert not self.vassal.should_avoid_war(state)