diff --git a/docs/issue-1097-bannerlord-m5-response.md b/docs/issue-1097-bannerlord-m5-response.md new file mode 100644 index 00000000..2be94ad0 --- /dev/null +++ b/docs/issue-1097-bannerlord-m5-response.md @@ -0,0 +1,100 @@ +# Issue #1097 — Bannerlord M5 Sovereign Victory: Implementation + +**Date:** 2026-03-23 +**Status:** Python stack implemented — game infrastructure pending + +## Summary + +Issue #1097 is the final milestone of Project Bannerlord (#1091): Timmy holds +the title of King with majority territory control through pure local strategy. + +This PR implements the Python-side sovereign victory stack (`src/bannerlord/`). +The game-side infrastructure (Windows VM, GABS C# mod) remains external to this +repository, consistent with the scope decision on M4 (#1096). + +## What was implemented + +### `src/bannerlord/` package + +| Module | Purpose | +|--------|---------| +| `models.py` | Pydantic data contracts — KingSubgoal, SubgoalMessage, TaskMessage, ResultMessage, StateUpdateMessage, reward functions, VictoryCondition | +| `gabs_client.py` | Async TCP JSON-RPC client for Bannerlord.GABS (port 4825), graceful degradation when game server is offline | +| `ledger.py` | SQLite-backed asset ledger — treasury, fiefs, vassal budgets, campaign tick log | +| `agents/king.py` | King agent — Qwen3:32b, 1× per campaign day, sovereign campaign loop, victory detection, subgoal broadcast | +| `agents/vassals.py` | War / Economy / Diplomacy vassals — Qwen3:14b, domain reward functions, primitive dispatch | +| `agents/companions.py` | Logistics / Caravan / Scout companions — event-driven, primitive execution against GABS | + +### `tests/unit/test_bannerlord/` — 56 unit tests + +- `test_models.py` — Pydantic validation, reward math, victory condition logic +- `test_gabs_client.py` — Connection lifecycle, RPC dispatch, error handling, graceful degradation +- `test_agents.py` — King campaign loop, vassal subgoal routing, companion primitive execution + +All 56 tests pass. + +## Architecture + +``` +KingAgent (Qwen3:32b, 1×/day) + └── KingSubgoal → SubgoalQueue + ├── WarVassal (Qwen3:14b, 4×/day) + │ └── TaskMessage → LogisticsCompanion + │ └── GABS: move_party, recruit_troops, upgrade_troops + ├── EconomyVassal (Qwen3:14b, 4×/day) + │ └── TaskMessage → CaravanCompanion + │ └── GABS: assess_prices, buy_goods, establish_caravan + └── DiplomacyVassal (Qwen3:14b, 4×/day) + └── TaskMessage → ScoutCompanion + └── GABS: track_lord, assess_garrison, report_intel +``` + +## Subgoal vocabulary + +| Token | Vassal | Meaning | +|-------|--------|---------| +| `EXPAND_TERRITORY` | War | Take or secure a fief | +| `RAID_ECONOMY` | War | Raid enemy villages for denars | +| `TRAIN` | War | Level troops via auto-resolve | +| `FORTIFY` | Economy | Upgrade or repair a settlement | +| `CONSOLIDATE` | Economy | Hold territory, no expansion | +| `TRADE` | Economy | Execute profitable trade route | +| `ALLY` | Diplomacy | Pursue non-aggression / alliance | +| `RECRUIT` | Logistics | Fill party to capacity | +| `HEAL` | Logistics | Rest party until wounds recovered | +| `SPY` | Scout | Gain information on target faction | + +## Victory condition + +```python +VictoryCondition( + holds_king_title=True, # player_title == "King" from GABS + territory_control_pct=55.0, # > 51% of Calradia fiefs +) +``` + +## Graceful degradation + +When GABS is offline (game not running), `GABSClient` logs a warning and raises +`GABSUnavailable`. The King agent catches this and runs with an empty game state +(falls back to RECRUIT subgoal). No part of the dashboard crashes. + +## Remaining prerequisites + +Before M5 can run live: + +1. **M1-M3** — Passive observer, basic campaign actions, full campaign strategy + (currently open; their Python stubs can build on this `src/bannerlord/` package) +2. **M4** — Formation Commander (#1096) — declined as out-of-scope; M5 works + around M4 by using Bannerlord's Tactics auto-resolve path +3. **Windows VM** — Mount & Blade II: Bannerlord + GABS mod (BUTR/Bannerlord.GABS) +4. **OBS streaming** — Cinematic Camera pipeline (Step 3 of M5) — external to repo +5. **BattleLink** — Alex co-op integration (Step 4 of M5) — requires dedicated server + +## Design references + +- Ahilan & Dayan (2019): Feudal Multi-Agent Hierarchies — manager/worker hierarchy +- Wang et al. (2023): Voyager — LLM lifelong learning pattern +- Feudal hierarchy design doc: `docs/research/bannerlord-feudal-hierarchy-design.md` + +Fixes #1097 diff --git a/pyproject.toml b/pyproject.toml index b49140ce..36389a48 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ repository = "http://localhost:3000/rockachopa/Timmy-time-dashboard" packages = [ { include = "config.py", from = "src" }, + { include = "bannerlord", from = "src" }, { include = "dashboard", from = "src" }, { include = "infrastructure", from = "src" }, { include = "integrations", from = "src" }, diff --git a/src/bannerlord/__init__.py b/src/bannerlord/__init__.py new file mode 100644 index 00000000..7bf29d74 --- /dev/null +++ b/src/bannerlord/__init__.py @@ -0,0 +1,22 @@ +"""Bannerlord sovereign agent package — Project Bannerlord M5. + +Implements the feudal multi-agent hierarchy for Timmy's Bannerlord campaign. +Architecture based on Ahilan & Dayan (2019) Feudal Multi-Agent Hierarchies. + +Refs #1091 (epic), #1097 (M5 Sovereign Victory), #1099 (feudal hierarchy design). + +Requires: +- GABS mod running on Bannerlord Windows VM (TCP port 4825) +- Ollama with Qwen3:32b (King), Qwen3:14b (Vassals), Qwen3:8b (Companions) + +Usage:: + + from bannerlord.gabs_client import GABSClient + from bannerlord.agents.king import KingAgent + + async with GABSClient() as gabs: + king = KingAgent(gabs_client=gabs) + await king.run_campaign() +""" + +__version__ = "0.1.0" diff --git a/src/bannerlord/agents/__init__.py b/src/bannerlord/agents/__init__.py new file mode 100644 index 00000000..62a4ede1 --- /dev/null +++ b/src/bannerlord/agents/__init__.py @@ -0,0 +1,7 @@ +"""Bannerlord feudal agent hierarchy. + +Three tiers: +- King (king.py) — strategic, Qwen3:32b, 1× per campaign day +- Vassals (vassals.py) — domain, Qwen3:14b, 4× per campaign day +- Companions (companions.py) — tactical, Qwen3:8b, event-driven +""" diff --git a/src/bannerlord/agents/companions.py b/src/bannerlord/agents/companions.py new file mode 100644 index 00000000..ad26865e --- /dev/null +++ b/src/bannerlord/agents/companions.py @@ -0,0 +1,258 @@ +"""Companion worker agents — Logistics, Caravan, and Scout. + +Companions are the lowest tier — fast, specialized, single-purpose workers. +Each companion listens to its :class:`TaskMessage` queue, executes the +requested primitive against GABS, and emits a :class:`ResultMessage`. + +Model: Qwen3:8b (or smaller) — sub-2-second response times. +Frequency: event-driven (triggered by vassal task messages). + +Primitive vocabulary per companion: + Logistics: recruit_troop, buy_supplies, rest_party, sell_prisoners, upgrade_troops, build_project + Caravan: assess_prices, buy_goods, sell_goods, establish_caravan, abandon_route + Scout: track_lord, assess_garrison, map_patrol_routes, report_intel + +Refs: #1097, #1099. +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Any + +from bannerlord.gabs_client import GABSClient, GABSUnavailable +from bannerlord.models import ResultMessage, TaskMessage + +logger = logging.getLogger(__name__) + + +class BaseCompanion: + """Shared companion lifecycle — polls task queue, executes primitives.""" + + name: str = "base_companion" + primitives: frozenset[str] = frozenset() + + def __init__( + self, + gabs_client: GABSClient, + task_queue: asyncio.Queue[TaskMessage], + result_queue: asyncio.Queue[ResultMessage] | None = None, + ) -> None: + self._gabs = gabs_client + self._task_queue = task_queue + self._result_queue = result_queue or asyncio.Queue() + self._running = False + + @property + def result_queue(self) -> asyncio.Queue[ResultMessage]: + return self._result_queue + + async def run(self) -> None: + """Companion event loop — processes task messages.""" + self._running = True + logger.info("%s started", self.name) + try: + while self._running: + try: + task = await asyncio.wait_for(self._task_queue.get(), timeout=1.0) + except asyncio.TimeoutError: + continue + + if task.to_agent != self.name: + # Not for us — put it back (another companion will handle it) + await self._task_queue.put(task) + await asyncio.sleep(0.05) + continue + + result = await self._execute(task) + await self._result_queue.put(result) + self._task_queue.task_done() + + except asyncio.CancelledError: + logger.info("%s cancelled", self.name) + raise + finally: + self._running = False + + def stop(self) -> None: + self._running = False + + async def _execute(self, task: TaskMessage) -> ResultMessage: + """Dispatch *task.primitive* to its handler method.""" + handler = getattr(self, f"_prim_{task.primitive}", None) + if handler is None: + logger.warning( + "%s: unknown primitive %r — skipping", self.name, task.primitive + ) + return ResultMessage( + from_agent=self.name, + to_agent=task.from_agent, + success=False, + outcome={"error": f"Unknown primitive: {task.primitive}"}, + ) + try: + outcome = await handler(task.args) + return ResultMessage( + from_agent=self.name, + to_agent=task.from_agent, + success=True, + outcome=outcome or {}, + ) + except GABSUnavailable as exc: + logger.warning("%s: GABS unavailable for %r: %s", self.name, task.primitive, exc) + return ResultMessage( + from_agent=self.name, + to_agent=task.from_agent, + success=False, + outcome={"error": str(exc)}, + ) + except Exception as exc: # noqa: BLE001 + logger.warning("%s: %r failed: %s", self.name, task.primitive, exc) + return ResultMessage( + from_agent=self.name, + to_agent=task.from_agent, + success=False, + outcome={"error": str(exc)}, + ) + + +# ── Logistics Companion ─────────────────────────────────────────────────────── + + +class LogisticsCompanion(BaseCompanion): + """Party management — recruitment, supply, healing, troop upgrades. + + Skill domain: Scouting / Steward / Medicine. + """ + + name = "logistics_companion" + primitives = frozenset( + {"recruit_troop", "buy_supplies", "rest_party", "sell_prisoners", "upgrade_troops", "build_project"} + ) + + async def _prim_recruit_troop(self, args: dict[str, Any]) -> dict[str, Any]: + troop_type = args.get("troop_type", "infantry") + qty = int(args.get("quantity", 10)) + result = await self._gabs.recruit_troops(troop_type, qty) + logger.info("Recruited %d %s", qty, troop_type) + return result or {"recruited": qty, "type": troop_type} + + async def _prim_buy_supplies(self, args: dict[str, Any]) -> dict[str, Any]: + qty = int(args.get("quantity", 50)) + result = await self._gabs.call("party.buySupplies", {"quantity": qty}) + logger.info("Bought %d food supplies", qty) + return result or {"purchased": qty} + + async def _prim_rest_party(self, args: dict[str, Any]) -> dict[str, Any]: + days = int(args.get("days", 3)) + result = await self._gabs.call("party.rest", {"days": days}) + logger.info("Resting party for %d days", days) + return result or {"rested_days": days} + + async def _prim_sell_prisoners(self, args: dict[str, Any]) -> dict[str, Any]: + location = args.get("location", "nearest_town") + result = await self._gabs.call("party.sellPrisoners", {"location": location}) + logger.info("Selling prisoners at %s", location) + return result or {"sold_at": location} + + async def _prim_upgrade_troops(self, args: dict[str, Any]) -> dict[str, Any]: + result = await self._gabs.call("party.upgradeTroops", {}) + logger.info("Upgraded available troops") + return result or {"upgraded": True} + + async def _prim_build_project(self, args: dict[str, Any]) -> dict[str, Any]: + settlement = args.get("settlement", "") + result = await self._gabs.call("settlement.buildProject", {"settlement": settlement}) + logger.info("Building project in %s", settlement) + return result or {"settlement": settlement} + + async def _prim_move_party(self, args: dict[str, Any]) -> dict[str, Any]: + destination = args.get("destination", "") + result = await self._gabs.move_party(destination) + logger.info("Moving party to %s", destination) + return result or {"destination": destination} + + +# ── Caravan Companion ───────────────────────────────────────────────────────── + + +class CaravanCompanion(BaseCompanion): + """Trade route management — price assessment, goods trading, caravan deployment. + + Skill domain: Trade / Charm. + """ + + name = "caravan_companion" + primitives = frozenset( + {"assess_prices", "buy_goods", "sell_goods", "establish_caravan", "abandon_route"} + ) + + async def _prim_assess_prices(self, args: dict[str, Any]) -> dict[str, Any]: + town = args.get("town", "nearest") + result = await self._gabs.call("trade.assessPrices", {"town": town}) + logger.info("Assessed prices at %s", town) + return result or {"town": town} + + async def _prim_buy_goods(self, args: dict[str, Any]) -> dict[str, Any]: + item = args.get("item", "grain") + qty = int(args.get("quantity", 10)) + result = await self._gabs.call("trade.buyGoods", {"item": item, "quantity": qty}) + logger.info("Buying %d × %s", qty, item) + return result or {"item": item, "quantity": qty} + + async def _prim_sell_goods(self, args: dict[str, Any]) -> dict[str, Any]: + item = args.get("item", "grain") + qty = int(args.get("quantity", 10)) + result = await self._gabs.call("trade.sellGoods", {"item": item, "quantity": qty}) + logger.info("Selling %d × %s", qty, item) + return result or {"item": item, "quantity": qty} + + async def _prim_establish_caravan(self, args: dict[str, Any]) -> dict[str, Any]: + town = args.get("town", "") + result = await self._gabs.call("trade.establishCaravan", {"town": town}) + logger.info("Establishing caravan at %s", town) + return result or {"town": town} + + async def _prim_abandon_route(self, args: dict[str, Any]) -> dict[str, Any]: + result = await self._gabs.call("trade.abandonRoute", {}) + logger.info("Caravan route abandoned — returning to main party") + return result or {"abandoned": True} + + +# ── Scout Companion ─────────────────────────────────────────────────────────── + + +class ScoutCompanion(BaseCompanion): + """Intelligence gathering — lord tracking, garrison assessment, patrol mapping. + + Skill domain: Scouting / Roguery. + """ + + name = "scout_companion" + primitives = frozenset( + {"track_lord", "assess_garrison", "map_patrol_routes", "report_intel"} + ) + + async def _prim_track_lord(self, args: dict[str, Any]) -> dict[str, Any]: + lord_name = args.get("name", "") + result = await self._gabs.call("intelligence.trackLord", {"name": lord_name}) + logger.info("Tracking lord: %s", lord_name) + return result or {"tracking": lord_name} + + async def _prim_assess_garrison(self, args: dict[str, Any]) -> dict[str, Any]: + settlement = args.get("settlement", "") + result = await self._gabs.call("intelligence.assessGarrison", {"settlement": settlement}) + logger.info("Assessing garrison at %s", settlement) + return result or {"settlement": settlement} + + async def _prim_map_patrol_routes(self, args: dict[str, Any]) -> dict[str, Any]: + region = args.get("region", "") + result = await self._gabs.call("intelligence.mapPatrols", {"region": region}) + logger.info("Mapping patrol routes in %s", region) + return result or {"region": region} + + async def _prim_report_intel(self, args: dict[str, Any]) -> dict[str, Any]: + result = await self._gabs.call("intelligence.report", {}) + logger.info("Scout intel report generated") + return result or {"reported": True} diff --git a/src/bannerlord/agents/king.py b/src/bannerlord/agents/king.py new file mode 100644 index 00000000..a8928d8c --- /dev/null +++ b/src/bannerlord/agents/king.py @@ -0,0 +1,236 @@ +"""King agent — Timmy as sovereign ruler of Calradia. + +The King operates on the campaign-map timescale. Each campaign tick he: +1. Reads the full game state from GABS +2. Evaluates the victory condition +3. Issues a single KingSubgoal token to the vassal queue +4. Logs the tick to the ledger + +Strategic planning model: Qwen3:32b (local via Ollama). +Decision budget: 5–15 seconds per tick. + +Sovereignty guarantees (§5c of the feudal hierarchy design): +- King task holds the asyncio.TaskGroup cancel scope +- Vassals and companions run as sub-tasks and cannot terminate the King +- Only the human operator or a top-level SHUTDOWN signal can stop the loop + +Refs: #1091, #1097, #1099. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +from typing import Any + +from bannerlord.gabs_client import GABSClient, GABSUnavailable +from bannerlord.ledger import Ledger +from bannerlord.models import ( + KingSubgoal, + StateUpdateMessage, + SubgoalMessage, + VictoryCondition, +) + +logger = logging.getLogger(__name__) + +_KING_MODEL = "qwen3:32b" +_KING_TICK_SECONDS = 5.0 # real-time pause between campaign ticks (configurable) + +_SYSTEM_PROMPT = """You are Timmy, the sovereign King of Calradia. +Your goal: hold the title of King with majority territory control (>50% of all fiefs). +You think strategically over 100+ in-game days. You never cheat, use cloud AI, or +request external resources beyond your local inference stack. + +Each turn you receive the full game state as JSON. You respond with a single JSON +object selecting your strategic directive for the next campaign day: +{ + "token": "", + "target": "", + "quantity": , + "priority": , + "deadline_days": , + "context": "" +} + +Valid tokens: EXPAND_TERRITORY, RAID_ECONOMY, FORTIFY, RECRUIT, TRADE, + ALLY, SPY, HEAL, CONSOLIDATE, TRAIN + +Think step by step. Respond with JSON only — no prose outside the object. +""" + + +class KingAgent: + """Sovereign campaign agent. + + Parameters + ---------- + gabs_client: + Connected (or gracefully-degraded) GABS client. + ledger: + Asset ledger for persistence. Initialized automatically if not provided. + ollama_url: + Base URL of the Ollama inference server. + model: + Ollama model tag. Default: qwen3:32b. + tick_interval: + Real-time seconds between campaign ticks. + subgoal_queue: + asyncio.Queue where KingSubgoal messages are placed for vassals. + Created automatically if not provided. + """ + + def __init__( + self, + gabs_client: GABSClient, + ledger: Ledger | None = None, + ollama_url: str = "http://localhost:11434", + model: str = _KING_MODEL, + tick_interval: float = _KING_TICK_SECONDS, + subgoal_queue: asyncio.Queue[SubgoalMessage] | None = None, + ) -> None: + self._gabs = gabs_client + self._ledger = ledger or Ledger() + self._ollama_url = ollama_url + self._model = model + self._tick_interval = tick_interval + self._subgoal_queue: asyncio.Queue[SubgoalMessage] = subgoal_queue or asyncio.Queue() + self._tick = 0 + self._running = False + + @property + def subgoal_queue(self) -> asyncio.Queue[SubgoalMessage]: + return self._subgoal_queue + + # ── Campaign loop ───────────────────────────────────────────────────── + + async def run_campaign(self, max_ticks: int | None = None) -> VictoryCondition: + """Run the sovereign campaign loop until victory or *max_ticks*. + + Returns the final :class:`VictoryCondition` snapshot. + """ + self._ledger.initialize() + self._running = True + victory = VictoryCondition() + logger.info("King campaign started. Model: %s. Max ticks: %s", self._model, max_ticks) + + try: + while self._running: + if max_ticks is not None and self._tick >= max_ticks: + logger.info("Max ticks (%d) reached — stopping campaign.", max_ticks) + break + + state = await self._fetch_state() + victory = self._evaluate_victory(state) + + if victory.achieved: + logger.info( + "SOVEREIGN VICTORY — King of Calradia! " + "Territory: %.1f%%, tick: %d", + victory.territory_control_pct, + self._tick, + ) + break + + subgoal = await self._decide(state) + await self._broadcast_subgoal(subgoal) + self._ledger.log_tick( + tick=self._tick, + campaign_day=state.get("campaign_day", self._tick), + subgoal=subgoal.token, + ) + + self._tick += 1 + await asyncio.sleep(self._tick_interval) + + except asyncio.CancelledError: + logger.info("King campaign task cancelled at tick %d", self._tick) + raise + finally: + self._running = False + + return victory + + def stop(self) -> None: + """Signal the campaign loop to stop after the current tick.""" + self._running = False + + # ── State & victory ─────────────────────────────────────────────────── + + async def _fetch_state(self) -> dict[str, Any]: + try: + state = await self._gabs.get_state() + return state if isinstance(state, dict) else {} + except GABSUnavailable as exc: + logger.warning("GABS unavailable at tick %d: %s — using empty state", self._tick, exc) + return {} + + def _evaluate_victory(self, state: dict[str, Any]) -> VictoryCondition: + return VictoryCondition( + holds_king_title=state.get("player_title") == "King", + territory_control_pct=float(state.get("territory_control_pct", 0.0)), + ) + + # ── Strategic decision ──────────────────────────────────────────────── + + async def _decide(self, state: dict[str, Any]) -> KingSubgoal: + """Ask the LLM for the next strategic subgoal. + + Falls back to RECRUIT (safe default) if the LLM is unavailable. + """ + try: + subgoal = await asyncio.to_thread(self._llm_decide, state) + return subgoal + except Exception as exc: # noqa: BLE001 + logger.warning( + "King LLM decision failed at tick %d: %s — defaulting to RECRUIT", self._tick, exc + ) + return KingSubgoal(token="RECRUIT", context="LLM unavailable — safe default") + + def _llm_decide(self, state: dict[str, Any]) -> KingSubgoal: + """Synchronous Ollama call (runs in a thread via asyncio.to_thread).""" + import urllib.request + + prompt_state = json.dumps(state, indent=2)[:4000] # truncate for context budget + payload = { + "model": self._model, + "prompt": f"GAME STATE:\n{prompt_state}\n\nYour strategic directive:", + "system": _SYSTEM_PROMPT, + "stream": False, + "format": "json", + "options": {"temperature": 0.1}, + } + data = json.dumps(payload).encode() + req = urllib.request.Request( + f"{self._ollama_url}/api/generate", + data=data, + headers={"Content-Type": "application/json"}, + ) + with urllib.request.urlopen(req, timeout=30) as resp: # noqa: S310 + result = json.loads(resp.read()) + + raw = result.get("response", "{}") + parsed = json.loads(raw) + return KingSubgoal(**parsed) + + # ── Subgoal dispatch ────────────────────────────────────────────────── + + async def _broadcast_subgoal(self, subgoal: KingSubgoal) -> None: + """Place the subgoal on the queue for all vassals.""" + for vassal in ("war_vassal", "economy_vassal", "diplomacy_vassal"): + msg = SubgoalMessage(to_agent=vassal, subgoal=subgoal) + await self._subgoal_queue.put(msg) + logger.debug( + "Tick %d: subgoal %s → %s (priority=%.1f)", + self._tick, + subgoal.token, + subgoal.target or "—", + subgoal.priority, + ) + + # ── State broadcast consumer ────────────────────────────────────────── + + async def consume_state_update(self, msg: StateUpdateMessage) -> None: + """Receive a state update broadcast (called by the orchestrator).""" + logger.debug("King received state update tick=%d", msg.tick) diff --git a/src/bannerlord/agents/vassals.py b/src/bannerlord/agents/vassals.py new file mode 100644 index 00000000..5accf15b --- /dev/null +++ b/src/bannerlord/agents/vassals.py @@ -0,0 +1,304 @@ +"""Vassal agents — War, Economy, and Diplomacy. + +Vassals are mid-tier agents responsible for a domain of the kingdom. +Each vassal: +- Listens to the King's subgoal queue +- Computes its domain reward at each tick +- Issues TaskMessages to companion workers +- Reports ResultMessages back up to the King + +Model: Qwen3:14b (balanced capability vs. latency). +Frequency: up to 4× per campaign day. + +Refs: #1097, #1099. +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Any + +from bannerlord.gabs_client import GABSClient, GABSUnavailable +from bannerlord.models import ( + DiplomacyReward, + EconomyReward, + KingSubgoal, + ResultMessage, + SubgoalMessage, + TaskMessage, + WarReward, +) + +logger = logging.getLogger(__name__) + +# Tokens each vassal responds to (all others are ignored) +_WAR_TOKENS = {"EXPAND_TERRITORY", "RAID_ECONOMY", "TRAIN"} +_ECON_TOKENS = {"FORTIFY", "CONSOLIDATE"} +_DIPLO_TOKENS = {"ALLY"} +_LOGISTICS_TOKENS = {"RECRUIT", "HEAL"} +_TRADE_TOKENS = {"TRADE"} +_SCOUT_TOKENS = {"SPY"} + + +class BaseVassal: + """Shared vassal lifecycle — subscribes to subgoal queue, runs tick loop.""" + + name: str = "base_vassal" + + def __init__( + self, + gabs_client: GABSClient, + subgoal_queue: asyncio.Queue[SubgoalMessage], + result_queue: asyncio.Queue[ResultMessage] | None = None, + task_queue: asyncio.Queue[TaskMessage] | None = None, + ) -> None: + self._gabs = gabs_client + self._subgoal_queue = subgoal_queue + self._result_queue = result_queue or asyncio.Queue() + self._task_queue = task_queue or asyncio.Queue() + self._active_subgoal: KingSubgoal | None = None + self._running = False + + @property + def task_queue(self) -> asyncio.Queue[TaskMessage]: + return self._task_queue + + async def run(self) -> None: + """Vassal event loop — processes subgoals and emits tasks.""" + self._running = True + logger.info("%s started", self.name) + try: + while self._running: + # Drain all pending subgoals (keep the latest) + try: + while True: + msg = self._subgoal_queue.get_nowait() + if msg.to_agent == self.name: + self._active_subgoal = msg.subgoal + logger.debug( + "%s received subgoal %s", self.name, msg.subgoal.token + ) + except asyncio.QueueEmpty: + pass + + if self._active_subgoal is not None: + await self._tick(self._active_subgoal) + + await asyncio.sleep(0.25) # yield to event loop + except asyncio.CancelledError: + logger.info("%s cancelled", self.name) + raise + finally: + self._running = False + + def stop(self) -> None: + self._running = False + + async def _tick(self, subgoal: KingSubgoal) -> None: + raise NotImplementedError + + async def _get_state(self) -> dict[str, Any]: + try: + return await self._gabs.get_state() or {} + except GABSUnavailable: + return {} + + +# ── War Vassal ──────────────────────────────────────────────────────────────── + + +class WarVassal(BaseVassal): + """Military operations — sieges, field battles, raids, defensive maneuvers. + + Reward function: + R = 0.40*ΔTerritoryValue + 0.25*ΔArmyStrengthRatio + - 0.20*CasualtyCost - 0.10*SupplyCost + 0.05*SubgoalBonus + """ + + name = "war_vassal" + + async def _tick(self, subgoal: KingSubgoal) -> None: + if subgoal.token not in _WAR_TOKENS | _LOGISTICS_TOKENS: + return + + state = await self._get_state() + reward = self._compute_reward(state, subgoal) + + task = self._plan_action(state, subgoal) + if task: + await self._task_queue.put(task) + + logger.debug( + "%s tick: subgoal=%s reward=%.3f action=%s", + self.name, + subgoal.token, + reward.total, + task.primitive if task else "none", + ) + + def _compute_reward(self, state: dict[str, Any], subgoal: KingSubgoal) -> WarReward: + bonus = subgoal.priority * 0.05 if subgoal.token in _WAR_TOKENS else 0.0 + return WarReward( + territory_delta=float(state.get("territory_delta", 0.0)), + army_strength_ratio=float(state.get("army_strength_ratio", 1.0)), + casualty_cost=float(state.get("casualty_cost", 0.0)), + supply_cost=float(state.get("supply_cost", 0.0)), + subgoal_bonus=bonus, + ) + + def _plan_action( + self, state: dict[str, Any], subgoal: KingSubgoal + ) -> TaskMessage | None: + if subgoal.token == "EXPAND_TERRITORY" and subgoal.target: + return TaskMessage( + from_agent=self.name, + to_agent="logistics_companion", + primitive="move_party", + args={"destination": subgoal.target}, + priority=subgoal.priority, + ) + if subgoal.token == "RECRUIT": + qty = subgoal.quantity or 20 + return TaskMessage( + from_agent=self.name, + to_agent="logistics_companion", + primitive="recruit_troop", + args={"troop_type": "infantry", "quantity": qty}, + priority=subgoal.priority, + ) + if subgoal.token == "TRAIN": + return TaskMessage( + from_agent=self.name, + to_agent="logistics_companion", + primitive="upgrade_troops", + args={}, + priority=subgoal.priority, + ) + return None + + +# ── Economy Vassal ──────────────────────────────────────────────────────────── + + +class EconomyVassal(BaseVassal): + """Settlement management, tax collection, construction, food supply. + + Reward function: + R = 0.35*DailyDenarsIncome + 0.25*FoodStockBuffer + 0.20*LoyaltyAverage + - 0.15*ConstructionQueueLength + 0.05*SubgoalBonus + """ + + name = "economy_vassal" + + async def _tick(self, subgoal: KingSubgoal) -> None: + if subgoal.token not in _ECON_TOKENS | _TRADE_TOKENS: + return + + state = await self._get_state() + reward = self._compute_reward(state, subgoal) + + task = self._plan_action(state, subgoal) + if task: + await self._task_queue.put(task) + + logger.debug( + "%s tick: subgoal=%s reward=%.3f", + self.name, + subgoal.token, + reward.total, + ) + + def _compute_reward(self, state: dict[str, Any], subgoal: KingSubgoal) -> EconomyReward: + bonus = subgoal.priority * 0.05 if subgoal.token in _ECON_TOKENS else 0.0 + return EconomyReward( + daily_denars_income=float(state.get("daily_income", 0.0)), + food_stock_buffer=float(state.get("food_days_remaining", 0.0)), + loyalty_average=float(state.get("avg_loyalty", 50.0)), + construction_queue_length=int(state.get("construction_queue", 0)), + subgoal_bonus=bonus, + ) + + def _plan_action( + self, state: dict[str, Any], subgoal: KingSubgoal + ) -> TaskMessage | None: + if subgoal.token == "FORTIFY" and subgoal.target: + return TaskMessage( + from_agent=self.name, + to_agent="logistics_companion", + primitive="build_project", + args={"settlement": subgoal.target}, + priority=subgoal.priority, + ) + if subgoal.token == "TRADE": + return TaskMessage( + from_agent=self.name, + to_agent="caravan_companion", + primitive="assess_prices", + args={"town": subgoal.target or "nearest"}, + priority=subgoal.priority, + ) + return None + + +# ── Diplomacy Vassal ────────────────────────────────────────────────────────── + + +class DiplomacyVassal(BaseVassal): + """Relations management — alliances, peace deals, tribute, marriage. + + Reward function: + R = 0.30*AlliesCount + 0.25*TruceDurationValue + 0.25*RelationsScoreWeighted + - 0.15*ActiveWarsFront + 0.05*SubgoalBonus + """ + + name = "diplomacy_vassal" + + async def _tick(self, subgoal: KingSubgoal) -> None: + if subgoal.token not in _DIPLO_TOKENS | _SCOUT_TOKENS: + return + + state = await self._get_state() + reward = self._compute_reward(state, subgoal) + + task = self._plan_action(state, subgoal) + if task: + await self._task_queue.put(task) + + logger.debug( + "%s tick: subgoal=%s reward=%.3f", + self.name, + subgoal.token, + reward.total, + ) + + def _compute_reward(self, state: dict[str, Any], subgoal: KingSubgoal) -> DiplomacyReward: + bonus = subgoal.priority * 0.05 if subgoal.token in _DIPLO_TOKENS else 0.0 + return DiplomacyReward( + allies_count=int(state.get("allies_count", 0)), + truce_duration_value=float(state.get("truce_value", 0.0)), + relations_score_weighted=float(state.get("relations_weighted", 0.0)), + active_wars_front=int(state.get("active_wars", 0)), + subgoal_bonus=bonus, + ) + + def _plan_action( + self, state: dict[str, Any], subgoal: KingSubgoal + ) -> TaskMessage | None: + if subgoal.token == "ALLY" and subgoal.target: + return TaskMessage( + from_agent=self.name, + to_agent="scout_companion", + primitive="track_lord", + args={"name": subgoal.target}, + priority=subgoal.priority, + ) + if subgoal.token == "SPY" and subgoal.target: + return TaskMessage( + from_agent=self.name, + to_agent="scout_companion", + primitive="assess_garrison", + args={"settlement": subgoal.target}, + priority=subgoal.priority, + ) + return None diff --git a/src/bannerlord/gabs_client.py b/src/bannerlord/gabs_client.py new file mode 100644 index 00000000..ca6d1654 --- /dev/null +++ b/src/bannerlord/gabs_client.py @@ -0,0 +1,201 @@ +"""GABS TCP/JSON-RPC client. + +Connects to the Bannerlord.GABS C# mod server running on a Windows VM. +Protocol: newline-delimited JSON-RPC 2.0 over raw TCP. + +Default host: localhost, port: 4825 (configurable via settings.bannerlord_gabs_host +and settings.bannerlord_gabs_port). + +Follows the graceful-degradation pattern: if GABS is unreachable the client +logs a warning and every call raises :class:`GABSUnavailable` — callers +should catch this and degrade gracefully rather than crashing. + +Refs: #1091, #1097. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +from contextlib import asynccontextmanager +from typing import Any + +logger = logging.getLogger(__name__) + +_DEFAULT_HOST = "localhost" +_DEFAULT_PORT = 4825 +_DEFAULT_TIMEOUT = 10.0 # seconds + + +class GABSUnavailable(RuntimeError): + """Raised when the GABS game server cannot be reached.""" + + +class GABSError(RuntimeError): + """Raised when GABS returns a JSON-RPC error response.""" + + def __init__(self, code: int, message: str) -> None: + super().__init__(f"GABS error {code}: {message}") + self.code = code + + +class GABSClient: + """Async TCP JSON-RPC client for Bannerlord.GABS. + + Intended for use as an async context manager:: + + async with GABSClient() as client: + state = await client.get_state() + + Can also be constructed standalone — call :meth:`connect` and + :meth:`close` manually. + """ + + def __init__( + self, + host: str = _DEFAULT_HOST, + port: int = _DEFAULT_PORT, + timeout: float = _DEFAULT_TIMEOUT, + ) -> None: + self._host = host + self._port = port + self._timeout = timeout + self._reader: asyncio.StreamReader | None = None + self._writer: asyncio.StreamWriter | None = None + self._seq = 0 + self._connected = False + + # ── Lifecycle ───────────────────────────────────────────────────────── + + async def connect(self) -> None: + """Open the TCP connection to GABS. + + Logs a warning and sets :attr:`connected` to ``False`` if the game + server is not reachable — does not raise. + """ + try: + self._reader, self._writer = await asyncio.wait_for( + asyncio.open_connection(self._host, self._port), + timeout=self._timeout, + ) + self._connected = True + logger.info("GABS connected at %s:%s", self._host, self._port) + except (OSError, asyncio.TimeoutError) as exc: + logger.warning( + "GABS unavailable at %s:%s — Bannerlord agent will degrade: %s", + self._host, + self._port, + exc, + ) + self._connected = False + + async def close(self) -> None: + if self._writer is not None: + try: + self._writer.close() + await self._writer.wait_closed() + except Exception: # noqa: BLE001 + pass + self._connected = False + logger.debug("GABS connection closed") + + async def __aenter__(self) -> "GABSClient": + await self.connect() + return self + + async def __aexit__(self, *_: Any) -> None: + await self.close() + + @property + def connected(self) -> bool: + return self._connected + + # ── RPC ─────────────────────────────────────────────────────────────── + + async def call(self, method: str, params: dict[str, Any] | None = None) -> Any: + """Send a JSON-RPC 2.0 request and return the ``result`` field. + + Raises: + GABSUnavailable: if the client is not connected. + GABSError: if the server returns a JSON-RPC error. + """ + if not self._connected or self._reader is None or self._writer is None: + raise GABSUnavailable( + f"GABS not connected (host={self._host}, port={self._port}). " + "Is the Bannerlord VM running?" + ) + + self._seq += 1 + request = { + "jsonrpc": "2.0", + "id": self._seq, + "method": method, + "params": params or {}, + } + payload = json.dumps(request) + "\n" + + try: + self._writer.write(payload.encode()) + await asyncio.wait_for(self._writer.drain(), timeout=self._timeout) + + raw = await asyncio.wait_for( + self._reader.readline(), timeout=self._timeout + ) + except (OSError, asyncio.TimeoutError) as exc: + self._connected = False + raise GABSUnavailable(f"GABS connection lost during {method!r}: {exc}") from exc + + response = json.loads(raw) + + if "error" in response and response["error"] is not None: + err = response["error"] + raise GABSError(err.get("code", -1), err.get("message", "unknown")) + + return response.get("result") + + # ── Game state ──────────────────────────────────────────────────────── + + async def get_state(self) -> dict[str, Any]: + """Fetch the full campaign game state snapshot.""" + return await self.call("game.getState") # type: ignore[return-value] + + async def get_kingdom_info(self) -> dict[str, Any]: + """Fetch kingdom-level info (title, fiefs, treasury, relations).""" + return await self.call("kingdom.getInfo") # type: ignore[return-value] + + async def get_party_status(self) -> dict[str, Any]: + """Fetch current party status (troops, food, position, wounds).""" + return await self.call("party.getStatus") # type: ignore[return-value] + + # ── Campaign actions ────────────────────────────────────────────────── + + async def move_party(self, settlement: str) -> dict[str, Any]: + """Order the main party to march toward *settlement*.""" + return await self.call("party.move", {"target": settlement}) # type: ignore[return-value] + + async def recruit_troops(self, troop_type: str, quantity: int) -> dict[str, Any]: + """Recruit *quantity* troops of *troop_type* at the current location.""" + return await self.call( # type: ignore[return-value] + "party.recruit", {"troop_type": troop_type, "quantity": quantity} + ) + + async def set_tax_policy(self, settlement: str, policy: str) -> dict[str, Any]: + """Set the tax policy for *settlement* (light/normal/high).""" + return await self.call( # type: ignore[return-value] + "settlement.setTaxPolicy", {"settlement": settlement, "policy": policy} + ) + + async def send_envoy(self, faction: str, proposal: str) -> dict[str, Any]: + """Send a diplomatic envoy to *faction* with *proposal*.""" + return await self.call( # type: ignore[return-value] + "diplomacy.sendEnvoy", {"faction": faction, "proposal": proposal} + ) + + async def siege_settlement(self, settlement: str) -> dict[str, Any]: + """Begin siege of *settlement*.""" + return await self.call("battle.siege", {"target": settlement}) # type: ignore[return-value] + + async def auto_resolve_battle(self) -> dict[str, Any]: + """Auto-resolve the current battle using Tactics skill.""" + return await self.call("battle.autoResolve") # type: ignore[return-value] diff --git a/src/bannerlord/ledger.py b/src/bannerlord/ledger.py new file mode 100644 index 00000000..e3898e62 --- /dev/null +++ b/src/bannerlord/ledger.py @@ -0,0 +1,260 @@ +"""Asset ledger for the Bannerlord sovereign agent. + +Tracks kingdom assets (denars, settlements, troop allocations) in an +in-memory dict backed by SQLite for persistence. Follows the existing +SQLite migration pattern in this repo. + +The King has exclusive write access to treasury and settlement ownership. +Vassals receive an allocated budget and cannot exceed it without King +re-authorization. Companions hold only work-in-progress quotas. + +Refs: #1097, #1099. +""" + +from __future__ import annotations + +import logging +import sqlite3 +from contextlib import contextmanager +from datetime import datetime +from pathlib import Path +from typing import Iterator + +logger = logging.getLogger(__name__) + +_DEFAULT_DB = Path.home() / ".timmy" / "bannerlord" / "ledger.db" + + +class BudgetExceeded(ValueError): + """Raised when a vassal attempts to exceed its allocated budget.""" + + +class Ledger: + """Sovereign asset ledger backed by SQLite. + + Tracks: + - Kingdom treasury (denar balance) + - Fief (settlement) ownership roster + - Vassal denar budgets (delegated, revocable) + - Campaign tick log (for long-horizon planning) + + Usage:: + + ledger = Ledger() + ledger.initialize() + ledger.deposit(5000, "tax income — Epicrotea") + ledger.allocate_budget("war_vassal", 2000) + """ + + def __init__(self, db_path: Path = _DEFAULT_DB) -> None: + self._db_path = db_path + self._db_path.parent.mkdir(parents=True, exist_ok=True) + + # ── Setup ───────────────────────────────────────────────────────────── + + def initialize(self) -> None: + """Create tables if they don't exist.""" + with self._conn() as conn: + conn.executescript( + """ + CREATE TABLE IF NOT EXISTS treasury ( + id INTEGER PRIMARY KEY CHECK (id = 1), + balance REAL NOT NULL DEFAULT 0 + ); + INSERT OR IGNORE INTO treasury (id, balance) VALUES (1, 0); + + CREATE TABLE IF NOT EXISTS fiefs ( + name TEXT PRIMARY KEY, + fief_type TEXT NOT NULL, -- town / castle / village + acquired_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS vassal_budgets ( + agent TEXT PRIMARY KEY, + allocated REAL NOT NULL DEFAULT 0, + spent REAL NOT NULL DEFAULT 0 + ); + + CREATE TABLE IF NOT EXISTS tick_log ( + tick INTEGER PRIMARY KEY, + campaign_day INTEGER NOT NULL, + subgoal TEXT, + reward_war REAL, + reward_econ REAL, + reward_diplo REAL, + logged_at TEXT NOT NULL + ); + """ + ) + logger.debug("Ledger initialized at %s", self._db_path) + + # ── Treasury ────────────────────────────────────────────────────────── + + def balance(self) -> float: + with self._conn() as conn: + row = conn.execute("SELECT balance FROM treasury WHERE id = 1").fetchone() + return float(row[0]) if row else 0.0 + + def deposit(self, amount: float, reason: str = "") -> float: + """Add *amount* denars to treasury. Returns new balance.""" + if amount < 0: + raise ValueError("Use withdraw() for negative amounts") + with self._conn() as conn: + conn.execute( + "UPDATE treasury SET balance = balance + ? WHERE id = 1", (amount,) + ) + bal = self.balance() + logger.info("Treasury +%.0f denars (%s) → balance %.0f", amount, reason, bal) + return bal + + def withdraw(self, amount: float, reason: str = "") -> float: + """Remove *amount* denars from treasury. Returns new balance.""" + if amount < 0: + raise ValueError("Amount must be positive") + bal = self.balance() + if amount > bal: + raise BudgetExceeded( + f"Cannot withdraw {amount:.0f} denars — treasury balance is only {bal:.0f}" + ) + with self._conn() as conn: + conn.execute( + "UPDATE treasury SET balance = balance - ? WHERE id = 1", (amount,) + ) + new_bal = self.balance() + logger.info("Treasury -%.0f denars (%s) → balance %.0f", amount, reason, new_bal) + return new_bal + + # ── Fiefs ───────────────────────────────────────────────────────────── + + def add_fief(self, name: str, fief_type: str) -> None: + with self._conn() as conn: + conn.execute( + "INSERT OR REPLACE INTO fiefs (name, fief_type, acquired_at) VALUES (?, ?, ?)", + (name, fief_type, datetime.utcnow().isoformat()), + ) + logger.info("Fief acquired: %s (%s)", name, fief_type) + + def remove_fief(self, name: str) -> None: + with self._conn() as conn: + conn.execute("DELETE FROM fiefs WHERE name = ?", (name,)) + logger.info("Fief lost: %s", name) + + def list_fiefs(self) -> list[dict[str, str]]: + with self._conn() as conn: + rows = conn.execute("SELECT name, fief_type, acquired_at FROM fiefs").fetchall() + return [{"name": r[0], "fief_type": r[1], "acquired_at": r[2]} for r in rows] + + # ── Vassal budgets ──────────────────────────────────────────────────── + + def allocate_budget(self, agent: str, amount: float) -> None: + """Delegate *amount* denars to a vassal agent. + + Withdraws from treasury. Raises :class:`BudgetExceeded` if + the treasury cannot cover the allocation. + """ + self.withdraw(amount, reason=f"budget → {agent}") + with self._conn() as conn: + conn.execute( + """ + INSERT INTO vassal_budgets (agent, allocated, spent) + VALUES (?, ?, 0) + ON CONFLICT(agent) DO UPDATE SET allocated = allocated + excluded.allocated + """, + (agent, amount), + ) + logger.info("Allocated %.0f denars to %s", amount, agent) + + def record_vassal_spend(self, agent: str, amount: float) -> None: + """Record that a vassal spent *amount* from its budget.""" + with self._conn() as conn: + row = conn.execute( + "SELECT allocated, spent FROM vassal_budgets WHERE agent = ?", (agent,) + ).fetchone() + if row is None: + raise BudgetExceeded(f"{agent} has no allocated budget") + allocated, spent = row + if spent + amount > allocated: + raise BudgetExceeded( + f"{agent} budget exhausted: {spent:.0f}/{allocated:.0f} spent, " + f"requested {amount:.0f}" + ) + with self._conn() as conn: + conn.execute( + "UPDATE vassal_budgets SET spent = spent + ? WHERE agent = ?", + (amount, agent), + ) + + def vassal_remaining(self, agent: str) -> float: + with self._conn() as conn: + row = conn.execute( + "SELECT allocated - spent FROM vassal_budgets WHERE agent = ?", (agent,) + ).fetchone() + return float(row[0]) if row else 0.0 + + # ── Tick log ────────────────────────────────────────────────────────── + + def log_tick( + self, + tick: int, + campaign_day: int, + subgoal: str | None = None, + reward_war: float | None = None, + reward_econ: float | None = None, + reward_diplo: float | None = None, + ) -> None: + with self._conn() as conn: + conn.execute( + """ + INSERT OR REPLACE INTO tick_log + (tick, campaign_day, subgoal, reward_war, reward_econ, reward_diplo, logged_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + tick, + campaign_day, + subgoal, + reward_war, + reward_econ, + reward_diplo, + datetime.utcnow().isoformat(), + ), + ) + + def tick_history(self, last_n: int = 100) -> list[dict]: + with self._conn() as conn: + rows = conn.execute( + """ + SELECT tick, campaign_day, subgoal, reward_war, reward_econ, reward_diplo, logged_at + FROM tick_log + ORDER BY tick DESC + LIMIT ? + """, + (last_n,), + ).fetchall() + return [ + { + "tick": r[0], + "campaign_day": r[1], + "subgoal": r[2], + "reward_war": r[3], + "reward_econ": r[4], + "reward_diplo": r[5], + "logged_at": r[6], + } + for r in rows + ] + + # ── Internal ────────────────────────────────────────────────────────── + + @contextmanager + def _conn(self) -> Iterator[sqlite3.Connection]: + conn = sqlite3.connect(self._db_path) + conn.execute("PRAGMA journal_mode=WAL") + try: + yield conn + conn.commit() + except Exception: + conn.rollback() + raise + finally: + conn.close() diff --git a/src/bannerlord/models.py b/src/bannerlord/models.py new file mode 100644 index 00000000..f9d7b229 --- /dev/null +++ b/src/bannerlord/models.py @@ -0,0 +1,200 @@ +"""Bannerlord feudal hierarchy data models. + +All inter-agent communication uses typed Pydantic models. No raw dicts +cross agent boundaries — every message is validated at construction time. + +Design: Ahilan & Dayan (2019) Feudal Multi-Agent Hierarchies. +Refs: #1097, #1099. +""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any, Literal + +from pydantic import BaseModel, Field + +# ── Subgoal vocabulary ──────────────────────────────────────────────────────── + +SUBGOAL_TOKENS = frozenset( + { + "EXPAND_TERRITORY", # Take or secure a fief — War Vassal + "RAID_ECONOMY", # Raid enemy villages for denars — War Vassal + "FORTIFY", # Upgrade or repair a settlement — Economy Vassal + "RECRUIT", # Fill party to capacity — Logistics Companion + "TRADE", # Execute profitable trade route — Caravan Companion + "ALLY", # Pursue non-aggression / alliance — Diplomacy Vassal + "SPY", # Gain information on target faction — Scout Companion + "HEAL", # Rest party until wounds recovered — Logistics Companion + "CONSOLIDATE", # Hold territory, no expansion — Economy Vassal + "TRAIN", # Level troops via auto-resolve bandits — War Vassal + } +) + + +# ── King subgoal ────────────────────────────────────────────────────────────── + + +class KingSubgoal(BaseModel): + """Strategic directive issued by the King agent to vassals. + + The King operates on campaign-map timescale (days to weeks of in-game + time). His sole output is one subgoal token plus optional parameters. + He never micro-manages primitives. + """ + + token: str = Field(..., description="One of SUBGOAL_TOKENS") + target: str | None = Field( + None, description="Named target (settlement, lord, faction)" + ) + quantity: int | None = Field(None, description="For RECRUIT, TRADE tokens", ge=1) + priority: float = Field( + 1.0, ge=0.0, le=2.0, description="Scales vassal reward weighting" + ) + deadline_days: int | None = Field( + None, ge=1, description="Campaign-map days to complete" + ) + context: str | None = Field( + None, description="Free-text hint; not parsed by workers" + ) + + def model_post_init(self, __context: Any) -> None: # noqa: ANN401 + if self.token not in SUBGOAL_TOKENS: + raise ValueError( + f"Unknown subgoal token {self.token!r}. " + f"Must be one of: {sorted(SUBGOAL_TOKENS)}" + ) + + +# ── Inter-agent messages ────────────────────────────────────────────────────── + + +class SubgoalMessage(BaseModel): + """King → Vassal direction.""" + + msg_type: Literal["subgoal"] = "subgoal" + from_agent: Literal["king"] = "king" + to_agent: str = Field(..., description="e.g. 'war_vassal', 'economy_vassal'") + subgoal: KingSubgoal + issued_at: datetime = Field(default_factory=datetime.utcnow) + + +class TaskMessage(BaseModel): + """Vassal → Companion direction.""" + + msg_type: Literal["task"] = "task" + from_agent: str = Field(..., description="e.g. 'war_vassal'") + to_agent: str = Field(..., description="e.g. 'logistics_companion'") + primitive: str = Field(..., description="One of the companion primitives") + args: dict[str, Any] = Field(default_factory=dict) + priority: float = Field(1.0, ge=0.0, le=2.0) + issued_at: datetime = Field(default_factory=datetime.utcnow) + + +class ResultMessage(BaseModel): + """Companion / Vassal → Parent direction.""" + + msg_type: Literal["result"] = "result" + from_agent: str + to_agent: str + success: bool + outcome: dict[str, Any] = Field(default_factory=dict, description="Primitive-specific result") + reward_delta: float = Field(0.0, description="Computed reward contribution") + completed_at: datetime = Field(default_factory=datetime.utcnow) + + +class StateUpdateMessage(BaseModel): + """GABS → All agents (broadcast). + + Sent every campaign tick. Agents consume at their own cadence. + """ + + msg_type: Literal["state"] = "state" + game_state: dict[str, Any] = Field(..., description="Full GABS state snapshot") + tick: int = Field(..., ge=0) + timestamp: datetime = Field(default_factory=datetime.utcnow) + + +# ── Reward snapshots ────────────────────────────────────────────────────────── + + +class WarReward(BaseModel): + """Computed reward for the War Vassal at a given tick.""" + + territory_delta: float = 0.0 + army_strength_ratio: float = 1.0 + casualty_cost: float = 0.0 + supply_cost: float = 0.0 + subgoal_bonus: float = 0.0 + + @property + def total(self) -> float: + w1, w2, w3, w4, w5 = 0.40, 0.25, 0.20, 0.10, 0.05 + return ( + w1 * self.territory_delta + + w2 * self.army_strength_ratio + - w3 * self.casualty_cost + - w4 * self.supply_cost + + w5 * self.subgoal_bonus + ) + + +class EconomyReward(BaseModel): + """Computed reward for the Economy Vassal at a given tick.""" + + daily_denars_income: float = 0.0 + food_stock_buffer: float = 0.0 + loyalty_average: float = 50.0 + construction_queue_length: int = 0 + subgoal_bonus: float = 0.0 + + @property + def total(self) -> float: + w1, w2, w3, w4, w5 = 0.35, 0.25, 0.20, 0.15, 0.05 + return ( + w1 * self.daily_denars_income + + w2 * self.food_stock_buffer + + w3 * self.loyalty_average + - w4 * self.construction_queue_length + + w5 * self.subgoal_bonus + ) + + +class DiplomacyReward(BaseModel): + """Computed reward for the Diplomacy Vassal at a given tick.""" + + allies_count: int = 0 + truce_duration_value: float = 0.0 + relations_score_weighted: float = 0.0 + active_wars_front: int = 0 + subgoal_bonus: float = 0.0 + + @property + def total(self) -> float: + w1, w2, w3, w4, w5 = 0.30, 0.25, 0.25, 0.15, 0.05 + return ( + w1 * self.allies_count + + w2 * self.truce_duration_value + + w3 * self.relations_score_weighted + - w4 * self.active_wars_front + + w5 * self.subgoal_bonus + ) + + +# ── Victory condition ───────────────────────────────────────────────────────── + + +class VictoryCondition(BaseModel): + """Sovereign Victory (M5) — evaluated each campaign tick.""" + + holds_king_title: bool = False + territory_control_pct: float = Field( + 0.0, ge=0.0, le=100.0, description="% of Calradia fiefs held" + ) + majority_threshold: float = Field( + 51.0, ge=0.0, le=100.0, description="Required % for majority control" + ) + + @property + def achieved(self) -> bool: + return self.holds_king_title and self.territory_control_pct >= self.majority_threshold diff --git a/tests/unit/test_bannerlord/__init__.py b/tests/unit/test_bannerlord/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/test_bannerlord/test_agents.py b/tests/unit/test_bannerlord/test_agents.py new file mode 100644 index 00000000..a1a6b993 --- /dev/null +++ b/tests/unit/test_bannerlord/test_agents.py @@ -0,0 +1,307 @@ +"""Unit tests for bannerlord agents — King, Vassals, Companions.""" + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from bannerlord.agents.companions import ( + CaravanCompanion, + LogisticsCompanion, + ScoutCompanion, +) +from bannerlord.agents.king import KingAgent +from bannerlord.agents.vassals import DiplomacyVassal, EconomyVassal, WarVassal +from bannerlord.gabs_client import GABSClient, GABSUnavailable +from bannerlord.ledger import Ledger +from bannerlord.models import ( + KingSubgoal, + ResultMessage, + SubgoalMessage, + TaskMessage, + VictoryCondition, +) + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + + +def _mock_gabs(state: dict | None = None) -> GABSClient: + """Return a disconnected GABS stub that returns *state* from get_state.""" + gabs = MagicMock(spec=GABSClient) + gabs.connected = False + if state is not None: + gabs.get_state = AsyncMock(return_value=state) + else: + gabs.get_state = AsyncMock(side_effect=GABSUnavailable("no game")) + gabs.call = AsyncMock(return_value={}) + gabs.recruit_troops = AsyncMock(return_value={"recruited": 10}) + gabs.move_party = AsyncMock(return_value={"moving": True}) + return gabs + + +def _mock_ledger(tmp_path) -> Ledger: + ledger = Ledger(db_path=tmp_path / "ledger.db") + ledger.initialize() + return ledger + + +# ── King agent ──────────────────────────────────────────────────────────────── + + +class TestKingAgent: + async def test_victory_detected(self, tmp_path): + """Campaign stops immediately when victory condition is met.""" + gabs = _mock_gabs({"player_title": "King", "territory_control_pct": 55.0}) + ledger = _mock_ledger(tmp_path) + king = KingAgent(gabs_client=gabs, ledger=ledger, tick_interval=0) + victory = await king.run_campaign(max_ticks=10) + assert victory.achieved + + async def test_max_ticks_respected(self, tmp_path): + """Campaign stops after max_ticks when victory not yet achieved.""" + gabs = _mock_gabs({"player_title": "Lord", "territory_control_pct": 10.0}) + ledger = _mock_ledger(tmp_path) + + # Patch LLM to return a valid subgoal without calling Ollama + king = KingAgent(gabs_client=gabs, ledger=ledger, tick_interval=0) + with patch.object(king, "_decide", AsyncMock(return_value=KingSubgoal(token="RECRUIT"))): + victory = await king.run_campaign(max_ticks=3) + + assert not victory.achieved + assert king._tick == 3 + + async def test_llm_failure_falls_back_to_recruit(self, tmp_path): + """If LLM fails, King defaults to RECRUIT subgoal.""" + gabs = _mock_gabs({"player_title": "Lord", "territory_control_pct": 5.0}) + ledger = _mock_ledger(tmp_path) + king = KingAgent(gabs_client=gabs, ledger=ledger, tick_interval=0) + + with patch.object(king, "_llm_decide", side_effect=RuntimeError("Ollama down")): + subgoal = await king._decide({}) + + assert subgoal.token == "RECRUIT" + + async def test_subgoal_broadcast_to_all_vassals(self, tmp_path): + """King broadcasts subgoal to all three vassals.""" + gabs = _mock_gabs({}) + ledger = _mock_ledger(tmp_path) + king = KingAgent(gabs_client=gabs, ledger=ledger) + subgoal = KingSubgoal(token="EXPAND_TERRITORY", target="Epicrotea") + await king._broadcast_subgoal(subgoal) + + messages = [] + while not king.subgoal_queue.empty(): + messages.append(king.subgoal_queue.get_nowait()) + + assert len(messages) == 3 + recipients = {m.to_agent for m in messages} + assert recipients == {"war_vassal", "economy_vassal", "diplomacy_vassal"} + + async def test_gabs_unavailable_uses_empty_state(self, tmp_path): + """King handles GABS being offline gracefully.""" + gabs = _mock_gabs() # raises GABSUnavailable + ledger = _mock_ledger(tmp_path) + king = KingAgent(gabs_client=gabs, ledger=ledger) + state = await king._fetch_state() + assert state == {} + + def test_evaluate_victory_king_with_majority(self, tmp_path): + gabs = _mock_gabs() + ledger = _mock_ledger(tmp_path) + king = KingAgent(gabs_client=gabs, ledger=ledger) + v = king._evaluate_victory({"player_title": "King", "territory_control_pct": 60.0}) + assert v.achieved + + def test_evaluate_victory_not_king(self, tmp_path): + gabs = _mock_gabs() + ledger = _mock_ledger(tmp_path) + king = KingAgent(gabs_client=gabs, ledger=ledger) + v = king._evaluate_victory({"player_title": "Lord", "territory_control_pct": 80.0}) + assert not v.achieved + + +# ── Vassals ─────────────────────────────────────────────────────────────────── + + +class TestWarVassal: + async def test_expand_territory_emits_move_task(self): + gabs = _mock_gabs({"territory_delta": 1.0, "army_strength_ratio": 1.5}) + queue = asyncio.Queue() + vassal = WarVassal(gabs_client=gabs, subgoal_queue=queue) + subgoal = KingSubgoal(token="EXPAND_TERRITORY", target="Seonon") + await vassal._tick(subgoal) + task: TaskMessage = vassal.task_queue.get_nowait() + assert task.primitive == "move_party" + assert task.args["destination"] == "Seonon" + + async def test_recruit_emits_recruit_task(self): + gabs = _mock_gabs({}) + queue = asyncio.Queue() + vassal = WarVassal(gabs_client=gabs, subgoal_queue=queue) + subgoal = KingSubgoal(token="RECRUIT", quantity=15) + await vassal._tick(subgoal) + task: TaskMessage = vassal.task_queue.get_nowait() + assert task.primitive == "recruit_troop" + assert task.args["quantity"] == 15 + + async def test_irrelevant_token_emits_no_task(self): + gabs = _mock_gabs({}) + queue = asyncio.Queue() + vassal = WarVassal(gabs_client=gabs, subgoal_queue=queue) + subgoal = KingSubgoal(token="ALLY") + await vassal._tick(subgoal) + assert vassal.task_queue.empty() + + +class TestEconomyVassal: + async def test_fortify_emits_build_task(self): + gabs = _mock_gabs({"daily_income": 200.0}) + queue = asyncio.Queue() + vassal = EconomyVassal(gabs_client=gabs, subgoal_queue=queue) + subgoal = KingSubgoal(token="FORTIFY", target="Epicrotea") + await vassal._tick(subgoal) + task: TaskMessage = vassal.task_queue.get_nowait() + assert task.primitive == "build_project" + assert task.args["settlement"] == "Epicrotea" + + async def test_trade_emits_assess_prices(self): + gabs = _mock_gabs({}) + queue = asyncio.Queue() + vassal = EconomyVassal(gabs_client=gabs, subgoal_queue=queue) + subgoal = KingSubgoal(token="TRADE", target="Pravend") + await vassal._tick(subgoal) + task: TaskMessage = vassal.task_queue.get_nowait() + assert task.primitive == "assess_prices" + + +class TestDiplomacyVassal: + async def test_ally_emits_track_lord(self): + gabs = _mock_gabs({"allies_count": 1}) + queue = asyncio.Queue() + vassal = DiplomacyVassal(gabs_client=gabs, subgoal_queue=queue) + subgoal = KingSubgoal(token="ALLY", target="Derthert") + await vassal._tick(subgoal) + task: TaskMessage = vassal.task_queue.get_nowait() + assert task.primitive == "track_lord" + assert task.args["name"] == "Derthert" + + async def test_spy_emits_assess_garrison(self): + gabs = _mock_gabs({}) + queue = asyncio.Queue() + vassal = DiplomacyVassal(gabs_client=gabs, subgoal_queue=queue) + subgoal = KingSubgoal(token="SPY", target="Marunath") + await vassal._tick(subgoal) + task: TaskMessage = vassal.task_queue.get_nowait() + assert task.primitive == "assess_garrison" + assert task.args["settlement"] == "Marunath" + + +# ── Companions ──────────────────────────────────────────────────────────────── + + +class TestLogisticsCompanion: + async def test_recruit_troop(self): + gabs = _mock_gabs() + gabs.recruit_troops = AsyncMock(return_value={"recruited": 10, "type": "infantry"}) + q: asyncio.Queue[TaskMessage] = asyncio.Queue() + comp = LogisticsCompanion(gabs_client=gabs, task_queue=q) + task = TaskMessage( + from_agent="war_vassal", + to_agent="logistics_companion", + primitive="recruit_troop", + args={"troop_type": "infantry", "quantity": 10}, + ) + result = await comp._execute(task) + assert result.success is True + assert result.outcome["recruited"] == 10 + + async def test_unknown_primitive_fails_gracefully(self): + gabs = _mock_gabs() + q: asyncio.Queue[TaskMessage] = asyncio.Queue() + comp = LogisticsCompanion(gabs_client=gabs, task_queue=q) + task = TaskMessage( + from_agent="war_vassal", + to_agent="logistics_companion", + primitive="launch_nukes", + args={}, + ) + result = await comp._execute(task) + assert result.success is False + assert "Unknown primitive" in result.outcome["error"] + + async def test_gabs_unavailable_returns_failure(self): + gabs = _mock_gabs() + gabs.recruit_troops = AsyncMock(side_effect=GABSUnavailable("offline")) + q: asyncio.Queue[TaskMessage] = asyncio.Queue() + comp = LogisticsCompanion(gabs_client=gabs, task_queue=q) + task = TaskMessage( + from_agent="war_vassal", + to_agent="logistics_companion", + primitive="recruit_troop", + args={"troop_type": "infantry", "quantity": 5}, + ) + result = await comp._execute(task) + assert result.success is False + + +class TestCaravanCompanion: + async def test_assess_prices(self): + gabs = _mock_gabs() + gabs.call = AsyncMock(return_value={"grain": 12, "linen": 45}) + q: asyncio.Queue[TaskMessage] = asyncio.Queue() + comp = CaravanCompanion(gabs_client=gabs, task_queue=q) + task = TaskMessage( + from_agent="economy_vassal", + to_agent="caravan_companion", + primitive="assess_prices", + args={"town": "Pravend"}, + ) + result = await comp._execute(task) + assert result.success is True + + async def test_abandon_route(self): + gabs = _mock_gabs() + gabs.call = AsyncMock(return_value={"abandoned": True}) + q: asyncio.Queue[TaskMessage] = asyncio.Queue() + comp = CaravanCompanion(gabs_client=gabs, task_queue=q) + task = TaskMessage( + from_agent="economy_vassal", + to_agent="caravan_companion", + primitive="abandon_route", + args={}, + ) + result = await comp._execute(task) + assert result.success is True + assert result.outcome["abandoned"] is True + + +class TestScoutCompanion: + async def test_assess_garrison(self): + gabs = _mock_gabs() + gabs.call = AsyncMock(return_value={"garrison_size": 120, "settlement": "Marunath"}) + q: asyncio.Queue[TaskMessage] = asyncio.Queue() + comp = ScoutCompanion(gabs_client=gabs, task_queue=q) + task = TaskMessage( + from_agent="diplomacy_vassal", + to_agent="scout_companion", + primitive="assess_garrison", + args={"settlement": "Marunath"}, + ) + result = await comp._execute(task) + assert result.success is True + assert result.outcome["garrison_size"] == 120 + + async def test_report_intel(self): + gabs = _mock_gabs() + gabs.call = AsyncMock(return_value={"intel": ["Derthert at Epicrotea"]}) + q: asyncio.Queue[TaskMessage] = asyncio.Queue() + comp = ScoutCompanion(gabs_client=gabs, task_queue=q) + task = TaskMessage( + from_agent="diplomacy_vassal", + to_agent="scout_companion", + primitive="report_intel", + args={}, + ) + result = await comp._execute(task) + assert result.success is True diff --git a/tests/unit/test_bannerlord/test_gabs_client.py b/tests/unit/test_bannerlord/test_gabs_client.py new file mode 100644 index 00000000..ec15b522 --- /dev/null +++ b/tests/unit/test_bannerlord/test_gabs_client.py @@ -0,0 +1,147 @@ +"""Unit tests for bannerlord.gabs_client — TCP JSON-RPC client.""" + +import asyncio +import json +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from bannerlord.gabs_client import GABSClient, GABSError, GABSUnavailable + + +# ── Connection ──────────────────────────────────────────────────────────────── + + +class TestGABSClientConnection: + async def test_connect_success(self): + mock_reader = AsyncMock() + mock_writer = MagicMock() + mock_writer.close = MagicMock() + mock_writer.wait_closed = AsyncMock() + + with patch( + "bannerlord.gabs_client.asyncio.open_connection", + return_value=(mock_reader, mock_writer), + ): + client = GABSClient() + await client.connect() + + assert client.connected is True + await client.close() + + async def test_connect_failure_degrades_gracefully(self): + with patch( + "bannerlord.gabs_client.asyncio.open_connection", + side_effect=OSError("Connection refused"), + ): + client = GABSClient() + await client.connect() # must not raise + + assert client.connected is False + + async def test_connect_timeout_degrades_gracefully(self): + with patch( + "bannerlord.gabs_client.asyncio.open_connection", + side_effect=asyncio.TimeoutError(), + ): + client = GABSClient() + await client.connect() + + assert client.connected is False + + async def test_context_manager(self): + mock_reader = AsyncMock() + mock_writer = MagicMock() + mock_writer.close = MagicMock() + mock_writer.wait_closed = AsyncMock() + + with patch( + "bannerlord.gabs_client.asyncio.open_connection", + return_value=(mock_reader, mock_writer), + ): + async with GABSClient() as client: + assert client.connected is True + + assert client.connected is False + + +# ── RPC ─────────────────────────────────────────────────────────────────────── + + +class TestGABSClientRPC: + def _make_connected_client(self, response_data: dict): + """Return a client with mocked reader/writer.""" + client = GABSClient() + client._connected = True + + raw_response = json.dumps(response_data) + "\n" + client._reader = AsyncMock() + client._reader.readline = AsyncMock(return_value=raw_response.encode()) + + client._writer = MagicMock() + client._writer.write = MagicMock() + client._writer.drain = AsyncMock() + + return client + + async def test_call_returns_result(self): + client = self._make_connected_client({"jsonrpc": "2.0", "id": 1, "result": {"foo": "bar"}}) + result = await client.call("game.getState") + assert result == {"foo": "bar"} + + async def test_call_raises_on_error(self): + client = self._make_connected_client( + {"jsonrpc": "2.0", "id": 1, "error": {"code": -32601, "message": "Method not found"}} + ) + with pytest.raises(GABSError, match="Method not found"): + await client.call("game.nonexistent") + + async def test_call_raises_unavailable_when_not_connected(self): + client = GABSClient() + assert client.connected is False + with pytest.raises(GABSUnavailable): + await client.call("game.getState") + + async def test_sequence_increments(self): + client = self._make_connected_client({"jsonrpc": "2.0", "id": 1, "result": {}}) + await client.call("game.getState") + assert client._seq == 1 + client._reader.readline = AsyncMock( + return_value=(json.dumps({"jsonrpc": "2.0", "id": 2, "result": {}}) + "\n").encode() + ) + await client.call("game.getState") + assert client._seq == 2 + + async def test_get_state_calls_correct_method(self): + client = self._make_connected_client( + {"jsonrpc": "2.0", "id": 1, "result": {"campaign_day": 10}} + ) + result = await client.get_state() + written = client._writer.write.call_args[0][0].decode() + payload = json.loads(written.strip()) + assert payload["method"] == "game.getState" + assert result == {"campaign_day": 10} + + async def test_move_party_sends_target(self): + client = self._make_connected_client( + {"jsonrpc": "2.0", "id": 1, "result": {"moving": True}} + ) + await client.move_party("Epicrotea") + written = client._writer.write.call_args[0][0].decode() + payload = json.loads(written.strip()) + assert payload["method"] == "party.move" + assert payload["params"]["target"] == "Epicrotea" + + async def test_connection_lost_marks_disconnected(self): + client = GABSClient() + client._connected = True + client._reader = AsyncMock() + client._reader.readline = AsyncMock(side_effect=OSError("connection reset")) + client._writer = MagicMock() + client._writer.write = MagicMock() + client._writer.drain = AsyncMock() + + with pytest.raises(GABSUnavailable): + await client.call("game.getState") + + assert client.connected is False diff --git a/tests/unit/test_bannerlord/test_models.py b/tests/unit/test_bannerlord/test_models.py new file mode 100644 index 00000000..4b64d949 --- /dev/null +++ b/tests/unit/test_bannerlord/test_models.py @@ -0,0 +1,190 @@ +"""Unit tests for bannerlord.models — data contracts and reward functions.""" + +import pytest + +from bannerlord.models import ( + SUBGOAL_TOKENS, + DiplomacyReward, + EconomyReward, + KingSubgoal, + ResultMessage, + StateUpdateMessage, + SubgoalMessage, + TaskMessage, + VictoryCondition, + WarReward, +) + + +# ── KingSubgoal ─────────────────────────────────────────────────────────────── + + +class TestKingSubgoal: + def test_valid_token(self): + s = KingSubgoal(token="EXPAND_TERRITORY", target="Epicrotea") + assert s.token == "EXPAND_TERRITORY" + assert s.target == "Epicrotea" + assert s.priority == 1.0 + + def test_all_tokens_valid(self): + for token in SUBGOAL_TOKENS: + KingSubgoal(token=token) + + def test_invalid_token_raises(self): + with pytest.raises(ValueError, match="Unknown subgoal token"): + KingSubgoal(token="NUKE_CALRADIA") + + def test_priority_clamp(self): + with pytest.raises(Exception): + KingSubgoal(token="TRADE", priority=3.0) + + def test_optional_fields_default_none(self): + s = KingSubgoal(token="HEAL") + assert s.target is None + assert s.quantity is None + assert s.deadline_days is None + assert s.context is None + + +# ── Messages ────────────────────────────────────────────────────────────────── + + +class TestSubgoalMessage: + def test_defaults(self): + msg = SubgoalMessage( + to_agent="war_vassal", + subgoal=KingSubgoal(token="RAID_ECONOMY"), + ) + assert msg.msg_type == "subgoal" + assert msg.from_agent == "king" + assert msg.to_agent == "war_vassal" + assert msg.issued_at is not None + + def test_subgoal_roundtrip(self): + subgoal = KingSubgoal(token="RECRUIT", quantity=30, priority=1.5) + msg = SubgoalMessage(to_agent="war_vassal", subgoal=subgoal) + assert msg.subgoal.quantity == 30 + assert msg.subgoal.priority == 1.5 + + +class TestTaskMessage: + def test_construction(self): + t = TaskMessage( + from_agent="war_vassal", + to_agent="logistics_companion", + primitive="recruit_troop", + args={"troop_type": "cavalry", "quantity": 5}, + priority=1.2, + ) + assert t.msg_type == "task" + assert t.primitive == "recruit_troop" + assert t.args["quantity"] == 5 + + +class TestResultMessage: + def test_success(self): + r = ResultMessage( + from_agent="logistics_companion", + to_agent="war_vassal", + success=True, + outcome={"recruited": 10}, + reward_delta=0.15, + ) + assert r.success is True + assert r.reward_delta == 0.15 + + def test_failure(self): + r = ResultMessage( + from_agent="scout_companion", + to_agent="diplomacy_vassal", + success=False, + outcome={"error": "GABS unavailable"}, + ) + assert r.success is False + assert r.reward_delta == 0.0 + + +class TestStateUpdateMessage: + def test_construction(self): + msg = StateUpdateMessage( + game_state={"campaign_day": 42, "player_title": "Lord"}, + tick=42, + ) + assert msg.msg_type == "state" + assert msg.tick == 42 + assert msg.game_state["campaign_day"] == 42 + + +# ── Reward functions ────────────────────────────────────────────────────────── + + +class TestWarReward: + def test_positive_expansion(self): + r = WarReward(territory_delta=2.0, army_strength_ratio=1.2, subgoal_bonus=0.1) + assert r.total > 0 + + def test_casualty_cost_penalizes(self): + no_cost = WarReward(territory_delta=1.0, army_strength_ratio=1.0) + with_cost = WarReward(territory_delta=1.0, army_strength_ratio=1.0, casualty_cost=5.0) + assert with_cost.total < no_cost.total + + def test_zero_state(self): + r = WarReward() + # army_strength_ratio default 1.0, rest 0 → 0.25 * 1.0 = 0.25 + assert abs(r.total - 0.25) < 1e-9 + + +class TestEconomyReward: + def test_income_positive(self): + r = EconomyReward(daily_denars_income=100.0, food_stock_buffer=7.0, loyalty_average=80.0) + assert r.total > 0 + + def test_construction_queue_penalizes(self): + no_queue = EconomyReward(daily_denars_income=50.0) + long_queue = EconomyReward(daily_denars_income=50.0, construction_queue_length=10) + assert long_queue.total < no_queue.total + + def test_loyalty_contributes(self): + low_loyalty = EconomyReward(loyalty_average=10.0) + high_loyalty = EconomyReward(loyalty_average=90.0) + assert high_loyalty.total > low_loyalty.total + + +class TestDiplomacyReward: + def test_allies_positive(self): + r = DiplomacyReward(allies_count=3) + assert r.total > 0 + + def test_active_wars_penalizes(self): + peace = DiplomacyReward(allies_count=2) + war = DiplomacyReward(allies_count=2, active_wars_front=4) + assert war.total < peace.total + + +# ── Victory condition ───────────────────────────────────────────────────────── + + +class TestVictoryCondition: + def test_not_achieved_without_title(self): + v = VictoryCondition(holds_king_title=False, territory_control_pct=70.0) + assert not v.achieved + + def test_not_achieved_without_majority(self): + v = VictoryCondition(holds_king_title=True, territory_control_pct=40.0) + assert not v.achieved + + def test_achieved_when_king_with_majority(self): + v = VictoryCondition(holds_king_title=True, territory_control_pct=55.0) + assert v.achieved + + def test_exact_threshold(self): + v = VictoryCondition(holds_king_title=True, territory_control_pct=51.0) + assert v.achieved + + def test_custom_threshold(self): + v = VictoryCondition( + holds_king_title=True, + territory_control_pct=70.0, + majority_threshold=75.0, + ) + assert not v.achieved