forked from Rockachopa/Timmy-time-dashboard
199 lines
7.5 KiB
Python
199 lines
7.5 KiB
Python
"""GABS TCP/JSON-RPC client.
|
|
|
|
Connects to the Bannerlord.GABS C# mod server running on a Windows VM.
|
|
Protocol: newline-delimited JSON-RPC 2.0 over raw TCP.
|
|
|
|
Default host: localhost, port: 4825 (configurable via settings.bannerlord_gabs_host
|
|
and settings.bannerlord_gabs_port).
|
|
|
|
Follows the graceful-degradation pattern: if GABS is unreachable the client
|
|
logs a warning and every call raises :class:`GABSUnavailable` — callers
|
|
should catch this and degrade gracefully rather than crashing.
|
|
|
|
Refs: #1091, #1097.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from typing import Any
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_DEFAULT_HOST = "localhost"
|
|
_DEFAULT_PORT = 4825
|
|
_DEFAULT_TIMEOUT = 10.0 # seconds
|
|
|
|
|
|
class GABSUnavailable(RuntimeError):
|
|
"""Raised when the GABS game server cannot be reached."""
|
|
|
|
|
|
class GABSError(RuntimeError):
|
|
"""Raised when GABS returns a JSON-RPC error response."""
|
|
|
|
def __init__(self, code: int, message: str) -> None:
|
|
super().__init__(f"GABS error {code}: {message}")
|
|
self.code = code
|
|
|
|
|
|
class GABSClient:
|
|
"""Async TCP JSON-RPC client for Bannerlord.GABS.
|
|
|
|
Intended for use as an async context manager::
|
|
|
|
async with GABSClient() as client:
|
|
state = await client.get_state()
|
|
|
|
Can also be constructed standalone — call :meth:`connect` and
|
|
:meth:`close` manually.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
host: str = _DEFAULT_HOST,
|
|
port: int = _DEFAULT_PORT,
|
|
timeout: float = _DEFAULT_TIMEOUT,
|
|
) -> None:
|
|
self._host = host
|
|
self._port = port
|
|
self._timeout = timeout
|
|
self._reader: asyncio.StreamReader | None = None
|
|
self._writer: asyncio.StreamWriter | None = None
|
|
self._seq = 0
|
|
self._connected = False
|
|
|
|
# ── Lifecycle ─────────────────────────────────────────────────────────
|
|
|
|
async def connect(self) -> None:
|
|
"""Open the TCP connection to GABS.
|
|
|
|
Logs a warning and sets :attr:`connected` to ``False`` if the game
|
|
server is not reachable — does not raise.
|
|
"""
|
|
try:
|
|
self._reader, self._writer = await asyncio.wait_for(
|
|
asyncio.open_connection(self._host, self._port),
|
|
timeout=self._timeout,
|
|
)
|
|
self._connected = True
|
|
logger.info("GABS connected at %s:%s", self._host, self._port)
|
|
except (TimeoutError, OSError) as exc:
|
|
logger.warning(
|
|
"GABS unavailable at %s:%s — Bannerlord agent will degrade: %s",
|
|
self._host,
|
|
self._port,
|
|
exc,
|
|
)
|
|
self._connected = False
|
|
|
|
async def close(self) -> None:
|
|
if self._writer is not None:
|
|
try:
|
|
self._writer.close()
|
|
await self._writer.wait_closed()
|
|
except Exception: # noqa: BLE001
|
|
pass
|
|
self._connected = False
|
|
logger.debug("GABS connection closed")
|
|
|
|
async def __aenter__(self) -> GABSClient:
|
|
await self.connect()
|
|
return self
|
|
|
|
async def __aexit__(self, *_: Any) -> None:
|
|
await self.close()
|
|
|
|
@property
|
|
def connected(self) -> bool:
|
|
return self._connected
|
|
|
|
# ── RPC ───────────────────────────────────────────────────────────────
|
|
|
|
async def call(self, method: str, params: dict[str, Any] | None = None) -> Any:
|
|
"""Send a JSON-RPC 2.0 request and return the ``result`` field.
|
|
|
|
Raises:
|
|
GABSUnavailable: if the client is not connected.
|
|
GABSError: if the server returns a JSON-RPC error.
|
|
"""
|
|
if not self._connected or self._reader is None or self._writer is None:
|
|
raise GABSUnavailable(
|
|
f"GABS not connected (host={self._host}, port={self._port}). "
|
|
"Is the Bannerlord VM running?"
|
|
)
|
|
|
|
self._seq += 1
|
|
request = {
|
|
"jsonrpc": "2.0",
|
|
"id": self._seq,
|
|
"method": method,
|
|
"params": params or {},
|
|
}
|
|
payload = json.dumps(request) + "\n"
|
|
|
|
try:
|
|
self._writer.write(payload.encode())
|
|
await asyncio.wait_for(self._writer.drain(), timeout=self._timeout)
|
|
|
|
raw = await asyncio.wait_for(self._reader.readline(), timeout=self._timeout)
|
|
except (TimeoutError, OSError) as exc:
|
|
self._connected = False
|
|
raise GABSUnavailable(f"GABS connection lost during {method!r}: {exc}") from exc
|
|
|
|
response = json.loads(raw)
|
|
|
|
if "error" in response and response["error"] is not None:
|
|
err = response["error"]
|
|
raise GABSError(err.get("code", -1), err.get("message", "unknown"))
|
|
|
|
return response.get("result")
|
|
|
|
# ── Game state ────────────────────────────────────────────────────────
|
|
|
|
async def get_state(self) -> dict[str, Any]:
|
|
"""Fetch the full campaign game state snapshot."""
|
|
return await self.call("game.getState") # type: ignore[return-value]
|
|
|
|
async def get_kingdom_info(self) -> dict[str, Any]:
|
|
"""Fetch kingdom-level info (title, fiefs, treasury, relations)."""
|
|
return await self.call("kingdom.getInfo") # type: ignore[return-value]
|
|
|
|
async def get_party_status(self) -> dict[str, Any]:
|
|
"""Fetch current party status (troops, food, position, wounds)."""
|
|
return await self.call("party.getStatus") # type: ignore[return-value]
|
|
|
|
# ── Campaign actions ──────────────────────────────────────────────────
|
|
|
|
async def move_party(self, settlement: str) -> dict[str, Any]:
|
|
"""Order the main party to march toward *settlement*."""
|
|
return await self.call("party.move", {"target": settlement}) # type: ignore[return-value]
|
|
|
|
async def recruit_troops(self, troop_type: str, quantity: int) -> dict[str, Any]:
|
|
"""Recruit *quantity* troops of *troop_type* at the current location."""
|
|
return await self.call( # type: ignore[return-value]
|
|
"party.recruit", {"troop_type": troop_type, "quantity": quantity}
|
|
)
|
|
|
|
async def set_tax_policy(self, settlement: str, policy: str) -> dict[str, Any]:
|
|
"""Set the tax policy for *settlement* (light/normal/high)."""
|
|
return await self.call( # type: ignore[return-value]
|
|
"settlement.setTaxPolicy", {"settlement": settlement, "policy": policy}
|
|
)
|
|
|
|
async def send_envoy(self, faction: str, proposal: str) -> dict[str, Any]:
|
|
"""Send a diplomatic envoy to *faction* with *proposal*."""
|
|
return await self.call( # type: ignore[return-value]
|
|
"diplomacy.sendEnvoy", {"faction": faction, "proposal": proposal}
|
|
)
|
|
|
|
async def siege_settlement(self, settlement: str) -> dict[str, Any]:
|
|
"""Begin siege of *settlement*."""
|
|
return await self.call("battle.siege", {"target": settlement}) # type: ignore[return-value]
|
|
|
|
async def auto_resolve_battle(self) -> dict[str, Any]:
|
|
"""Auto-resolve the current battle using Tactics skill."""
|
|
return await self.call("battle.autoResolve") # type: ignore[return-value]
|