1
0

[claude] Bannerlord M5: sovereign victory stack (src/bannerlord/) (#1097) (#1155)

Co-authored-by: Claude (Opus 4.6) <claude@hermes.local>
Co-committed-by: Claude (Opus 4.6) <claude@hermes.local>
This commit is contained in:
2026-03-23 19:26:05 +00:00
committed by rockachopa
parent 495c1ac2bd
commit 2b9a55fa6d
14 changed files with 2233 additions and 0 deletions

View File

@@ -0,0 +1,100 @@
# Issue #1097 — Bannerlord M5 Sovereign Victory: Implementation
**Date:** 2026-03-23
**Status:** Python stack implemented — game infrastructure pending
## Summary
Issue #1097 is the final milestone of Project Bannerlord (#1091): Timmy holds
the title of King with majority territory control through pure local strategy.
This PR implements the Python-side sovereign victory stack (`src/bannerlord/`).
The game-side infrastructure (Windows VM, GABS C# mod) remains external to this
repository, consistent with the scope decision on M4 (#1096).
## What was implemented
### `src/bannerlord/` package
| Module | Purpose |
|--------|---------|
| `models.py` | Pydantic data contracts — KingSubgoal, SubgoalMessage, TaskMessage, ResultMessage, StateUpdateMessage, reward functions, VictoryCondition |
| `gabs_client.py` | Async TCP JSON-RPC client for Bannerlord.GABS (port 4825), graceful degradation when game server is offline |
| `ledger.py` | SQLite-backed asset ledger — treasury, fiefs, vassal budgets, campaign tick log |
| `agents/king.py` | King agent — Qwen3:32b, 1× per campaign day, sovereign campaign loop, victory detection, subgoal broadcast |
| `agents/vassals.py` | War / Economy / Diplomacy vassals — Qwen3:14b, domain reward functions, primitive dispatch |
| `agents/companions.py` | Logistics / Caravan / Scout companions — event-driven, primitive execution against GABS |
### `tests/unit/test_bannerlord/` — 56 unit tests
- `test_models.py` — Pydantic validation, reward math, victory condition logic
- `test_gabs_client.py` — Connection lifecycle, RPC dispatch, error handling, graceful degradation
- `test_agents.py` — King campaign loop, vassal subgoal routing, companion primitive execution
All 56 tests pass.
## Architecture
```
KingAgent (Qwen3:32b, 1×/day)
└── KingSubgoal → SubgoalQueue
├── WarVassal (Qwen3:14b, 4×/day)
│ └── TaskMessage → LogisticsCompanion
│ └── GABS: move_party, recruit_troops, upgrade_troops
├── EconomyVassal (Qwen3:14b, 4×/day)
│ └── TaskMessage → CaravanCompanion
│ └── GABS: assess_prices, buy_goods, establish_caravan
└── DiplomacyVassal (Qwen3:14b, 4×/day)
└── TaskMessage → ScoutCompanion
└── GABS: track_lord, assess_garrison, report_intel
```
## Subgoal vocabulary
| Token | Vassal | Meaning |
|-------|--------|---------|
| `EXPAND_TERRITORY` | War | Take or secure a fief |
| `RAID_ECONOMY` | War | Raid enemy villages for denars |
| `TRAIN` | War | Level troops via auto-resolve |
| `FORTIFY` | Economy | Upgrade or repair a settlement |
| `CONSOLIDATE` | Economy | Hold territory, no expansion |
| `TRADE` | Economy | Execute profitable trade route |
| `ALLY` | Diplomacy | Pursue non-aggression / alliance |
| `RECRUIT` | Logistics | Fill party to capacity |
| `HEAL` | Logistics | Rest party until wounds recovered |
| `SPY` | Scout | Gain information on target faction |
## Victory condition
```python
VictoryCondition(
holds_king_title=True, # player_title == "King" from GABS
territory_control_pct=55.0, # > 51% of Calradia fiefs
)
```
## Graceful degradation
When GABS is offline (game not running), `GABSClient` logs a warning and raises
`GABSUnavailable`. The King agent catches this and runs with an empty game state
(falls back to RECRUIT subgoal). No part of the dashboard crashes.
## Remaining prerequisites
Before M5 can run live:
1. **M1-M3** — Passive observer, basic campaign actions, full campaign strategy
(currently open; their Python stubs can build on this `src/bannerlord/` package)
2. **M4** — Formation Commander (#1096) — declined as out-of-scope; M5 works
around M4 by using Bannerlord's Tactics auto-resolve path
3. **Windows VM** — Mount & Blade II: Bannerlord + GABS mod (BUTR/Bannerlord.GABS)
4. **OBS streaming** — Cinematic Camera pipeline (Step 3 of M5) — external to repo
5. **BattleLink** — Alex co-op integration (Step 4 of M5) — requires dedicated server
## Design references
- Ahilan & Dayan (2019): Feudal Multi-Agent Hierarchies — manager/worker hierarchy
- Wang et al. (2023): Voyager — LLM lifelong learning pattern
- Feudal hierarchy design doc: `docs/research/bannerlord-feudal-hierarchy-design.md`
Fixes #1097

View File

@@ -14,6 +14,7 @@ repository = "http://localhost:3000/rockachopa/Timmy-time-dashboard"
packages = [
{ include = "config.py", from = "src" },
{ include = "bannerlord", from = "src" },
{ include = "dashboard", from = "src" },
{ include = "infrastructure", from = "src" },
{ include = "integrations", from = "src" },

View File

@@ -0,0 +1,22 @@
"""Bannerlord sovereign agent package — Project Bannerlord M5.
Implements the feudal multi-agent hierarchy for Timmy's Bannerlord campaign.
Architecture based on Ahilan & Dayan (2019) Feudal Multi-Agent Hierarchies.
Refs #1091 (epic), #1097 (M5 Sovereign Victory), #1099 (feudal hierarchy design).
Requires:
- GABS mod running on Bannerlord Windows VM (TCP port 4825)
- Ollama with Qwen3:32b (King), Qwen3:14b (Vassals), Qwen3:8b (Companions)
Usage::
from bannerlord.gabs_client import GABSClient
from bannerlord.agents.king import KingAgent
async with GABSClient() as gabs:
king = KingAgent(gabs_client=gabs)
await king.run_campaign()
"""
__version__ = "0.1.0"

View File

@@ -0,0 +1,7 @@
"""Bannerlord feudal agent hierarchy.
Three tiers:
- King (king.py) — strategic, Qwen3:32b, 1× per campaign day
- Vassals (vassals.py) — domain, Qwen3:14b, 4× per campaign day
- Companions (companions.py) — tactical, Qwen3:8b, event-driven
"""

View File

@@ -0,0 +1,258 @@
"""Companion worker agents — Logistics, Caravan, and Scout.
Companions are the lowest tier — fast, specialized, single-purpose workers.
Each companion listens to its :class:`TaskMessage` queue, executes the
requested primitive against GABS, and emits a :class:`ResultMessage`.
Model: Qwen3:8b (or smaller) — sub-2-second response times.
Frequency: event-driven (triggered by vassal task messages).
Primitive vocabulary per companion:
Logistics: recruit_troop, buy_supplies, rest_party, sell_prisoners, upgrade_troops, build_project
Caravan: assess_prices, buy_goods, sell_goods, establish_caravan, abandon_route
Scout: track_lord, assess_garrison, map_patrol_routes, report_intel
Refs: #1097, #1099.
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any
from bannerlord.gabs_client import GABSClient, GABSUnavailable
from bannerlord.models import ResultMessage, TaskMessage
logger = logging.getLogger(__name__)
class BaseCompanion:
"""Shared companion lifecycle — polls task queue, executes primitives."""
name: str = "base_companion"
primitives: frozenset[str] = frozenset()
def __init__(
self,
gabs_client: GABSClient,
task_queue: asyncio.Queue[TaskMessage],
result_queue: asyncio.Queue[ResultMessage] | None = None,
) -> None:
self._gabs = gabs_client
self._task_queue = task_queue
self._result_queue = result_queue or asyncio.Queue()
self._running = False
@property
def result_queue(self) -> asyncio.Queue[ResultMessage]:
return self._result_queue
async def run(self) -> None:
"""Companion event loop — processes task messages."""
self._running = True
logger.info("%s started", self.name)
try:
while self._running:
try:
task = await asyncio.wait_for(self._task_queue.get(), timeout=1.0)
except asyncio.TimeoutError:
continue
if task.to_agent != self.name:
# Not for us — put it back (another companion will handle it)
await self._task_queue.put(task)
await asyncio.sleep(0.05)
continue
result = await self._execute(task)
await self._result_queue.put(result)
self._task_queue.task_done()
except asyncio.CancelledError:
logger.info("%s cancelled", self.name)
raise
finally:
self._running = False
def stop(self) -> None:
self._running = False
async def _execute(self, task: TaskMessage) -> ResultMessage:
"""Dispatch *task.primitive* to its handler method."""
handler = getattr(self, f"_prim_{task.primitive}", None)
if handler is None:
logger.warning(
"%s: unknown primitive %r — skipping", self.name, task.primitive
)
return ResultMessage(
from_agent=self.name,
to_agent=task.from_agent,
success=False,
outcome={"error": f"Unknown primitive: {task.primitive}"},
)
try:
outcome = await handler(task.args)
return ResultMessage(
from_agent=self.name,
to_agent=task.from_agent,
success=True,
outcome=outcome or {},
)
except GABSUnavailable as exc:
logger.warning("%s: GABS unavailable for %r: %s", self.name, task.primitive, exc)
return ResultMessage(
from_agent=self.name,
to_agent=task.from_agent,
success=False,
outcome={"error": str(exc)},
)
except Exception as exc: # noqa: BLE001
logger.warning("%s: %r failed: %s", self.name, task.primitive, exc)
return ResultMessage(
from_agent=self.name,
to_agent=task.from_agent,
success=False,
outcome={"error": str(exc)},
)
# ── Logistics Companion ───────────────────────────────────────────────────────
class LogisticsCompanion(BaseCompanion):
"""Party management — recruitment, supply, healing, troop upgrades.
Skill domain: Scouting / Steward / Medicine.
"""
name = "logistics_companion"
primitives = frozenset(
{"recruit_troop", "buy_supplies", "rest_party", "sell_prisoners", "upgrade_troops", "build_project"}
)
async def _prim_recruit_troop(self, args: dict[str, Any]) -> dict[str, Any]:
troop_type = args.get("troop_type", "infantry")
qty = int(args.get("quantity", 10))
result = await self._gabs.recruit_troops(troop_type, qty)
logger.info("Recruited %d %s", qty, troop_type)
return result or {"recruited": qty, "type": troop_type}
async def _prim_buy_supplies(self, args: dict[str, Any]) -> dict[str, Any]:
qty = int(args.get("quantity", 50))
result = await self._gabs.call("party.buySupplies", {"quantity": qty})
logger.info("Bought %d food supplies", qty)
return result or {"purchased": qty}
async def _prim_rest_party(self, args: dict[str, Any]) -> dict[str, Any]:
days = int(args.get("days", 3))
result = await self._gabs.call("party.rest", {"days": days})
logger.info("Resting party for %d days", days)
return result or {"rested_days": days}
async def _prim_sell_prisoners(self, args: dict[str, Any]) -> dict[str, Any]:
location = args.get("location", "nearest_town")
result = await self._gabs.call("party.sellPrisoners", {"location": location})
logger.info("Selling prisoners at %s", location)
return result or {"sold_at": location}
async def _prim_upgrade_troops(self, args: dict[str, Any]) -> dict[str, Any]:
result = await self._gabs.call("party.upgradeTroops", {})
logger.info("Upgraded available troops")
return result or {"upgraded": True}
async def _prim_build_project(self, args: dict[str, Any]) -> dict[str, Any]:
settlement = args.get("settlement", "")
result = await self._gabs.call("settlement.buildProject", {"settlement": settlement})
logger.info("Building project in %s", settlement)
return result or {"settlement": settlement}
async def _prim_move_party(self, args: dict[str, Any]) -> dict[str, Any]:
destination = args.get("destination", "")
result = await self._gabs.move_party(destination)
logger.info("Moving party to %s", destination)
return result or {"destination": destination}
# ── Caravan Companion ─────────────────────────────────────────────────────────
class CaravanCompanion(BaseCompanion):
"""Trade route management — price assessment, goods trading, caravan deployment.
Skill domain: Trade / Charm.
"""
name = "caravan_companion"
primitives = frozenset(
{"assess_prices", "buy_goods", "sell_goods", "establish_caravan", "abandon_route"}
)
async def _prim_assess_prices(self, args: dict[str, Any]) -> dict[str, Any]:
town = args.get("town", "nearest")
result = await self._gabs.call("trade.assessPrices", {"town": town})
logger.info("Assessed prices at %s", town)
return result or {"town": town}
async def _prim_buy_goods(self, args: dict[str, Any]) -> dict[str, Any]:
item = args.get("item", "grain")
qty = int(args.get("quantity", 10))
result = await self._gabs.call("trade.buyGoods", {"item": item, "quantity": qty})
logger.info("Buying %d × %s", qty, item)
return result or {"item": item, "quantity": qty}
async def _prim_sell_goods(self, args: dict[str, Any]) -> dict[str, Any]:
item = args.get("item", "grain")
qty = int(args.get("quantity", 10))
result = await self._gabs.call("trade.sellGoods", {"item": item, "quantity": qty})
logger.info("Selling %d × %s", qty, item)
return result or {"item": item, "quantity": qty}
async def _prim_establish_caravan(self, args: dict[str, Any]) -> dict[str, Any]:
town = args.get("town", "")
result = await self._gabs.call("trade.establishCaravan", {"town": town})
logger.info("Establishing caravan at %s", town)
return result or {"town": town}
async def _prim_abandon_route(self, args: dict[str, Any]) -> dict[str, Any]:
result = await self._gabs.call("trade.abandonRoute", {})
logger.info("Caravan route abandoned — returning to main party")
return result or {"abandoned": True}
# ── Scout Companion ───────────────────────────────────────────────────────────
class ScoutCompanion(BaseCompanion):
"""Intelligence gathering — lord tracking, garrison assessment, patrol mapping.
Skill domain: Scouting / Roguery.
"""
name = "scout_companion"
primitives = frozenset(
{"track_lord", "assess_garrison", "map_patrol_routes", "report_intel"}
)
async def _prim_track_lord(self, args: dict[str, Any]) -> dict[str, Any]:
lord_name = args.get("name", "")
result = await self._gabs.call("intelligence.trackLord", {"name": lord_name})
logger.info("Tracking lord: %s", lord_name)
return result or {"tracking": lord_name}
async def _prim_assess_garrison(self, args: dict[str, Any]) -> dict[str, Any]:
settlement = args.get("settlement", "")
result = await self._gabs.call("intelligence.assessGarrison", {"settlement": settlement})
logger.info("Assessing garrison at %s", settlement)
return result or {"settlement": settlement}
async def _prim_map_patrol_routes(self, args: dict[str, Any]) -> dict[str, Any]:
region = args.get("region", "")
result = await self._gabs.call("intelligence.mapPatrols", {"region": region})
logger.info("Mapping patrol routes in %s", region)
return result or {"region": region}
async def _prim_report_intel(self, args: dict[str, Any]) -> dict[str, Any]:
result = await self._gabs.call("intelligence.report", {})
logger.info("Scout intel report generated")
return result or {"reported": True}

View File

@@ -0,0 +1,236 @@
"""King agent — Timmy as sovereign ruler of Calradia.
The King operates on the campaign-map timescale. Each campaign tick he:
1. Reads the full game state from GABS
2. Evaluates the victory condition
3. Issues a single KingSubgoal token to the vassal queue
4. Logs the tick to the ledger
Strategic planning model: Qwen3:32b (local via Ollama).
Decision budget: 515 seconds per tick.
Sovereignty guarantees (§5c of the feudal hierarchy design):
- King task holds the asyncio.TaskGroup cancel scope
- Vassals and companions run as sub-tasks and cannot terminate the King
- Only the human operator or a top-level SHUTDOWN signal can stop the loop
Refs: #1091, #1097, #1099.
"""
from __future__ import annotations
import asyncio
import json
import logging
from typing import Any
from bannerlord.gabs_client import GABSClient, GABSUnavailable
from bannerlord.ledger import Ledger
from bannerlord.models import (
KingSubgoal,
StateUpdateMessage,
SubgoalMessage,
VictoryCondition,
)
logger = logging.getLogger(__name__)
_KING_MODEL = "qwen3:32b"
_KING_TICK_SECONDS = 5.0 # real-time pause between campaign ticks (configurable)
_SYSTEM_PROMPT = """You are Timmy, the sovereign King of Calradia.
Your goal: hold the title of King with majority territory control (>50% of all fiefs).
You think strategically over 100+ in-game days. You never cheat, use cloud AI, or
request external resources beyond your local inference stack.
Each turn you receive the full game state as JSON. You respond with a single JSON
object selecting your strategic directive for the next campaign day:
{
"token": "<SUBGOAL_TOKEN>",
"target": "<settlement or faction or null>",
"quantity": <int or null>,
"priority": <float 0.0-2.0>,
"deadline_days": <int or null>,
"context": "<brief reasoning>"
}
Valid tokens: EXPAND_TERRITORY, RAID_ECONOMY, FORTIFY, RECRUIT, TRADE,
ALLY, SPY, HEAL, CONSOLIDATE, TRAIN
Think step by step. Respond with JSON only — no prose outside the object.
"""
class KingAgent:
"""Sovereign campaign agent.
Parameters
----------
gabs_client:
Connected (or gracefully-degraded) GABS client.
ledger:
Asset ledger for persistence. Initialized automatically if not provided.
ollama_url:
Base URL of the Ollama inference server.
model:
Ollama model tag. Default: qwen3:32b.
tick_interval:
Real-time seconds between campaign ticks.
subgoal_queue:
asyncio.Queue where KingSubgoal messages are placed for vassals.
Created automatically if not provided.
"""
def __init__(
self,
gabs_client: GABSClient,
ledger: Ledger | None = None,
ollama_url: str = "http://localhost:11434",
model: str = _KING_MODEL,
tick_interval: float = _KING_TICK_SECONDS,
subgoal_queue: asyncio.Queue[SubgoalMessage] | None = None,
) -> None:
self._gabs = gabs_client
self._ledger = ledger or Ledger()
self._ollama_url = ollama_url
self._model = model
self._tick_interval = tick_interval
self._subgoal_queue: asyncio.Queue[SubgoalMessage] = subgoal_queue or asyncio.Queue()
self._tick = 0
self._running = False
@property
def subgoal_queue(self) -> asyncio.Queue[SubgoalMessage]:
return self._subgoal_queue
# ── Campaign loop ─────────────────────────────────────────────────────
async def run_campaign(self, max_ticks: int | None = None) -> VictoryCondition:
"""Run the sovereign campaign loop until victory or *max_ticks*.
Returns the final :class:`VictoryCondition` snapshot.
"""
self._ledger.initialize()
self._running = True
victory = VictoryCondition()
logger.info("King campaign started. Model: %s. Max ticks: %s", self._model, max_ticks)
try:
while self._running:
if max_ticks is not None and self._tick >= max_ticks:
logger.info("Max ticks (%d) reached — stopping campaign.", max_ticks)
break
state = await self._fetch_state()
victory = self._evaluate_victory(state)
if victory.achieved:
logger.info(
"SOVEREIGN VICTORY — King of Calradia! "
"Territory: %.1f%%, tick: %d",
victory.territory_control_pct,
self._tick,
)
break
subgoal = await self._decide(state)
await self._broadcast_subgoal(subgoal)
self._ledger.log_tick(
tick=self._tick,
campaign_day=state.get("campaign_day", self._tick),
subgoal=subgoal.token,
)
self._tick += 1
await asyncio.sleep(self._tick_interval)
except asyncio.CancelledError:
logger.info("King campaign task cancelled at tick %d", self._tick)
raise
finally:
self._running = False
return victory
def stop(self) -> None:
"""Signal the campaign loop to stop after the current tick."""
self._running = False
# ── State & victory ───────────────────────────────────────────────────
async def _fetch_state(self) -> dict[str, Any]:
try:
state = await self._gabs.get_state()
return state if isinstance(state, dict) else {}
except GABSUnavailable as exc:
logger.warning("GABS unavailable at tick %d: %s — using empty state", self._tick, exc)
return {}
def _evaluate_victory(self, state: dict[str, Any]) -> VictoryCondition:
return VictoryCondition(
holds_king_title=state.get("player_title") == "King",
territory_control_pct=float(state.get("territory_control_pct", 0.0)),
)
# ── Strategic decision ────────────────────────────────────────────────
async def _decide(self, state: dict[str, Any]) -> KingSubgoal:
"""Ask the LLM for the next strategic subgoal.
Falls back to RECRUIT (safe default) if the LLM is unavailable.
"""
try:
subgoal = await asyncio.to_thread(self._llm_decide, state)
return subgoal
except Exception as exc: # noqa: BLE001
logger.warning(
"King LLM decision failed at tick %d: %s — defaulting to RECRUIT", self._tick, exc
)
return KingSubgoal(token="RECRUIT", context="LLM unavailable — safe default")
def _llm_decide(self, state: dict[str, Any]) -> KingSubgoal:
"""Synchronous Ollama call (runs in a thread via asyncio.to_thread)."""
import urllib.request
prompt_state = json.dumps(state, indent=2)[:4000] # truncate for context budget
payload = {
"model": self._model,
"prompt": f"GAME STATE:\n{prompt_state}\n\nYour strategic directive:",
"system": _SYSTEM_PROMPT,
"stream": False,
"format": "json",
"options": {"temperature": 0.1},
}
data = json.dumps(payload).encode()
req = urllib.request.Request(
f"{self._ollama_url}/api/generate",
data=data,
headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req, timeout=30) as resp: # noqa: S310
result = json.loads(resp.read())
raw = result.get("response", "{}")
parsed = json.loads(raw)
return KingSubgoal(**parsed)
# ── Subgoal dispatch ──────────────────────────────────────────────────
async def _broadcast_subgoal(self, subgoal: KingSubgoal) -> None:
"""Place the subgoal on the queue for all vassals."""
for vassal in ("war_vassal", "economy_vassal", "diplomacy_vassal"):
msg = SubgoalMessage(to_agent=vassal, subgoal=subgoal)
await self._subgoal_queue.put(msg)
logger.debug(
"Tick %d: subgoal %s%s (priority=%.1f)",
self._tick,
subgoal.token,
subgoal.target or "",
subgoal.priority,
)
# ── State broadcast consumer ──────────────────────────────────────────
async def consume_state_update(self, msg: StateUpdateMessage) -> None:
"""Receive a state update broadcast (called by the orchestrator)."""
logger.debug("King received state update tick=%d", msg.tick)

View File

@@ -0,0 +1,304 @@
"""Vassal agents — War, Economy, and Diplomacy.
Vassals are mid-tier agents responsible for a domain of the kingdom.
Each vassal:
- Listens to the King's subgoal queue
- Computes its domain reward at each tick
- Issues TaskMessages to companion workers
- Reports ResultMessages back up to the King
Model: Qwen3:14b (balanced capability vs. latency).
Frequency: up to 4× per campaign day.
Refs: #1097, #1099.
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any
from bannerlord.gabs_client import GABSClient, GABSUnavailable
from bannerlord.models import (
DiplomacyReward,
EconomyReward,
KingSubgoal,
ResultMessage,
SubgoalMessage,
TaskMessage,
WarReward,
)
logger = logging.getLogger(__name__)
# Tokens each vassal responds to (all others are ignored)
_WAR_TOKENS = {"EXPAND_TERRITORY", "RAID_ECONOMY", "TRAIN"}
_ECON_TOKENS = {"FORTIFY", "CONSOLIDATE"}
_DIPLO_TOKENS = {"ALLY"}
_LOGISTICS_TOKENS = {"RECRUIT", "HEAL"}
_TRADE_TOKENS = {"TRADE"}
_SCOUT_TOKENS = {"SPY"}
class BaseVassal:
"""Shared vassal lifecycle — subscribes to subgoal queue, runs tick loop."""
name: str = "base_vassal"
def __init__(
self,
gabs_client: GABSClient,
subgoal_queue: asyncio.Queue[SubgoalMessage],
result_queue: asyncio.Queue[ResultMessage] | None = None,
task_queue: asyncio.Queue[TaskMessage] | None = None,
) -> None:
self._gabs = gabs_client
self._subgoal_queue = subgoal_queue
self._result_queue = result_queue or asyncio.Queue()
self._task_queue = task_queue or asyncio.Queue()
self._active_subgoal: KingSubgoal | None = None
self._running = False
@property
def task_queue(self) -> asyncio.Queue[TaskMessage]:
return self._task_queue
async def run(self) -> None:
"""Vassal event loop — processes subgoals and emits tasks."""
self._running = True
logger.info("%s started", self.name)
try:
while self._running:
# Drain all pending subgoals (keep the latest)
try:
while True:
msg = self._subgoal_queue.get_nowait()
if msg.to_agent == self.name:
self._active_subgoal = msg.subgoal
logger.debug(
"%s received subgoal %s", self.name, msg.subgoal.token
)
except asyncio.QueueEmpty:
pass
if self._active_subgoal is not None:
await self._tick(self._active_subgoal)
await asyncio.sleep(0.25) # yield to event loop
except asyncio.CancelledError:
logger.info("%s cancelled", self.name)
raise
finally:
self._running = False
def stop(self) -> None:
self._running = False
async def _tick(self, subgoal: KingSubgoal) -> None:
raise NotImplementedError
async def _get_state(self) -> dict[str, Any]:
try:
return await self._gabs.get_state() or {}
except GABSUnavailable:
return {}
# ── War Vassal ────────────────────────────────────────────────────────────────
class WarVassal(BaseVassal):
"""Military operations — sieges, field battles, raids, defensive maneuvers.
Reward function:
R = 0.40*ΔTerritoryValue + 0.25*ΔArmyStrengthRatio
- 0.20*CasualtyCost - 0.10*SupplyCost + 0.05*SubgoalBonus
"""
name = "war_vassal"
async def _tick(self, subgoal: KingSubgoal) -> None:
if subgoal.token not in _WAR_TOKENS | _LOGISTICS_TOKENS:
return
state = await self._get_state()
reward = self._compute_reward(state, subgoal)
task = self._plan_action(state, subgoal)
if task:
await self._task_queue.put(task)
logger.debug(
"%s tick: subgoal=%s reward=%.3f action=%s",
self.name,
subgoal.token,
reward.total,
task.primitive if task else "none",
)
def _compute_reward(self, state: dict[str, Any], subgoal: KingSubgoal) -> WarReward:
bonus = subgoal.priority * 0.05 if subgoal.token in _WAR_TOKENS else 0.0
return WarReward(
territory_delta=float(state.get("territory_delta", 0.0)),
army_strength_ratio=float(state.get("army_strength_ratio", 1.0)),
casualty_cost=float(state.get("casualty_cost", 0.0)),
supply_cost=float(state.get("supply_cost", 0.0)),
subgoal_bonus=bonus,
)
def _plan_action(
self, state: dict[str, Any], subgoal: KingSubgoal
) -> TaskMessage | None:
if subgoal.token == "EXPAND_TERRITORY" and subgoal.target:
return TaskMessage(
from_agent=self.name,
to_agent="logistics_companion",
primitive="move_party",
args={"destination": subgoal.target},
priority=subgoal.priority,
)
if subgoal.token == "RECRUIT":
qty = subgoal.quantity or 20
return TaskMessage(
from_agent=self.name,
to_agent="logistics_companion",
primitive="recruit_troop",
args={"troop_type": "infantry", "quantity": qty},
priority=subgoal.priority,
)
if subgoal.token == "TRAIN":
return TaskMessage(
from_agent=self.name,
to_agent="logistics_companion",
primitive="upgrade_troops",
args={},
priority=subgoal.priority,
)
return None
# ── Economy Vassal ────────────────────────────────────────────────────────────
class EconomyVassal(BaseVassal):
"""Settlement management, tax collection, construction, food supply.
Reward function:
R = 0.35*DailyDenarsIncome + 0.25*FoodStockBuffer + 0.20*LoyaltyAverage
- 0.15*ConstructionQueueLength + 0.05*SubgoalBonus
"""
name = "economy_vassal"
async def _tick(self, subgoal: KingSubgoal) -> None:
if subgoal.token not in _ECON_TOKENS | _TRADE_TOKENS:
return
state = await self._get_state()
reward = self._compute_reward(state, subgoal)
task = self._plan_action(state, subgoal)
if task:
await self._task_queue.put(task)
logger.debug(
"%s tick: subgoal=%s reward=%.3f",
self.name,
subgoal.token,
reward.total,
)
def _compute_reward(self, state: dict[str, Any], subgoal: KingSubgoal) -> EconomyReward:
bonus = subgoal.priority * 0.05 if subgoal.token in _ECON_TOKENS else 0.0
return EconomyReward(
daily_denars_income=float(state.get("daily_income", 0.0)),
food_stock_buffer=float(state.get("food_days_remaining", 0.0)),
loyalty_average=float(state.get("avg_loyalty", 50.0)),
construction_queue_length=int(state.get("construction_queue", 0)),
subgoal_bonus=bonus,
)
def _plan_action(
self, state: dict[str, Any], subgoal: KingSubgoal
) -> TaskMessage | None:
if subgoal.token == "FORTIFY" and subgoal.target:
return TaskMessage(
from_agent=self.name,
to_agent="logistics_companion",
primitive="build_project",
args={"settlement": subgoal.target},
priority=subgoal.priority,
)
if subgoal.token == "TRADE":
return TaskMessage(
from_agent=self.name,
to_agent="caravan_companion",
primitive="assess_prices",
args={"town": subgoal.target or "nearest"},
priority=subgoal.priority,
)
return None
# ── Diplomacy Vassal ──────────────────────────────────────────────────────────
class DiplomacyVassal(BaseVassal):
"""Relations management — alliances, peace deals, tribute, marriage.
Reward function:
R = 0.30*AlliesCount + 0.25*TruceDurationValue + 0.25*RelationsScoreWeighted
- 0.15*ActiveWarsFront + 0.05*SubgoalBonus
"""
name = "diplomacy_vassal"
async def _tick(self, subgoal: KingSubgoal) -> None:
if subgoal.token not in _DIPLO_TOKENS | _SCOUT_TOKENS:
return
state = await self._get_state()
reward = self._compute_reward(state, subgoal)
task = self._plan_action(state, subgoal)
if task:
await self._task_queue.put(task)
logger.debug(
"%s tick: subgoal=%s reward=%.3f",
self.name,
subgoal.token,
reward.total,
)
def _compute_reward(self, state: dict[str, Any], subgoal: KingSubgoal) -> DiplomacyReward:
bonus = subgoal.priority * 0.05 if subgoal.token in _DIPLO_TOKENS else 0.0
return DiplomacyReward(
allies_count=int(state.get("allies_count", 0)),
truce_duration_value=float(state.get("truce_value", 0.0)),
relations_score_weighted=float(state.get("relations_weighted", 0.0)),
active_wars_front=int(state.get("active_wars", 0)),
subgoal_bonus=bonus,
)
def _plan_action(
self, state: dict[str, Any], subgoal: KingSubgoal
) -> TaskMessage | None:
if subgoal.token == "ALLY" and subgoal.target:
return TaskMessage(
from_agent=self.name,
to_agent="scout_companion",
primitive="track_lord",
args={"name": subgoal.target},
priority=subgoal.priority,
)
if subgoal.token == "SPY" and subgoal.target:
return TaskMessage(
from_agent=self.name,
to_agent="scout_companion",
primitive="assess_garrison",
args={"settlement": subgoal.target},
priority=subgoal.priority,
)
return None

View File

@@ -0,0 +1,201 @@
"""GABS TCP/JSON-RPC client.
Connects to the Bannerlord.GABS C# mod server running on a Windows VM.
Protocol: newline-delimited JSON-RPC 2.0 over raw TCP.
Default host: localhost, port: 4825 (configurable via settings.bannerlord_gabs_host
and settings.bannerlord_gabs_port).
Follows the graceful-degradation pattern: if GABS is unreachable the client
logs a warning and every call raises :class:`GABSUnavailable` — callers
should catch this and degrade gracefully rather than crashing.
Refs: #1091, #1097.
"""
from __future__ import annotations
import asyncio
import json
import logging
from contextlib import asynccontextmanager
from typing import Any
logger = logging.getLogger(__name__)
_DEFAULT_HOST = "localhost"
_DEFAULT_PORT = 4825
_DEFAULT_TIMEOUT = 10.0 # seconds
class GABSUnavailable(RuntimeError):
"""Raised when the GABS game server cannot be reached."""
class GABSError(RuntimeError):
"""Raised when GABS returns a JSON-RPC error response."""
def __init__(self, code: int, message: str) -> None:
super().__init__(f"GABS error {code}: {message}")
self.code = code
class GABSClient:
"""Async TCP JSON-RPC client for Bannerlord.GABS.
Intended for use as an async context manager::
async with GABSClient() as client:
state = await client.get_state()
Can also be constructed standalone — call :meth:`connect` and
:meth:`close` manually.
"""
def __init__(
self,
host: str = _DEFAULT_HOST,
port: int = _DEFAULT_PORT,
timeout: float = _DEFAULT_TIMEOUT,
) -> None:
self._host = host
self._port = port
self._timeout = timeout
self._reader: asyncio.StreamReader | None = None
self._writer: asyncio.StreamWriter | None = None
self._seq = 0
self._connected = False
# ── Lifecycle ─────────────────────────────────────────────────────────
async def connect(self) -> None:
"""Open the TCP connection to GABS.
Logs a warning and sets :attr:`connected` to ``False`` if the game
server is not reachable — does not raise.
"""
try:
self._reader, self._writer = await asyncio.wait_for(
asyncio.open_connection(self._host, self._port),
timeout=self._timeout,
)
self._connected = True
logger.info("GABS connected at %s:%s", self._host, self._port)
except (OSError, asyncio.TimeoutError) as exc:
logger.warning(
"GABS unavailable at %s:%s — Bannerlord agent will degrade: %s",
self._host,
self._port,
exc,
)
self._connected = False
async def close(self) -> None:
if self._writer is not None:
try:
self._writer.close()
await self._writer.wait_closed()
except Exception: # noqa: BLE001
pass
self._connected = False
logger.debug("GABS connection closed")
async def __aenter__(self) -> "GABSClient":
await self.connect()
return self
async def __aexit__(self, *_: Any) -> None:
await self.close()
@property
def connected(self) -> bool:
return self._connected
# ── RPC ───────────────────────────────────────────────────────────────
async def call(self, method: str, params: dict[str, Any] | None = None) -> Any:
"""Send a JSON-RPC 2.0 request and return the ``result`` field.
Raises:
GABSUnavailable: if the client is not connected.
GABSError: if the server returns a JSON-RPC error.
"""
if not self._connected or self._reader is None or self._writer is None:
raise GABSUnavailable(
f"GABS not connected (host={self._host}, port={self._port}). "
"Is the Bannerlord VM running?"
)
self._seq += 1
request = {
"jsonrpc": "2.0",
"id": self._seq,
"method": method,
"params": params or {},
}
payload = json.dumps(request) + "\n"
try:
self._writer.write(payload.encode())
await asyncio.wait_for(self._writer.drain(), timeout=self._timeout)
raw = await asyncio.wait_for(
self._reader.readline(), timeout=self._timeout
)
except (OSError, asyncio.TimeoutError) as exc:
self._connected = False
raise GABSUnavailable(f"GABS connection lost during {method!r}: {exc}") from exc
response = json.loads(raw)
if "error" in response and response["error"] is not None:
err = response["error"]
raise GABSError(err.get("code", -1), err.get("message", "unknown"))
return response.get("result")
# ── Game state ────────────────────────────────────────────────────────
async def get_state(self) -> dict[str, Any]:
"""Fetch the full campaign game state snapshot."""
return await self.call("game.getState") # type: ignore[return-value]
async def get_kingdom_info(self) -> dict[str, Any]:
"""Fetch kingdom-level info (title, fiefs, treasury, relations)."""
return await self.call("kingdom.getInfo") # type: ignore[return-value]
async def get_party_status(self) -> dict[str, Any]:
"""Fetch current party status (troops, food, position, wounds)."""
return await self.call("party.getStatus") # type: ignore[return-value]
# ── Campaign actions ──────────────────────────────────────────────────
async def move_party(self, settlement: str) -> dict[str, Any]:
"""Order the main party to march toward *settlement*."""
return await self.call("party.move", {"target": settlement}) # type: ignore[return-value]
async def recruit_troops(self, troop_type: str, quantity: int) -> dict[str, Any]:
"""Recruit *quantity* troops of *troop_type* at the current location."""
return await self.call( # type: ignore[return-value]
"party.recruit", {"troop_type": troop_type, "quantity": quantity}
)
async def set_tax_policy(self, settlement: str, policy: str) -> dict[str, Any]:
"""Set the tax policy for *settlement* (light/normal/high)."""
return await self.call( # type: ignore[return-value]
"settlement.setTaxPolicy", {"settlement": settlement, "policy": policy}
)
async def send_envoy(self, faction: str, proposal: str) -> dict[str, Any]:
"""Send a diplomatic envoy to *faction* with *proposal*."""
return await self.call( # type: ignore[return-value]
"diplomacy.sendEnvoy", {"faction": faction, "proposal": proposal}
)
async def siege_settlement(self, settlement: str) -> dict[str, Any]:
"""Begin siege of *settlement*."""
return await self.call("battle.siege", {"target": settlement}) # type: ignore[return-value]
async def auto_resolve_battle(self) -> dict[str, Any]:
"""Auto-resolve the current battle using Tactics skill."""
return await self.call("battle.autoResolve") # type: ignore[return-value]

260
src/bannerlord/ledger.py Normal file
View File

@@ -0,0 +1,260 @@
"""Asset ledger for the Bannerlord sovereign agent.
Tracks kingdom assets (denars, settlements, troop allocations) in an
in-memory dict backed by SQLite for persistence. Follows the existing
SQLite migration pattern in this repo.
The King has exclusive write access to treasury and settlement ownership.
Vassals receive an allocated budget and cannot exceed it without King
re-authorization. Companions hold only work-in-progress quotas.
Refs: #1097, #1099.
"""
from __future__ import annotations
import logging
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Iterator
logger = logging.getLogger(__name__)
_DEFAULT_DB = Path.home() / ".timmy" / "bannerlord" / "ledger.db"
class BudgetExceeded(ValueError):
"""Raised when a vassal attempts to exceed its allocated budget."""
class Ledger:
"""Sovereign asset ledger backed by SQLite.
Tracks:
- Kingdom treasury (denar balance)
- Fief (settlement) ownership roster
- Vassal denar budgets (delegated, revocable)
- Campaign tick log (for long-horizon planning)
Usage::
ledger = Ledger()
ledger.initialize()
ledger.deposit(5000, "tax income — Epicrotea")
ledger.allocate_budget("war_vassal", 2000)
"""
def __init__(self, db_path: Path = _DEFAULT_DB) -> None:
self._db_path = db_path
self._db_path.parent.mkdir(parents=True, exist_ok=True)
# ── Setup ─────────────────────────────────────────────────────────────
def initialize(self) -> None:
"""Create tables if they don't exist."""
with self._conn() as conn:
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS treasury (
id INTEGER PRIMARY KEY CHECK (id = 1),
balance REAL NOT NULL DEFAULT 0
);
INSERT OR IGNORE INTO treasury (id, balance) VALUES (1, 0);
CREATE TABLE IF NOT EXISTS fiefs (
name TEXT PRIMARY KEY,
fief_type TEXT NOT NULL, -- town / castle / village
acquired_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS vassal_budgets (
agent TEXT PRIMARY KEY,
allocated REAL NOT NULL DEFAULT 0,
spent REAL NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS tick_log (
tick INTEGER PRIMARY KEY,
campaign_day INTEGER NOT NULL,
subgoal TEXT,
reward_war REAL,
reward_econ REAL,
reward_diplo REAL,
logged_at TEXT NOT NULL
);
"""
)
logger.debug("Ledger initialized at %s", self._db_path)
# ── Treasury ──────────────────────────────────────────────────────────
def balance(self) -> float:
with self._conn() as conn:
row = conn.execute("SELECT balance FROM treasury WHERE id = 1").fetchone()
return float(row[0]) if row else 0.0
def deposit(self, amount: float, reason: str = "") -> float:
"""Add *amount* denars to treasury. Returns new balance."""
if amount < 0:
raise ValueError("Use withdraw() for negative amounts")
with self._conn() as conn:
conn.execute(
"UPDATE treasury SET balance = balance + ? WHERE id = 1", (amount,)
)
bal = self.balance()
logger.info("Treasury +%.0f denars (%s) → balance %.0f", amount, reason, bal)
return bal
def withdraw(self, amount: float, reason: str = "") -> float:
"""Remove *amount* denars from treasury. Returns new balance."""
if amount < 0:
raise ValueError("Amount must be positive")
bal = self.balance()
if amount > bal:
raise BudgetExceeded(
f"Cannot withdraw {amount:.0f} denars — treasury balance is only {bal:.0f}"
)
with self._conn() as conn:
conn.execute(
"UPDATE treasury SET balance = balance - ? WHERE id = 1", (amount,)
)
new_bal = self.balance()
logger.info("Treasury -%.0f denars (%s) → balance %.0f", amount, reason, new_bal)
return new_bal
# ── Fiefs ─────────────────────────────────────────────────────────────
def add_fief(self, name: str, fief_type: str) -> None:
with self._conn() as conn:
conn.execute(
"INSERT OR REPLACE INTO fiefs (name, fief_type, acquired_at) VALUES (?, ?, ?)",
(name, fief_type, datetime.utcnow().isoformat()),
)
logger.info("Fief acquired: %s (%s)", name, fief_type)
def remove_fief(self, name: str) -> None:
with self._conn() as conn:
conn.execute("DELETE FROM fiefs WHERE name = ?", (name,))
logger.info("Fief lost: %s", name)
def list_fiefs(self) -> list[dict[str, str]]:
with self._conn() as conn:
rows = conn.execute("SELECT name, fief_type, acquired_at FROM fiefs").fetchall()
return [{"name": r[0], "fief_type": r[1], "acquired_at": r[2]} for r in rows]
# ── Vassal budgets ────────────────────────────────────────────────────
def allocate_budget(self, agent: str, amount: float) -> None:
"""Delegate *amount* denars to a vassal agent.
Withdraws from treasury. Raises :class:`BudgetExceeded` if
the treasury cannot cover the allocation.
"""
self.withdraw(amount, reason=f"budget → {agent}")
with self._conn() as conn:
conn.execute(
"""
INSERT INTO vassal_budgets (agent, allocated, spent)
VALUES (?, ?, 0)
ON CONFLICT(agent) DO UPDATE SET allocated = allocated + excluded.allocated
""",
(agent, amount),
)
logger.info("Allocated %.0f denars to %s", amount, agent)
def record_vassal_spend(self, agent: str, amount: float) -> None:
"""Record that a vassal spent *amount* from its budget."""
with self._conn() as conn:
row = conn.execute(
"SELECT allocated, spent FROM vassal_budgets WHERE agent = ?", (agent,)
).fetchone()
if row is None:
raise BudgetExceeded(f"{agent} has no allocated budget")
allocated, spent = row
if spent + amount > allocated:
raise BudgetExceeded(
f"{agent} budget exhausted: {spent:.0f}/{allocated:.0f} spent, "
f"requested {amount:.0f}"
)
with self._conn() as conn:
conn.execute(
"UPDATE vassal_budgets SET spent = spent + ? WHERE agent = ?",
(amount, agent),
)
def vassal_remaining(self, agent: str) -> float:
with self._conn() as conn:
row = conn.execute(
"SELECT allocated - spent FROM vassal_budgets WHERE agent = ?", (agent,)
).fetchone()
return float(row[0]) if row else 0.0
# ── Tick log ──────────────────────────────────────────────────────────
def log_tick(
self,
tick: int,
campaign_day: int,
subgoal: str | None = None,
reward_war: float | None = None,
reward_econ: float | None = None,
reward_diplo: float | None = None,
) -> None:
with self._conn() as conn:
conn.execute(
"""
INSERT OR REPLACE INTO tick_log
(tick, campaign_day, subgoal, reward_war, reward_econ, reward_diplo, logged_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
tick,
campaign_day,
subgoal,
reward_war,
reward_econ,
reward_diplo,
datetime.utcnow().isoformat(),
),
)
def tick_history(self, last_n: int = 100) -> list[dict]:
with self._conn() as conn:
rows = conn.execute(
"""
SELECT tick, campaign_day, subgoal, reward_war, reward_econ, reward_diplo, logged_at
FROM tick_log
ORDER BY tick DESC
LIMIT ?
""",
(last_n,),
).fetchall()
return [
{
"tick": r[0],
"campaign_day": r[1],
"subgoal": r[2],
"reward_war": r[3],
"reward_econ": r[4],
"reward_diplo": r[5],
"logged_at": r[6],
}
for r in rows
]
# ── Internal ──────────────────────────────────────────────────────────
@contextmanager
def _conn(self) -> Iterator[sqlite3.Connection]:
conn = sqlite3.connect(self._db_path)
conn.execute("PRAGMA journal_mode=WAL")
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()

200
src/bannerlord/models.py Normal file
View File

@@ -0,0 +1,200 @@
"""Bannerlord feudal hierarchy data models.
All inter-agent communication uses typed Pydantic models. No raw dicts
cross agent boundaries — every message is validated at construction time.
Design: Ahilan & Dayan (2019) Feudal Multi-Agent Hierarchies.
Refs: #1097, #1099.
"""
from __future__ import annotations
from datetime import datetime
from typing import Any, Literal
from pydantic import BaseModel, Field
# ── Subgoal vocabulary ────────────────────────────────────────────────────────
SUBGOAL_TOKENS = frozenset(
{
"EXPAND_TERRITORY", # Take or secure a fief — War Vassal
"RAID_ECONOMY", # Raid enemy villages for denars — War Vassal
"FORTIFY", # Upgrade or repair a settlement — Economy Vassal
"RECRUIT", # Fill party to capacity — Logistics Companion
"TRADE", # Execute profitable trade route — Caravan Companion
"ALLY", # Pursue non-aggression / alliance — Diplomacy Vassal
"SPY", # Gain information on target faction — Scout Companion
"HEAL", # Rest party until wounds recovered — Logistics Companion
"CONSOLIDATE", # Hold territory, no expansion — Economy Vassal
"TRAIN", # Level troops via auto-resolve bandits — War Vassal
}
)
# ── King subgoal ──────────────────────────────────────────────────────────────
class KingSubgoal(BaseModel):
"""Strategic directive issued by the King agent to vassals.
The King operates on campaign-map timescale (days to weeks of in-game
time). His sole output is one subgoal token plus optional parameters.
He never micro-manages primitives.
"""
token: str = Field(..., description="One of SUBGOAL_TOKENS")
target: str | None = Field(
None, description="Named target (settlement, lord, faction)"
)
quantity: int | None = Field(None, description="For RECRUIT, TRADE tokens", ge=1)
priority: float = Field(
1.0, ge=0.0, le=2.0, description="Scales vassal reward weighting"
)
deadline_days: int | None = Field(
None, ge=1, description="Campaign-map days to complete"
)
context: str | None = Field(
None, description="Free-text hint; not parsed by workers"
)
def model_post_init(self, __context: Any) -> None: # noqa: ANN401
if self.token not in SUBGOAL_TOKENS:
raise ValueError(
f"Unknown subgoal token {self.token!r}. "
f"Must be one of: {sorted(SUBGOAL_TOKENS)}"
)
# ── Inter-agent messages ──────────────────────────────────────────────────────
class SubgoalMessage(BaseModel):
"""King → Vassal direction."""
msg_type: Literal["subgoal"] = "subgoal"
from_agent: Literal["king"] = "king"
to_agent: str = Field(..., description="e.g. 'war_vassal', 'economy_vassal'")
subgoal: KingSubgoal
issued_at: datetime = Field(default_factory=datetime.utcnow)
class TaskMessage(BaseModel):
"""Vassal → Companion direction."""
msg_type: Literal["task"] = "task"
from_agent: str = Field(..., description="e.g. 'war_vassal'")
to_agent: str = Field(..., description="e.g. 'logistics_companion'")
primitive: str = Field(..., description="One of the companion primitives")
args: dict[str, Any] = Field(default_factory=dict)
priority: float = Field(1.0, ge=0.0, le=2.0)
issued_at: datetime = Field(default_factory=datetime.utcnow)
class ResultMessage(BaseModel):
"""Companion / Vassal → Parent direction."""
msg_type: Literal["result"] = "result"
from_agent: str
to_agent: str
success: bool
outcome: dict[str, Any] = Field(default_factory=dict, description="Primitive-specific result")
reward_delta: float = Field(0.0, description="Computed reward contribution")
completed_at: datetime = Field(default_factory=datetime.utcnow)
class StateUpdateMessage(BaseModel):
"""GABS → All agents (broadcast).
Sent every campaign tick. Agents consume at their own cadence.
"""
msg_type: Literal["state"] = "state"
game_state: dict[str, Any] = Field(..., description="Full GABS state snapshot")
tick: int = Field(..., ge=0)
timestamp: datetime = Field(default_factory=datetime.utcnow)
# ── Reward snapshots ──────────────────────────────────────────────────────────
class WarReward(BaseModel):
"""Computed reward for the War Vassal at a given tick."""
territory_delta: float = 0.0
army_strength_ratio: float = 1.0
casualty_cost: float = 0.0
supply_cost: float = 0.0
subgoal_bonus: float = 0.0
@property
def total(self) -> float:
w1, w2, w3, w4, w5 = 0.40, 0.25, 0.20, 0.10, 0.05
return (
w1 * self.territory_delta
+ w2 * self.army_strength_ratio
- w3 * self.casualty_cost
- w4 * self.supply_cost
+ w5 * self.subgoal_bonus
)
class EconomyReward(BaseModel):
"""Computed reward for the Economy Vassal at a given tick."""
daily_denars_income: float = 0.0
food_stock_buffer: float = 0.0
loyalty_average: float = 50.0
construction_queue_length: int = 0
subgoal_bonus: float = 0.0
@property
def total(self) -> float:
w1, w2, w3, w4, w5 = 0.35, 0.25, 0.20, 0.15, 0.05
return (
w1 * self.daily_denars_income
+ w2 * self.food_stock_buffer
+ w3 * self.loyalty_average
- w4 * self.construction_queue_length
+ w5 * self.subgoal_bonus
)
class DiplomacyReward(BaseModel):
"""Computed reward for the Diplomacy Vassal at a given tick."""
allies_count: int = 0
truce_duration_value: float = 0.0
relations_score_weighted: float = 0.0
active_wars_front: int = 0
subgoal_bonus: float = 0.0
@property
def total(self) -> float:
w1, w2, w3, w4, w5 = 0.30, 0.25, 0.25, 0.15, 0.05
return (
w1 * self.allies_count
+ w2 * self.truce_duration_value
+ w3 * self.relations_score_weighted
- w4 * self.active_wars_front
+ w5 * self.subgoal_bonus
)
# ── Victory condition ─────────────────────────────────────────────────────────
class VictoryCondition(BaseModel):
"""Sovereign Victory (M5) — evaluated each campaign tick."""
holds_king_title: bool = False
territory_control_pct: float = Field(
0.0, ge=0.0, le=100.0, description="% of Calradia fiefs held"
)
majority_threshold: float = Field(
51.0, ge=0.0, le=100.0, description="Required % for majority control"
)
@property
def achieved(self) -> bool:
return self.holds_king_title and self.territory_control_pct >= self.majority_threshold

View File

View File

@@ -0,0 +1,307 @@
"""Unit tests for bannerlord agents — King, Vassals, Companions."""
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from bannerlord.agents.companions import (
CaravanCompanion,
LogisticsCompanion,
ScoutCompanion,
)
from bannerlord.agents.king import KingAgent
from bannerlord.agents.vassals import DiplomacyVassal, EconomyVassal, WarVassal
from bannerlord.gabs_client import GABSClient, GABSUnavailable
from bannerlord.ledger import Ledger
from bannerlord.models import (
KingSubgoal,
ResultMessage,
SubgoalMessage,
TaskMessage,
VictoryCondition,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _mock_gabs(state: dict | None = None) -> GABSClient:
"""Return a disconnected GABS stub that returns *state* from get_state."""
gabs = MagicMock(spec=GABSClient)
gabs.connected = False
if state is not None:
gabs.get_state = AsyncMock(return_value=state)
else:
gabs.get_state = AsyncMock(side_effect=GABSUnavailable("no game"))
gabs.call = AsyncMock(return_value={})
gabs.recruit_troops = AsyncMock(return_value={"recruited": 10})
gabs.move_party = AsyncMock(return_value={"moving": True})
return gabs
def _mock_ledger(tmp_path) -> Ledger:
ledger = Ledger(db_path=tmp_path / "ledger.db")
ledger.initialize()
return ledger
# ── King agent ────────────────────────────────────────────────────────────────
class TestKingAgent:
async def test_victory_detected(self, tmp_path):
"""Campaign stops immediately when victory condition is met."""
gabs = _mock_gabs({"player_title": "King", "territory_control_pct": 55.0})
ledger = _mock_ledger(tmp_path)
king = KingAgent(gabs_client=gabs, ledger=ledger, tick_interval=0)
victory = await king.run_campaign(max_ticks=10)
assert victory.achieved
async def test_max_ticks_respected(self, tmp_path):
"""Campaign stops after max_ticks when victory not yet achieved."""
gabs = _mock_gabs({"player_title": "Lord", "territory_control_pct": 10.0})
ledger = _mock_ledger(tmp_path)
# Patch LLM to return a valid subgoal without calling Ollama
king = KingAgent(gabs_client=gabs, ledger=ledger, tick_interval=0)
with patch.object(king, "_decide", AsyncMock(return_value=KingSubgoal(token="RECRUIT"))):
victory = await king.run_campaign(max_ticks=3)
assert not victory.achieved
assert king._tick == 3
async def test_llm_failure_falls_back_to_recruit(self, tmp_path):
"""If LLM fails, King defaults to RECRUIT subgoal."""
gabs = _mock_gabs({"player_title": "Lord", "territory_control_pct": 5.0})
ledger = _mock_ledger(tmp_path)
king = KingAgent(gabs_client=gabs, ledger=ledger, tick_interval=0)
with patch.object(king, "_llm_decide", side_effect=RuntimeError("Ollama down")):
subgoal = await king._decide({})
assert subgoal.token == "RECRUIT"
async def test_subgoal_broadcast_to_all_vassals(self, tmp_path):
"""King broadcasts subgoal to all three vassals."""
gabs = _mock_gabs({})
ledger = _mock_ledger(tmp_path)
king = KingAgent(gabs_client=gabs, ledger=ledger)
subgoal = KingSubgoal(token="EXPAND_TERRITORY", target="Epicrotea")
await king._broadcast_subgoal(subgoal)
messages = []
while not king.subgoal_queue.empty():
messages.append(king.subgoal_queue.get_nowait())
assert len(messages) == 3
recipients = {m.to_agent for m in messages}
assert recipients == {"war_vassal", "economy_vassal", "diplomacy_vassal"}
async def test_gabs_unavailable_uses_empty_state(self, tmp_path):
"""King handles GABS being offline gracefully."""
gabs = _mock_gabs() # raises GABSUnavailable
ledger = _mock_ledger(tmp_path)
king = KingAgent(gabs_client=gabs, ledger=ledger)
state = await king._fetch_state()
assert state == {}
def test_evaluate_victory_king_with_majority(self, tmp_path):
gabs = _mock_gabs()
ledger = _mock_ledger(tmp_path)
king = KingAgent(gabs_client=gabs, ledger=ledger)
v = king._evaluate_victory({"player_title": "King", "territory_control_pct": 60.0})
assert v.achieved
def test_evaluate_victory_not_king(self, tmp_path):
gabs = _mock_gabs()
ledger = _mock_ledger(tmp_path)
king = KingAgent(gabs_client=gabs, ledger=ledger)
v = king._evaluate_victory({"player_title": "Lord", "territory_control_pct": 80.0})
assert not v.achieved
# ── Vassals ───────────────────────────────────────────────────────────────────
class TestWarVassal:
async def test_expand_territory_emits_move_task(self):
gabs = _mock_gabs({"territory_delta": 1.0, "army_strength_ratio": 1.5})
queue = asyncio.Queue()
vassal = WarVassal(gabs_client=gabs, subgoal_queue=queue)
subgoal = KingSubgoal(token="EXPAND_TERRITORY", target="Seonon")
await vassal._tick(subgoal)
task: TaskMessage = vassal.task_queue.get_nowait()
assert task.primitive == "move_party"
assert task.args["destination"] == "Seonon"
async def test_recruit_emits_recruit_task(self):
gabs = _mock_gabs({})
queue = asyncio.Queue()
vassal = WarVassal(gabs_client=gabs, subgoal_queue=queue)
subgoal = KingSubgoal(token="RECRUIT", quantity=15)
await vassal._tick(subgoal)
task: TaskMessage = vassal.task_queue.get_nowait()
assert task.primitive == "recruit_troop"
assert task.args["quantity"] == 15
async def test_irrelevant_token_emits_no_task(self):
gabs = _mock_gabs({})
queue = asyncio.Queue()
vassal = WarVassal(gabs_client=gabs, subgoal_queue=queue)
subgoal = KingSubgoal(token="ALLY")
await vassal._tick(subgoal)
assert vassal.task_queue.empty()
class TestEconomyVassal:
async def test_fortify_emits_build_task(self):
gabs = _mock_gabs({"daily_income": 200.0})
queue = asyncio.Queue()
vassal = EconomyVassal(gabs_client=gabs, subgoal_queue=queue)
subgoal = KingSubgoal(token="FORTIFY", target="Epicrotea")
await vassal._tick(subgoal)
task: TaskMessage = vassal.task_queue.get_nowait()
assert task.primitive == "build_project"
assert task.args["settlement"] == "Epicrotea"
async def test_trade_emits_assess_prices(self):
gabs = _mock_gabs({})
queue = asyncio.Queue()
vassal = EconomyVassal(gabs_client=gabs, subgoal_queue=queue)
subgoal = KingSubgoal(token="TRADE", target="Pravend")
await vassal._tick(subgoal)
task: TaskMessage = vassal.task_queue.get_nowait()
assert task.primitive == "assess_prices"
class TestDiplomacyVassal:
async def test_ally_emits_track_lord(self):
gabs = _mock_gabs({"allies_count": 1})
queue = asyncio.Queue()
vassal = DiplomacyVassal(gabs_client=gabs, subgoal_queue=queue)
subgoal = KingSubgoal(token="ALLY", target="Derthert")
await vassal._tick(subgoal)
task: TaskMessage = vassal.task_queue.get_nowait()
assert task.primitive == "track_lord"
assert task.args["name"] == "Derthert"
async def test_spy_emits_assess_garrison(self):
gabs = _mock_gabs({})
queue = asyncio.Queue()
vassal = DiplomacyVassal(gabs_client=gabs, subgoal_queue=queue)
subgoal = KingSubgoal(token="SPY", target="Marunath")
await vassal._tick(subgoal)
task: TaskMessage = vassal.task_queue.get_nowait()
assert task.primitive == "assess_garrison"
assert task.args["settlement"] == "Marunath"
# ── Companions ────────────────────────────────────────────────────────────────
class TestLogisticsCompanion:
async def test_recruit_troop(self):
gabs = _mock_gabs()
gabs.recruit_troops = AsyncMock(return_value={"recruited": 10, "type": "infantry"})
q: asyncio.Queue[TaskMessage] = asyncio.Queue()
comp = LogisticsCompanion(gabs_client=gabs, task_queue=q)
task = TaskMessage(
from_agent="war_vassal",
to_agent="logistics_companion",
primitive="recruit_troop",
args={"troop_type": "infantry", "quantity": 10},
)
result = await comp._execute(task)
assert result.success is True
assert result.outcome["recruited"] == 10
async def test_unknown_primitive_fails_gracefully(self):
gabs = _mock_gabs()
q: asyncio.Queue[TaskMessage] = asyncio.Queue()
comp = LogisticsCompanion(gabs_client=gabs, task_queue=q)
task = TaskMessage(
from_agent="war_vassal",
to_agent="logistics_companion",
primitive="launch_nukes",
args={},
)
result = await comp._execute(task)
assert result.success is False
assert "Unknown primitive" in result.outcome["error"]
async def test_gabs_unavailable_returns_failure(self):
gabs = _mock_gabs()
gabs.recruit_troops = AsyncMock(side_effect=GABSUnavailable("offline"))
q: asyncio.Queue[TaskMessage] = asyncio.Queue()
comp = LogisticsCompanion(gabs_client=gabs, task_queue=q)
task = TaskMessage(
from_agent="war_vassal",
to_agent="logistics_companion",
primitive="recruit_troop",
args={"troop_type": "infantry", "quantity": 5},
)
result = await comp._execute(task)
assert result.success is False
class TestCaravanCompanion:
async def test_assess_prices(self):
gabs = _mock_gabs()
gabs.call = AsyncMock(return_value={"grain": 12, "linen": 45})
q: asyncio.Queue[TaskMessage] = asyncio.Queue()
comp = CaravanCompanion(gabs_client=gabs, task_queue=q)
task = TaskMessage(
from_agent="economy_vassal",
to_agent="caravan_companion",
primitive="assess_prices",
args={"town": "Pravend"},
)
result = await comp._execute(task)
assert result.success is True
async def test_abandon_route(self):
gabs = _mock_gabs()
gabs.call = AsyncMock(return_value={"abandoned": True})
q: asyncio.Queue[TaskMessage] = asyncio.Queue()
comp = CaravanCompanion(gabs_client=gabs, task_queue=q)
task = TaskMessage(
from_agent="economy_vassal",
to_agent="caravan_companion",
primitive="abandon_route",
args={},
)
result = await comp._execute(task)
assert result.success is True
assert result.outcome["abandoned"] is True
class TestScoutCompanion:
async def test_assess_garrison(self):
gabs = _mock_gabs()
gabs.call = AsyncMock(return_value={"garrison_size": 120, "settlement": "Marunath"})
q: asyncio.Queue[TaskMessage] = asyncio.Queue()
comp = ScoutCompanion(gabs_client=gabs, task_queue=q)
task = TaskMessage(
from_agent="diplomacy_vassal",
to_agent="scout_companion",
primitive="assess_garrison",
args={"settlement": "Marunath"},
)
result = await comp._execute(task)
assert result.success is True
assert result.outcome["garrison_size"] == 120
async def test_report_intel(self):
gabs = _mock_gabs()
gabs.call = AsyncMock(return_value={"intel": ["Derthert at Epicrotea"]})
q: asyncio.Queue[TaskMessage] = asyncio.Queue()
comp = ScoutCompanion(gabs_client=gabs, task_queue=q)
task = TaskMessage(
from_agent="diplomacy_vassal",
to_agent="scout_companion",
primitive="report_intel",
args={},
)
result = await comp._execute(task)
assert result.success is True

View File

@@ -0,0 +1,147 @@
"""Unit tests for bannerlord.gabs_client — TCP JSON-RPC client."""
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from bannerlord.gabs_client import GABSClient, GABSError, GABSUnavailable
# ── Connection ────────────────────────────────────────────────────────────────
class TestGABSClientConnection:
async def test_connect_success(self):
mock_reader = AsyncMock()
mock_writer = MagicMock()
mock_writer.close = MagicMock()
mock_writer.wait_closed = AsyncMock()
with patch(
"bannerlord.gabs_client.asyncio.open_connection",
return_value=(mock_reader, mock_writer),
):
client = GABSClient()
await client.connect()
assert client.connected is True
await client.close()
async def test_connect_failure_degrades_gracefully(self):
with patch(
"bannerlord.gabs_client.asyncio.open_connection",
side_effect=OSError("Connection refused"),
):
client = GABSClient()
await client.connect() # must not raise
assert client.connected is False
async def test_connect_timeout_degrades_gracefully(self):
with patch(
"bannerlord.gabs_client.asyncio.open_connection",
side_effect=asyncio.TimeoutError(),
):
client = GABSClient()
await client.connect()
assert client.connected is False
async def test_context_manager(self):
mock_reader = AsyncMock()
mock_writer = MagicMock()
mock_writer.close = MagicMock()
mock_writer.wait_closed = AsyncMock()
with patch(
"bannerlord.gabs_client.asyncio.open_connection",
return_value=(mock_reader, mock_writer),
):
async with GABSClient() as client:
assert client.connected is True
assert client.connected is False
# ── RPC ───────────────────────────────────────────────────────────────────────
class TestGABSClientRPC:
def _make_connected_client(self, response_data: dict):
"""Return a client with mocked reader/writer."""
client = GABSClient()
client._connected = True
raw_response = json.dumps(response_data) + "\n"
client._reader = AsyncMock()
client._reader.readline = AsyncMock(return_value=raw_response.encode())
client._writer = MagicMock()
client._writer.write = MagicMock()
client._writer.drain = AsyncMock()
return client
async def test_call_returns_result(self):
client = self._make_connected_client({"jsonrpc": "2.0", "id": 1, "result": {"foo": "bar"}})
result = await client.call("game.getState")
assert result == {"foo": "bar"}
async def test_call_raises_on_error(self):
client = self._make_connected_client(
{"jsonrpc": "2.0", "id": 1, "error": {"code": -32601, "message": "Method not found"}}
)
with pytest.raises(GABSError, match="Method not found"):
await client.call("game.nonexistent")
async def test_call_raises_unavailable_when_not_connected(self):
client = GABSClient()
assert client.connected is False
with pytest.raises(GABSUnavailable):
await client.call("game.getState")
async def test_sequence_increments(self):
client = self._make_connected_client({"jsonrpc": "2.0", "id": 1, "result": {}})
await client.call("game.getState")
assert client._seq == 1
client._reader.readline = AsyncMock(
return_value=(json.dumps({"jsonrpc": "2.0", "id": 2, "result": {}}) + "\n").encode()
)
await client.call("game.getState")
assert client._seq == 2
async def test_get_state_calls_correct_method(self):
client = self._make_connected_client(
{"jsonrpc": "2.0", "id": 1, "result": {"campaign_day": 10}}
)
result = await client.get_state()
written = client._writer.write.call_args[0][0].decode()
payload = json.loads(written.strip())
assert payload["method"] == "game.getState"
assert result == {"campaign_day": 10}
async def test_move_party_sends_target(self):
client = self._make_connected_client(
{"jsonrpc": "2.0", "id": 1, "result": {"moving": True}}
)
await client.move_party("Epicrotea")
written = client._writer.write.call_args[0][0].decode()
payload = json.loads(written.strip())
assert payload["method"] == "party.move"
assert payload["params"]["target"] == "Epicrotea"
async def test_connection_lost_marks_disconnected(self):
client = GABSClient()
client._connected = True
client._reader = AsyncMock()
client._reader.readline = AsyncMock(side_effect=OSError("connection reset"))
client._writer = MagicMock()
client._writer.write = MagicMock()
client._writer.drain = AsyncMock()
with pytest.raises(GABSUnavailable):
await client.call("game.getState")
assert client.connected is False

View File

@@ -0,0 +1,190 @@
"""Unit tests for bannerlord.models — data contracts and reward functions."""
import pytest
from bannerlord.models import (
SUBGOAL_TOKENS,
DiplomacyReward,
EconomyReward,
KingSubgoal,
ResultMessage,
StateUpdateMessage,
SubgoalMessage,
TaskMessage,
VictoryCondition,
WarReward,
)
# ── KingSubgoal ───────────────────────────────────────────────────────────────
class TestKingSubgoal:
def test_valid_token(self):
s = KingSubgoal(token="EXPAND_TERRITORY", target="Epicrotea")
assert s.token == "EXPAND_TERRITORY"
assert s.target == "Epicrotea"
assert s.priority == 1.0
def test_all_tokens_valid(self):
for token in SUBGOAL_TOKENS:
KingSubgoal(token=token)
def test_invalid_token_raises(self):
with pytest.raises(ValueError, match="Unknown subgoal token"):
KingSubgoal(token="NUKE_CALRADIA")
def test_priority_clamp(self):
with pytest.raises(Exception):
KingSubgoal(token="TRADE", priority=3.0)
def test_optional_fields_default_none(self):
s = KingSubgoal(token="HEAL")
assert s.target is None
assert s.quantity is None
assert s.deadline_days is None
assert s.context is None
# ── Messages ──────────────────────────────────────────────────────────────────
class TestSubgoalMessage:
def test_defaults(self):
msg = SubgoalMessage(
to_agent="war_vassal",
subgoal=KingSubgoal(token="RAID_ECONOMY"),
)
assert msg.msg_type == "subgoal"
assert msg.from_agent == "king"
assert msg.to_agent == "war_vassal"
assert msg.issued_at is not None
def test_subgoal_roundtrip(self):
subgoal = KingSubgoal(token="RECRUIT", quantity=30, priority=1.5)
msg = SubgoalMessage(to_agent="war_vassal", subgoal=subgoal)
assert msg.subgoal.quantity == 30
assert msg.subgoal.priority == 1.5
class TestTaskMessage:
def test_construction(self):
t = TaskMessage(
from_agent="war_vassal",
to_agent="logistics_companion",
primitive="recruit_troop",
args={"troop_type": "cavalry", "quantity": 5},
priority=1.2,
)
assert t.msg_type == "task"
assert t.primitive == "recruit_troop"
assert t.args["quantity"] == 5
class TestResultMessage:
def test_success(self):
r = ResultMessage(
from_agent="logistics_companion",
to_agent="war_vassal",
success=True,
outcome={"recruited": 10},
reward_delta=0.15,
)
assert r.success is True
assert r.reward_delta == 0.15
def test_failure(self):
r = ResultMessage(
from_agent="scout_companion",
to_agent="diplomacy_vassal",
success=False,
outcome={"error": "GABS unavailable"},
)
assert r.success is False
assert r.reward_delta == 0.0
class TestStateUpdateMessage:
def test_construction(self):
msg = StateUpdateMessage(
game_state={"campaign_day": 42, "player_title": "Lord"},
tick=42,
)
assert msg.msg_type == "state"
assert msg.tick == 42
assert msg.game_state["campaign_day"] == 42
# ── Reward functions ──────────────────────────────────────────────────────────
class TestWarReward:
def test_positive_expansion(self):
r = WarReward(territory_delta=2.0, army_strength_ratio=1.2, subgoal_bonus=0.1)
assert r.total > 0
def test_casualty_cost_penalizes(self):
no_cost = WarReward(territory_delta=1.0, army_strength_ratio=1.0)
with_cost = WarReward(territory_delta=1.0, army_strength_ratio=1.0, casualty_cost=5.0)
assert with_cost.total < no_cost.total
def test_zero_state(self):
r = WarReward()
# army_strength_ratio default 1.0, rest 0 → 0.25 * 1.0 = 0.25
assert abs(r.total - 0.25) < 1e-9
class TestEconomyReward:
def test_income_positive(self):
r = EconomyReward(daily_denars_income=100.0, food_stock_buffer=7.0, loyalty_average=80.0)
assert r.total > 0
def test_construction_queue_penalizes(self):
no_queue = EconomyReward(daily_denars_income=50.0)
long_queue = EconomyReward(daily_denars_income=50.0, construction_queue_length=10)
assert long_queue.total < no_queue.total
def test_loyalty_contributes(self):
low_loyalty = EconomyReward(loyalty_average=10.0)
high_loyalty = EconomyReward(loyalty_average=90.0)
assert high_loyalty.total > low_loyalty.total
class TestDiplomacyReward:
def test_allies_positive(self):
r = DiplomacyReward(allies_count=3)
assert r.total > 0
def test_active_wars_penalizes(self):
peace = DiplomacyReward(allies_count=2)
war = DiplomacyReward(allies_count=2, active_wars_front=4)
assert war.total < peace.total
# ── Victory condition ─────────────────────────────────────────────────────────
class TestVictoryCondition:
def test_not_achieved_without_title(self):
v = VictoryCondition(holds_king_title=False, territory_control_pct=70.0)
assert not v.achieved
def test_not_achieved_without_majority(self):
v = VictoryCondition(holds_king_title=True, territory_control_pct=40.0)
assert not v.achieved
def test_achieved_when_king_with_majority(self):
v = VictoryCondition(holds_king_title=True, territory_control_pct=55.0)
assert v.achieved
def test_exact_threshold(self):
v = VictoryCondition(holds_king_title=True, territory_control_pct=51.0)
assert v.achieved
def test_custom_threshold(self):
v = VictoryCondition(
holds_king_title=True,
territory_control_pct=70.0,
majority_threshold=75.0,
)
assert not v.achieved