forked from Rockachopa/Timmy-time-dashboard
This commit is contained in:
@@ -374,6 +374,21 @@ class Settings(BaseSettings):
|
|||||||
error_feedback_enabled: bool = True # Auto-create bug report tasks
|
error_feedback_enabled: bool = True # Auto-create bug report tasks
|
||||||
error_dedup_window_seconds: int = 300 # 5-min dedup window
|
error_dedup_window_seconds: int = 300 # 5-min dedup window
|
||||||
|
|
||||||
|
# ── Bannerlord / GABS ────────────────────────────────────────────
|
||||||
|
# GABS (Game Action Bridge Server) TCP JSON-RPC endpoint.
|
||||||
|
# The GABS mod runs inside the Windows VM and exposes a JSON-RPC server
|
||||||
|
# on port 4825 that Timmy uses to read and act on Bannerlord game state.
|
||||||
|
# Set GABS_HOST to the VM's LAN IP (e.g. "10.0.0.50") to enable.
|
||||||
|
gabs_enabled: bool = False
|
||||||
|
gabs_host: str = "127.0.0.1"
|
||||||
|
gabs_port: int = 4825
|
||||||
|
gabs_timeout: float = 5.0 # socket timeout in seconds
|
||||||
|
# How often (seconds) the observer polls GABS for fresh game state.
|
||||||
|
gabs_poll_interval: int = 60
|
||||||
|
# Path to the Bannerlord journal inside the memory vault.
|
||||||
|
# Relative to repo root. Written by the GABS observer loop.
|
||||||
|
gabs_journal_path: str = "memory/bannerlord/journal.md"
|
||||||
|
|
||||||
# ── Scripture / Biblical Integration ──────────────────────────────
|
# ── Scripture / Biblical Integration ──────────────────────────────
|
||||||
# Enable the biblical text module.
|
# Enable the biblical text module.
|
||||||
scripture_enabled: bool = True
|
scripture_enabled: bool = True
|
||||||
|
|||||||
9
src/integrations/bannerlord/__init__.py
Normal file
9
src/integrations/bannerlord/__init__.py
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
"""Bannerlord — GABS TCP bridge for Mount & Blade II: Bannerlord.
|
||||||
|
|
||||||
|
Provides:
|
||||||
|
- GabsClient: low-level JSON-RPC 2.0 TCP client (port 4825)
|
||||||
|
- BannerlordObserver: observe() loop that polls game state and journals to SOUL.md
|
||||||
|
|
||||||
|
Epic: #1091 (Project Bannerlord)
|
||||||
|
M1: #1093 (Passive Lord — Observer Mode via GABS)
|
||||||
|
"""
|
||||||
148
src/integrations/bannerlord/gabs_client.py
Normal file
148
src/integrations/bannerlord/gabs_client.py
Normal file
@@ -0,0 +1,148 @@
|
|||||||
|
"""GABS TCP JSON-RPC 2.0 client.
|
||||||
|
|
||||||
|
Low-level transport layer for communicating with the Bannerlord.GABS mod.
|
||||||
|
GABS runs inside the Windows VM and listens on port 4825. Messages are
|
||||||
|
newline-delimited JSON-RPC 2.0.
|
||||||
|
|
||||||
|
Wire format::
|
||||||
|
|
||||||
|
-> {"jsonrpc":"2.0","method":"core/get_game_state","id":1}\\n
|
||||||
|
<- {"jsonrpc":"2.0","result":{...},"id":1}\\n
|
||||||
|
|
||||||
|
All public methods raise :class:`GabsError` on failure so callers can
|
||||||
|
degrade gracefully without inspecting raw socket errors.
|
||||||
|
|
||||||
|
Refs: #1093 (M1 Observer), #1091 (Epic)
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import socket
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_DEFAULT_HOST = "127.0.0.1"
|
||||||
|
_DEFAULT_PORT = 4825
|
||||||
|
_DEFAULT_TIMEOUT = 5.0
|
||||||
|
_RECV_BUFSIZE = 4096
|
||||||
|
|
||||||
|
|
||||||
|
class GabsError(Exception):
|
||||||
|
"""Raised when a GABS call fails (connection, protocol, or RPC error)."""
|
||||||
|
|
||||||
|
|
||||||
|
class GabsClient:
|
||||||
|
"""Synchronous TCP JSON-RPC 2.0 client for Bannerlord.GABS.
|
||||||
|
|
||||||
|
Each public call opens a fresh TCP connection, sends the request, reads
|
||||||
|
the response, and closes the socket. This avoids persistent-connection
|
||||||
|
complexity and is fast enough for poll intervals of ≥1 s.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
host: VM IP or hostname (default ``127.0.0.1``).
|
||||||
|
port: GABS TCP port (default ``4825``).
|
||||||
|
timeout: Socket timeout in seconds (default ``5.0``).
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
host: str = _DEFAULT_HOST,
|
||||||
|
port: int = _DEFAULT_PORT,
|
||||||
|
timeout: float = _DEFAULT_TIMEOUT,
|
||||||
|
) -> None:
|
||||||
|
self.host = host
|
||||||
|
self.port = port
|
||||||
|
self.timeout = timeout
|
||||||
|
self._req_id = 0
|
||||||
|
|
||||||
|
# ── Public API ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def call(self, method: str, params: dict[str, Any] | None = None) -> Any:
|
||||||
|
"""Send a JSON-RPC request and return the ``result`` value.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
method: RPC method name (e.g. ``"core/get_game_state"``).
|
||||||
|
params: Optional parameters dict.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The ``result`` field from the JSON-RPC response.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
GabsError: On any connection, protocol, or application-level error.
|
||||||
|
"""
|
||||||
|
self._req_id += 1
|
||||||
|
payload: dict[str, Any] = {
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"method": method,
|
||||||
|
"id": self._req_id,
|
||||||
|
}
|
||||||
|
if params:
|
||||||
|
payload["params"] = params
|
||||||
|
|
||||||
|
try:
|
||||||
|
sock = socket.create_connection((self.host, self.port), timeout=self.timeout)
|
||||||
|
except OSError as exc:
|
||||||
|
raise GabsError(f"TCP connect to {self.host}:{self.port} failed: {exc}") from exc
|
||||||
|
|
||||||
|
try:
|
||||||
|
sock.settimeout(self.timeout)
|
||||||
|
raw = json.dumps(payload) + "\n"
|
||||||
|
sock.sendall(raw.encode())
|
||||||
|
|
||||||
|
buf = b""
|
||||||
|
while b"\n" not in buf:
|
||||||
|
chunk = sock.recv(_RECV_BUFSIZE)
|
||||||
|
if not chunk:
|
||||||
|
raise GabsError("Connection closed before response received")
|
||||||
|
buf += chunk
|
||||||
|
|
||||||
|
line = buf.split(b"\n", 1)[0]
|
||||||
|
resp: dict[str, Any] = json.loads(line.decode())
|
||||||
|
except GabsError:
|
||||||
|
raise
|
||||||
|
except json.JSONDecodeError as exc:
|
||||||
|
raise GabsError(f"Malformed JSON from GABS: {exc}") from exc
|
||||||
|
except OSError as exc:
|
||||||
|
raise GabsError(f"Socket error reading from GABS: {exc}") from exc
|
||||||
|
finally:
|
||||||
|
sock.close()
|
||||||
|
|
||||||
|
if "error" in resp:
|
||||||
|
err = resp["error"]
|
||||||
|
code = err.get("code", "?")
|
||||||
|
msg = err.get("message", "unknown error")
|
||||||
|
raise GabsError(f"GABS RPC error [{code}]: {msg}")
|
||||||
|
|
||||||
|
return resp.get("result")
|
||||||
|
|
||||||
|
def ping(self) -> bool:
|
||||||
|
"""Return True if GABS responds to a ping, False otherwise."""
|
||||||
|
try:
|
||||||
|
self.call("ping")
|
||||||
|
return True
|
||||||
|
except GabsError as exc:
|
||||||
|
logger.debug("GABS ping failed: %s", exc)
|
||||||
|
return False
|
||||||
|
|
||||||
|
def get_game_state(self) -> dict[str, Any]:
|
||||||
|
"""Return the current Bannerlord campaign game state."""
|
||||||
|
result = self.call("core/get_game_state")
|
||||||
|
return result if isinstance(result, dict) else {}
|
||||||
|
|
||||||
|
def get_player(self) -> dict[str, Any]:
|
||||||
|
"""Return the player hero's stats and status."""
|
||||||
|
result = self.call("hero/get_player")
|
||||||
|
return result if isinstance(result, dict) else {}
|
||||||
|
|
||||||
|
def get_player_party(self) -> dict[str, Any]:
|
||||||
|
"""Return the player's party composition and stats."""
|
||||||
|
result = self.call("party/get_player_party")
|
||||||
|
return result if isinstance(result, dict) else {}
|
||||||
|
|
||||||
|
def list_kingdoms(self) -> list[dict[str, Any]]:
|
||||||
|
"""Return the list of all active kingdoms in the campaign."""
|
||||||
|
result = self.call("kingdom/list_kingdoms")
|
||||||
|
return result if isinstance(result, list) else []
|
||||||
239
src/integrations/bannerlord/observer.py
Normal file
239
src/integrations/bannerlord/observer.py
Normal file
@@ -0,0 +1,239 @@
|
|||||||
|
"""Bannerlord Observer — Passive Lord (M1).
|
||||||
|
|
||||||
|
Implements the observe() loop: poll GABS for game state and write a
|
||||||
|
structured journal entry to the configured journal file (default
|
||||||
|
``memory/bannerlord/journal.md``).
|
||||||
|
|
||||||
|
This is pure observation — no actions are taken. The observer records
|
||||||
|
state every ``gabs_poll_interval`` seconds and tracks how many in-game
|
||||||
|
days have been observed.
|
||||||
|
|
||||||
|
Usage::
|
||||||
|
|
||||||
|
from integrations.bannerlord.observer import BannerlordObserver
|
||||||
|
observer = BannerlordObserver()
|
||||||
|
await observer.observe() # runs indefinitely
|
||||||
|
await observer.observe(days=7) # stop after 7 in-game days observed
|
||||||
|
|
||||||
|
Refs: #1093 (M1 Observer), #1091 (Epic)
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from config import settings
|
||||||
|
from integrations.bannerlord.gabs_client import GabsClient, GabsError
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _get_journal_path() -> Path:
|
||||||
|
"""Resolve the journal file path from settings (relative to repo root)."""
|
||||||
|
repo_root = getattr(settings, "repo_root", None) or os.getcwd()
|
||||||
|
return Path(repo_root) / settings.gabs_journal_path
|
||||||
|
|
||||||
|
|
||||||
|
def _format_journal_entry(
|
||||||
|
snapshot: dict[str, Any],
|
||||||
|
wall_ts: datetime,
|
||||||
|
entry_num: int,
|
||||||
|
) -> str:
|
||||||
|
"""Format a game-state snapshot as a Markdown journal entry.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
snapshot: Merged dict of all GABS responses.
|
||||||
|
wall_ts: Wall-clock timestamp of the observation.
|
||||||
|
entry_num: Sequential entry counter.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A Markdown string ready to append to the journal file.
|
||||||
|
"""
|
||||||
|
ts = wall_ts.strftime("%Y-%m-%d %H:%M:%S UTC")
|
||||||
|
|
||||||
|
# ── Game state fields ─────────────────────────────────────────────
|
||||||
|
game: dict[str, Any] = snapshot.get("game_state", {})
|
||||||
|
hero: dict[str, Any] = snapshot.get("player", {})
|
||||||
|
party: dict[str, Any] = snapshot.get("player_party", {})
|
||||||
|
kingdoms: list[dict[str, Any]] = snapshot.get("kingdoms", [])
|
||||||
|
|
||||||
|
in_game_day = game.get("day", "?")
|
||||||
|
in_game_season = game.get("season", "?")
|
||||||
|
campaign_phase = game.get("campaign_phase", "?")
|
||||||
|
|
||||||
|
hero_name = hero.get("name", "unknown")
|
||||||
|
hero_clan = hero.get("clan", "?")
|
||||||
|
hero_renown = hero.get("renown", "?")
|
||||||
|
hero_level = hero.get("level", "?")
|
||||||
|
hero_gold = hero.get("gold", "?")
|
||||||
|
hero_location = hero.get("current_settlement", hero.get("location", "?"))
|
||||||
|
|
||||||
|
party_size = party.get("size", "?")
|
||||||
|
party_morale = party.get("morale", "?")
|
||||||
|
party_food_days = party.get("food_days_left", "?")
|
||||||
|
|
||||||
|
# ── Kingdom summary ───────────────────────────────────────────────
|
||||||
|
kingdom_lines = []
|
||||||
|
for k in kingdoms[:6]: # cap at 6 to keep entries readable
|
||||||
|
name = k.get("name", "?")
|
||||||
|
ruler = k.get("ruler", "?")
|
||||||
|
strength = k.get("military_strength", "?")
|
||||||
|
kingdom_lines.append(f" - {name} (ruler: {ruler}, strength: {strength})")
|
||||||
|
kingdoms_section = "\n".join(kingdom_lines) if kingdom_lines else " - (no data)"
|
||||||
|
|
||||||
|
return f"""
|
||||||
|
---
|
||||||
|
|
||||||
|
## Entry #{entry_num:04d} — Day {in_game_day} / {in_game_season}
|
||||||
|
|
||||||
|
**Observed:** {ts}
|
||||||
|
**Campaign phase:** {campaign_phase}
|
||||||
|
|
||||||
|
### Hero
|
||||||
|
- **Name:** {hero_name} ({hero_clan})
|
||||||
|
- **Level:** {hero_level} | **Renown:** {hero_renown} | **Gold:** {hero_gold} d
|
||||||
|
- **Location:** {hero_location}
|
||||||
|
|
||||||
|
### Party
|
||||||
|
- **Size:** {party_size} troops | **Morale:** {party_morale} | **Food:** {party_food_days} days
|
||||||
|
|
||||||
|
### Kingdoms
|
||||||
|
{kingdoms_section}
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
# ── Observer ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class BannerlordObserver:
|
||||||
|
"""Poll GABS and journal Bannerlord game state to Markdown.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
host: GABS VM host (defaults to ``settings.gabs_host``).
|
||||||
|
port: GABS port (defaults to ``settings.gabs_port``).
|
||||||
|
timeout: Socket timeout in seconds.
|
||||||
|
poll_interval: Seconds between polls (defaults to ``settings.gabs_poll_interval``).
|
||||||
|
journal_path: Override the output path (defaults to ``settings.gabs_journal_path``).
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
host: str | None = None,
|
||||||
|
port: int | None = None,
|
||||||
|
timeout: float | None = None,
|
||||||
|
poll_interval: int | None = None,
|
||||||
|
journal_path: str | None = None,
|
||||||
|
) -> None:
|
||||||
|
self._host = host or settings.gabs_host
|
||||||
|
self._port = port or settings.gabs_port
|
||||||
|
self._timeout = timeout if timeout is not None else settings.gabs_timeout
|
||||||
|
self._poll_interval = poll_interval if poll_interval is not None else settings.gabs_poll_interval
|
||||||
|
self._journal_path = Path(journal_path) if journal_path else _get_journal_path()
|
||||||
|
self._entry_count = 0
|
||||||
|
self._days_observed: set[str] = set()
|
||||||
|
|
||||||
|
# ── Public ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
async def observe(self, days: int = 0) -> None:
|
||||||
|
"""Run the observer loop.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
days: Stop after this many unique in-game days have been logged.
|
||||||
|
Pass ``0`` (default) to run indefinitely.
|
||||||
|
"""
|
||||||
|
logger.info(
|
||||||
|
"BannerlordObserver starting — target=%s:%d interval=%ds journal=%s",
|
||||||
|
self._host,
|
||||||
|
self._port,
|
||||||
|
self._poll_interval,
|
||||||
|
self._journal_path,
|
||||||
|
)
|
||||||
|
self._ensure_journal_header()
|
||||||
|
|
||||||
|
client = GabsClient(host=self._host, port=self._port, timeout=self._timeout)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
snapshot = await asyncio.to_thread(self._poll_snapshot, client)
|
||||||
|
|
||||||
|
if snapshot is not None:
|
||||||
|
self._entry_count += 1
|
||||||
|
wall_ts = datetime.now(UTC)
|
||||||
|
entry = _format_journal_entry(snapshot, wall_ts, self._entry_count)
|
||||||
|
await asyncio.to_thread(self._append_to_journal, entry)
|
||||||
|
|
||||||
|
in_game_day = str(snapshot.get("game_state", {}).get("day", ""))
|
||||||
|
if in_game_day:
|
||||||
|
self._days_observed.add(in_game_day)
|
||||||
|
logger.info(
|
||||||
|
"Observer entry #%d — in-game day %s (%d unique days seen)",
|
||||||
|
self._entry_count,
|
||||||
|
in_game_day,
|
||||||
|
len(self._days_observed),
|
||||||
|
)
|
||||||
|
|
||||||
|
if days and len(self._days_observed) >= days:
|
||||||
|
logger.info(
|
||||||
|
"Observer goal reached: %d in-game days observed. Stopping.",
|
||||||
|
days,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
await asyncio.sleep(self._poll_interval)
|
||||||
|
|
||||||
|
# ── Internal ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _poll_snapshot(self, client: GabsClient) -> dict[str, Any] | None:
|
||||||
|
"""Synchronous: call GABS and return a merged snapshot dict.
|
||||||
|
|
||||||
|
Returns None on failure (GABS unreachable — degrade gracefully).
|
||||||
|
"""
|
||||||
|
snapshot: dict[str, Any] = {}
|
||||||
|
|
||||||
|
try:
|
||||||
|
snapshot["game_state"] = client.get_game_state()
|
||||||
|
except GabsError as exc:
|
||||||
|
logger.warning("GABS get_game_state failed: %s", exc)
|
||||||
|
return None
|
||||||
|
|
||||||
|
for method, key, fetcher in [
|
||||||
|
("hero/get_player", "player", client.get_player),
|
||||||
|
("party/get_player_party", "player_party", client.get_player_party),
|
||||||
|
("kingdom/list_kingdoms", "kingdoms", client.list_kingdoms),
|
||||||
|
]:
|
||||||
|
try:
|
||||||
|
snapshot[key] = fetcher()
|
||||||
|
except GabsError as exc:
|
||||||
|
logger.warning("GABS %s failed (partial snapshot): %s", method, exc)
|
||||||
|
snapshot[key] = {} if key != "kingdoms" else []
|
||||||
|
|
||||||
|
return snapshot
|
||||||
|
|
||||||
|
def _ensure_journal_header(self) -> None:
|
||||||
|
"""Create the journal file with a Markdown header if it doesn't exist."""
|
||||||
|
if self._journal_path.exists():
|
||||||
|
return
|
||||||
|
self._journal_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
header = (
|
||||||
|
"# Bannerlord Journal — Timmy's Campaign Observations\n\n"
|
||||||
|
"> Passive Lord (M1) — Observer mode. "
|
||||||
|
"Timmy watches, learns, and waits.\n\n"
|
||||||
|
"Epic: #1091 · M1: #1093\n"
|
||||||
|
)
|
||||||
|
self._journal_path.write_text(header, encoding="utf-8")
|
||||||
|
logger.info("Created journal at %s", self._journal_path)
|
||||||
|
|
||||||
|
def _append_to_journal(self, entry: str) -> None:
|
||||||
|
"""Append a formatted entry to the journal file."""
|
||||||
|
try:
|
||||||
|
with self._journal_path.open("a", encoding="utf-8") as fh:
|
||||||
|
fh.write(entry)
|
||||||
|
except OSError as exc:
|
||||||
|
logger.error("Failed to write journal entry: %s", exc)
|
||||||
288
tests/integrations/test_gabs_observer.py
Normal file
288
tests/integrations/test_gabs_observer.py
Normal file
@@ -0,0 +1,288 @@
|
|||||||
|
"""Unit tests for the Bannerlord GABS client and observer.
|
||||||
|
|
||||||
|
All tests are offline — no real TCP connection is made. Sockets are
|
||||||
|
mocked or substituted with in-process fakes.
|
||||||
|
|
||||||
|
Refs: #1093 (M1 Observer), #1091 (Epic)
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import socket
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from integrations.bannerlord.gabs_client import GabsClient, GabsError
|
||||||
|
|
||||||
|
|
||||||
|
# ── GabsClient unit tests ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _make_response(result: object = None, error: dict | None = None, req_id: int = 1) -> bytes:
|
||||||
|
"""Encode a JSON-RPC 2.0 response as newline-delimited bytes."""
|
||||||
|
resp: dict = {"jsonrpc": "2.0", "id": req_id}
|
||||||
|
if error is not None:
|
||||||
|
resp["error"] = error
|
||||||
|
else:
|
||||||
|
resp["result"] = result
|
||||||
|
return (json.dumps(resp) + "\n").encode()
|
||||||
|
|
||||||
|
|
||||||
|
def _mock_socket(response_bytes: bytes) -> MagicMock:
|
||||||
|
"""Return a MagicMock socket that yields *response_bytes* from recv()."""
|
||||||
|
sock = MagicMock(spec=socket.socket)
|
||||||
|
# First recv returns the full response, subsequent calls return b"" (EOF)
|
||||||
|
sock.recv.side_effect = [response_bytes, b""]
|
||||||
|
return sock
|
||||||
|
|
||||||
|
|
||||||
|
class TestGabsClientCall:
|
||||||
|
def test_successful_call_returns_result(self, tmp_path):
|
||||||
|
"""call() returns the result field on a successful JSON-RPC response."""
|
||||||
|
expected = {"day": 42, "season": "spring"}
|
||||||
|
response = _make_response(result=expected)
|
||||||
|
|
||||||
|
with patch("socket.create_connection") as mock_conn:
|
||||||
|
mock_conn.return_value = _mock_socket(response)
|
||||||
|
client = GabsClient()
|
||||||
|
result = client.call("core/get_game_state")
|
||||||
|
|
||||||
|
assert result == expected
|
||||||
|
|
||||||
|
def test_rpc_error_raises_gabs_error(self):
|
||||||
|
"""call() raises GabsError when the server returns an error object."""
|
||||||
|
error = {"code": -32601, "message": "Method not found"}
|
||||||
|
response = _make_response(error=error)
|
||||||
|
|
||||||
|
with patch("socket.create_connection") as mock_conn:
|
||||||
|
mock_conn.return_value = _mock_socket(response)
|
||||||
|
client = GabsClient()
|
||||||
|
with pytest.raises(GabsError, match="Method not found"):
|
||||||
|
client.call("unknown/method")
|
||||||
|
|
||||||
|
def test_tcp_failure_raises_gabs_error(self):
|
||||||
|
"""call() raises GabsError when TCP connection is refused."""
|
||||||
|
with patch("socket.create_connection", side_effect=OSError("Connection refused")):
|
||||||
|
client = GabsClient()
|
||||||
|
with pytest.raises(GabsError, match="TCP connect"):
|
||||||
|
client.call("ping")
|
||||||
|
|
||||||
|
def test_malformed_json_raises_gabs_error(self):
|
||||||
|
"""call() raises GabsError when the server sends invalid JSON."""
|
||||||
|
with patch("socket.create_connection") as mock_conn:
|
||||||
|
bad_sock = MagicMock(spec=socket.socket)
|
||||||
|
bad_sock.recv.return_value = b"not valid json\n"
|
||||||
|
mock_conn.return_value = bad_sock
|
||||||
|
client = GabsClient()
|
||||||
|
with pytest.raises(GabsError, match="Malformed JSON"):
|
||||||
|
client.call("ping")
|
||||||
|
|
||||||
|
def test_connection_closed_early_raises_gabs_error(self):
|
||||||
|
"""call() raises GabsError when the server closes without sending \\n."""
|
||||||
|
with patch("socket.create_connection") as mock_conn:
|
||||||
|
bad_sock = MagicMock(spec=socket.socket)
|
||||||
|
# recv never sends a newline; returns empty bytes on second call
|
||||||
|
bad_sock.recv.side_effect = [b"partial", b""]
|
||||||
|
mock_conn.return_value = bad_sock
|
||||||
|
client = GabsClient()
|
||||||
|
with pytest.raises(GabsError, match="closed before response"):
|
||||||
|
client.call("ping")
|
||||||
|
|
||||||
|
def test_socket_is_closed_after_call(self):
|
||||||
|
"""The socket is closed even after a successful call."""
|
||||||
|
response = _make_response(result="pong")
|
||||||
|
mock_sock = _mock_socket(response)
|
||||||
|
|
||||||
|
with patch("socket.create_connection", return_value=mock_sock):
|
||||||
|
GabsClient().call("ping")
|
||||||
|
|
||||||
|
mock_sock.close.assert_called_once()
|
||||||
|
|
||||||
|
def test_socket_is_closed_after_error(self):
|
||||||
|
"""The socket is closed even when the server returns a JSON-RPC error."""
|
||||||
|
error = {"code": -1, "message": "fail"}
|
||||||
|
response = _make_response(error=error)
|
||||||
|
mock_sock = _mock_socket(response)
|
||||||
|
|
||||||
|
with patch("socket.create_connection", return_value=mock_sock):
|
||||||
|
with pytest.raises(GabsError):
|
||||||
|
GabsClient().call("something")
|
||||||
|
|
||||||
|
mock_sock.close.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
|
class TestGabsClientHighLevel:
|
||||||
|
def _patched_client(self, method_results: dict) -> GabsClient:
|
||||||
|
"""Return a GabsClient whose call() is stubbed with *method_results*."""
|
||||||
|
client = GabsClient()
|
||||||
|
client.call = MagicMock(side_effect=lambda m, **_: method_results.get(m))
|
||||||
|
return client
|
||||||
|
|
||||||
|
def test_ping_returns_true_on_success(self):
|
||||||
|
client = GabsClient()
|
||||||
|
client.call = MagicMock(return_value=None)
|
||||||
|
assert client.ping() is True
|
||||||
|
|
||||||
|
def test_ping_returns_false_on_gabs_error(self):
|
||||||
|
client = GabsClient()
|
||||||
|
client.call = MagicMock(side_effect=GabsError("timeout"))
|
||||||
|
assert client.ping() is False
|
||||||
|
|
||||||
|
def test_get_game_state_returns_dict(self):
|
||||||
|
client = GabsClient()
|
||||||
|
client.call = MagicMock(return_value={"day": 1, "season": "autumn"})
|
||||||
|
result = client.get_game_state()
|
||||||
|
assert result["day"] == 1
|
||||||
|
|
||||||
|
def test_get_game_state_returns_empty_dict_on_non_dict(self):
|
||||||
|
client = GabsClient()
|
||||||
|
client.call = MagicMock(return_value=None)
|
||||||
|
assert client.get_game_state() == {}
|
||||||
|
|
||||||
|
def test_get_player_returns_dict(self):
|
||||||
|
client = GabsClient()
|
||||||
|
client.call = MagicMock(return_value={"name": "Timmy", "level": 5})
|
||||||
|
result = client.get_player()
|
||||||
|
assert result["name"] == "Timmy"
|
||||||
|
|
||||||
|
def test_list_kingdoms_returns_list(self):
|
||||||
|
client = GabsClient()
|
||||||
|
client.call = MagicMock(return_value=[{"name": "Empire"}, {"name": "Vlandia"}])
|
||||||
|
result = client.list_kingdoms()
|
||||||
|
assert len(result) == 2
|
||||||
|
|
||||||
|
def test_list_kingdoms_returns_empty_list_on_non_list(self):
|
||||||
|
client = GabsClient()
|
||||||
|
client.call = MagicMock(return_value=None)
|
||||||
|
assert client.list_kingdoms() == []
|
||||||
|
|
||||||
|
|
||||||
|
# ── BannerlordObserver unit tests ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestBannerlordObserver:
|
||||||
|
def test_journal_header_created_on_first_run(self, tmp_path):
|
||||||
|
"""ensure_journal_header creates the file if it does not exist."""
|
||||||
|
from integrations.bannerlord.observer import BannerlordObserver
|
||||||
|
|
||||||
|
journal = tmp_path / "test_journal.md"
|
||||||
|
observer = BannerlordObserver(journal_path=str(journal))
|
||||||
|
observer._ensure_journal_header()
|
||||||
|
|
||||||
|
assert journal.exists()
|
||||||
|
content = journal.read_text()
|
||||||
|
assert "Bannerlord Journal" in content
|
||||||
|
assert "#1091" in content
|
||||||
|
|
||||||
|
def test_journal_header_not_overwritten(self, tmp_path):
|
||||||
|
"""ensure_journal_header does not overwrite an existing file."""
|
||||||
|
from integrations.bannerlord.observer import BannerlordObserver
|
||||||
|
|
||||||
|
journal = tmp_path / "existing.md"
|
||||||
|
journal.write_text("# existing content\n")
|
||||||
|
observer = BannerlordObserver(journal_path=str(journal))
|
||||||
|
observer._ensure_journal_header()
|
||||||
|
|
||||||
|
assert journal.read_text() == "# existing content\n"
|
||||||
|
|
||||||
|
def test_append_to_journal(self, tmp_path):
|
||||||
|
"""_append_to_journal appends text to the journal file."""
|
||||||
|
from integrations.bannerlord.observer import BannerlordObserver
|
||||||
|
|
||||||
|
journal = tmp_path / "journal.md"
|
||||||
|
journal.write_text("# header\n")
|
||||||
|
observer = BannerlordObserver(journal_path=str(journal))
|
||||||
|
observer._append_to_journal("\nentry text\n")
|
||||||
|
|
||||||
|
assert "entry text" in journal.read_text()
|
||||||
|
|
||||||
|
def test_poll_snapshot_returns_none_when_gabs_unreachable(self, tmp_path):
|
||||||
|
"""_poll_snapshot returns None when get_game_state fails."""
|
||||||
|
from integrations.bannerlord.observer import BannerlordObserver
|
||||||
|
|
||||||
|
observer = BannerlordObserver(journal_path=str(tmp_path / "j.md"))
|
||||||
|
mock_client = MagicMock()
|
||||||
|
mock_client.get_game_state.side_effect = GabsError("refused")
|
||||||
|
|
||||||
|
result = observer._poll_snapshot(mock_client)
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
def test_poll_snapshot_partial_on_secondary_failure(self, tmp_path):
|
||||||
|
"""_poll_snapshot returns a snapshot even if hero/party calls fail."""
|
||||||
|
from integrations.bannerlord.observer import BannerlordObserver
|
||||||
|
|
||||||
|
observer = BannerlordObserver(journal_path=str(tmp_path / "j.md"))
|
||||||
|
mock_client = MagicMock()
|
||||||
|
mock_client.get_game_state.return_value = {"day": 5}
|
||||||
|
mock_client.get_player.side_effect = GabsError("hero unavailable")
|
||||||
|
mock_client.get_player_party.side_effect = GabsError("party unavailable")
|
||||||
|
mock_client.list_kingdoms.return_value = [{"name": "Empire"}]
|
||||||
|
|
||||||
|
snapshot = observer._poll_snapshot(mock_client)
|
||||||
|
assert snapshot is not None
|
||||||
|
assert snapshot["game_state"]["day"] == 5
|
||||||
|
assert snapshot["player"] == {}
|
||||||
|
assert snapshot["player_party"] == {}
|
||||||
|
assert snapshot["kingdoms"][0]["name"] == "Empire"
|
||||||
|
|
||||||
|
def test_format_journal_entry_contains_key_fields(self, tmp_path):
|
||||||
|
"""_format_journal_entry includes hero name, day, and kingdom data."""
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
|
||||||
|
from integrations.bannerlord.observer import _format_journal_entry
|
||||||
|
|
||||||
|
snapshot = {
|
||||||
|
"game_state": {"day": 7, "season": "winter", "campaign_phase": "early"},
|
||||||
|
"player": {"name": "Timmy", "clan": "Thalheimer", "renown": 42, "level": 3, "gold": 1000},
|
||||||
|
"player_party": {"size": 25, "morale": 80, "food_days_left": 5},
|
||||||
|
"kingdoms": [{"name": "Vlandia", "ruler": "Derthert", "military_strength": 5000}],
|
||||||
|
}
|
||||||
|
ts = datetime(2026, 3, 23, 12, 0, 0, tzinfo=UTC)
|
||||||
|
entry = _format_journal_entry(snapshot, ts, entry_num=1)
|
||||||
|
|
||||||
|
assert "Entry #0001" in entry
|
||||||
|
assert "Day 7" in entry
|
||||||
|
assert "winter" in entry
|
||||||
|
assert "Timmy" in entry
|
||||||
|
assert "Thalheimer" in entry
|
||||||
|
assert "Vlandia" in entry
|
||||||
|
assert "Derthert" in entry
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_observe_stops_after_target_days(self, tmp_path):
|
||||||
|
"""observe(days=2) stops after 2 unique in-game days are logged."""
|
||||||
|
from integrations.bannerlord.observer import BannerlordObserver
|
||||||
|
|
||||||
|
journal = tmp_path / "j.md"
|
||||||
|
observer = BannerlordObserver(
|
||||||
|
poll_interval=0, # no sleep
|
||||||
|
journal_path=str(journal),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Simulate two distinct in-game days across three polls
|
||||||
|
snapshots = [
|
||||||
|
{"game_state": {"day": 1}, "player": {}, "player_party": {}, "kingdoms": []},
|
||||||
|
{"game_state": {"day": 1}, "player": {}, "player_party": {}, "kingdoms": []},
|
||||||
|
{"game_state": {"day": 2}, "player": {}, "player_party": {}, "kingdoms": []},
|
||||||
|
]
|
||||||
|
call_count = 0
|
||||||
|
|
||||||
|
def fake_poll(client):
|
||||||
|
nonlocal call_count
|
||||||
|
if call_count >= len(snapshots):
|
||||||
|
return snapshots[-1]
|
||||||
|
snap = snapshots[call_count]
|
||||||
|
call_count += 1
|
||||||
|
return snap
|
||||||
|
|
||||||
|
observer._poll_snapshot = fake_poll
|
||||||
|
|
||||||
|
await observer.observe(days=2)
|
||||||
|
|
||||||
|
assert len(observer._days_observed) >= 2
|
||||||
|
assert journal.exists()
|
||||||
|
content = journal.read_text()
|
||||||
|
assert "Entry #" in content
|
||||||
Reference in New Issue
Block a user