1
0
This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Timmy-time-dashboard/src/bannerlord/gabs_client.py
Alexander Whitestone 46ef9d3aba WIP: Claude Code progress on #1094
Automated salvage commit — agent session ended (exit 124).
Work in progress, may need continuation.
2026-03-23 14:26:15 -04:00

196 lines
6.0 KiB
Python

"""GABS TCP/JSON-RPC client for Bannerlord.

Connects to the GABS C# mod (Bannerlord.GABS) over TCP on port 4825
and dispatches JSON-RPC 2.0 requests. All I/O is async; synchronous
callers must run the coroutines on an event loop, e.g. with
``asyncio.run()``.

Architecture::

    Bannerlord (Windows VM) ← GABS C# mod ← TCP:4825 ← this client

Usage::

    async with GabsClient() as client:
        state = await client.get_game_state()
        result = await client.call("party/move_to_settlement",
                                   {"settlement_id": "town_A1"})
"""
from __future__ import annotations
import asyncio
import json
import logging
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
# JSON-RPC framing: each message is newline-delimited UTF-8 JSON.
_ENCODING = "utf-8"  # used to encode requests and decode responses
_NEWLINE = b"\n"  # terminator appended to every encoded request
_DEFAULT_TIMEOUT = 30.0  # default GabsClient ``timeout``, in seconds
class GabsError(Exception):
    """Raised when GABS returns a JSON-RPC error response.

    The exception text is ``"GABS error <code>: <message>"``; the individual
    parts of the error are also kept as attributes so handlers do not have
    to parse that string.

    Attributes
    ----------
    code:
        Error code supplied by the caller (taken from the response's
        ``error`` object by ``GabsClient.call``).
    message:
        Human-readable error message.
    data:
        Optional extra payload attached to the error, or ``None``.
    """

    def __init__(self, code: int, message: str, data: Any = None) -> None:
        super().__init__(f"GABS error {code}: {message}")
        self.code = code
        self.message = message
        self.data = data
class GabsClient:
    """Async TCP JSON-RPC 2.0 client for the GABS Bannerlord mod.

    Each request is written as one newline-terminated JSON document and the
    reply is read back as one line from the same connection.  No locking is
    performed, so overlapping ``call()`` invocations on a single instance
    are not supported.

    Parameters
    ----------
    host:
        GABS server host (Windows VM IP or ``localhost`` for port-forwarded).
        Falls back to ``settings.gabs_host`` when omitted.
    port:
        GABS server port.  Falls back to ``settings.gabs_port`` when omitted.
    timeout:
        Per-operation timeout in seconds (connect, send, and each read).
    """

    def __init__(
        self,
        *,
        host: str | None = None,
        port: int | None = None,
        timeout: float = _DEFAULT_TIMEOUT,
    ) -> None:
        # Falsy values (None, "", 0) fall back to the configured defaults.
        self._host = host or settings.gabs_host
        self._port = port or settings.gabs_port
        self._timeout = timeout
        self._reader: asyncio.StreamReader | None = None
        self._writer: asyncio.StreamWriter | None = None
        self._req_id = 0  # id of the most recently issued request
        self._connected = False

    # -- lifecycle ---------------------------------------------------------

    async def connect(self) -> None:
        """Open the TCP connection to GABS.

        A connection already held by this client is closed first so that
        its socket is not leaked.

        Raises
        ------
        OSError
            If the connection cannot be established.
        asyncio.TimeoutError
            If connecting takes longer than ``timeout`` seconds.
        """
        if self._writer is not None:
            await self.disconnect()
        try:
            self._reader, self._writer = await asyncio.wait_for(
                asyncio.open_connection(self._host, self._port),
                timeout=self._timeout,
            )
        except (OSError, asyncio.TimeoutError) as exc:
            logger.warning("GabsClient could not connect to GABS: %s", exc)
            self._connected = False
            raise
        self._connected = True
        logger.info("GabsClient connected to %s:%d", self._host, self._port)

    async def disconnect(self) -> None:
        """Close the TCP connection.

        Safe to call when not connected.  Errors raised while closing are
        logged at debug level and otherwise ignored.
        """
        # Reset state before awaiting so a cancellation during wait_closed()
        # cannot leave stale reader/writer handles behind.
        writer, self._writer = self._writer, None
        self._reader = None
        self._connected = False
        if writer is None:
            return
        try:
            writer.close()
            await writer.wait_closed()
        except Exception as exc:  # noqa: BLE001
            logger.debug("GabsClient disconnect error (ignored): %s", exc)
        logger.info("GabsClient disconnected")

    @property
    def is_connected(self) -> bool:
        """Whether the client currently believes the connection is usable."""
        return self._connected

    # -- context manager ---------------------------------------------------

    async def __aenter__(self) -> "GabsClient":
        await self.connect()
        return self

    async def __aexit__(self, *_: Any) -> None:
        await self.disconnect()

    # -- public API --------------------------------------------------------

    async def call(self, method: str, params: dict[str, Any] | None = None) -> Any:
        """Call a GABS tool and return the result.

        Parameters
        ----------
        method:
            GABS tool name, e.g. ``"party/move_to_settlement"``.
        params:
            Tool parameters dict.  ``None`` is sent as an empty object.

        Returns
        -------
        Any
            The ``result`` field from the JSON-RPC response.

        Raises
        ------
        GabsError
            If GABS returns an error response.
        RuntimeError
            If not connected, if the call times out, if the connection is
            lost or closed by the peer, or if the response is not a valid
            JSON object.
        """
        if not self._connected or self._writer is None or self._reader is None:
            raise RuntimeError("GabsClient is not connected — call connect() first")

        self._req_id += 1
        req_id = self._req_id
        request = {
            "jsonrpc": "2.0",
            "id": req_id,
            "method": method,
            "params": params or {},
        }
        raw = json.dumps(request).encode(_ENCODING) + _NEWLINE

        try:
            self._writer.write(raw)
            await asyncio.wait_for(self._writer.drain(), timeout=self._timeout)
            while True:
                line = await asyncio.wait_for(
                    self._reader.readline(), timeout=self._timeout
                )
                if not line:
                    # readline() returns b"" only at EOF, i.e. the peer closed
                    # the socket; route it through the connection-lost path.
                    raise ConnectionResetError("connection closed by GABS")
                response = self._parse_response(line, method)
                resp_id = response.get("id")
                if isinstance(resp_id, int) and resp_id < req_id:
                    # Late reply to an earlier request that already timed
                    # out; it must not be returned as this call's result.
                    logger.debug(
                        "GabsClient discarding stale response id=%s", resp_id
                    )
                    continue
                break
        except asyncio.TimeoutError as exc:
            # Must precede OSError: TimeoutError is an OSError subclass on
            # Python 3.11+.
            raise RuntimeError(
                f"GABS call '{method}' timed out after {self._timeout}s"
            ) from exc
        except OSError as exc:
            self._connected = False
            raise RuntimeError(
                f"GABS connection lost during '{method}': {exc}"
            ) from exc
        except ValueError as exc:
            # StreamReader.readline() raises ValueError when a line exceeds
            # the stream's buffer limit.
            raise RuntimeError(
                f"GABS response to '{method}' exceeded the stream limit: {exc}"
            ) from exc

        err = response.get("error")
        if err is not None:
            if not isinstance(err, dict):
                raise GabsError(code=-1, message=str(err))
            raise GabsError(
                code=err.get("code", -1),
                message=err.get("message", "unknown error"),
                data=err.get("data"),
            )
        return response.get("result")

    @staticmethod
    def _parse_response(line: bytes, method: str) -> dict[str, Any]:
        """Decode one response line into a JSON object or raise RuntimeError."""
        try:
            response = json.loads(line.decode(_ENCODING))
        except ValueError as exc:
            # UnicodeDecodeError and json.JSONDecodeError both derive from
            # ValueError.
            raise RuntimeError(
                f"GABS sent a malformed response to '{method}': {exc}"
            ) from exc
        if not isinstance(response, dict):
            raise RuntimeError(
                f"GABS sent a non-object response to '{method}': {response!r}"
            )
        return response

    async def get_game_state(self) -> dict[str, Any]:
        """Return the full game state snapshot from GABS.

        Returns an empty dict and logs a warning if the call fails (GABS
        unreachable, error response, or malformed reply), or if the result
        is not a dict.
        """
        try:
            result = await self.call("game/get_state")
        except (GabsError, RuntimeError) as exc:
            logger.warning("GABS get_game_state failed: %s", exc)
            return {}
        return result if isinstance(result, dict) else {}

    async def ping(self) -> bool:
        """Return True if GABS responds to a ping."""
        try:
            await self.call("game/ping")
        except Exception as exc:  # noqa: BLE001
            # Deliberately broad: this is a boolean health probe.
            logger.debug("GABS ping failed: %s", exc)
            return False
        return True