1
0
This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Timmy-time-dashboard/src/integrations/bannerlord/observer.py
2026-03-23 19:06:11 +00:00

242 lines
9.2 KiB
Python

"""Bannerlord Observer — Passive Lord (M1).

Implements the observe() loop: poll GABS for game state and write a
structured journal entry to the configured journal file (default
``memory/bannerlord/journal.md``).

This is pure observation — no actions are taken. The observer records
state every ``gabs_poll_interval`` seconds and tracks how many in-game
days have been observed.

Usage::

    from integrations.bannerlord.observer import BannerlordObserver

    observer = BannerlordObserver()
    await observer.observe()        # runs indefinitely
    await observer.observe(days=7)  # stop after 7 in-game days observed

Refs: #1093 (M1 Observer), #1091 (Epic)
"""
from __future__ import annotations
import asyncio
import logging
import os
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from config import settings
from integrations.bannerlord.gabs_client import GabsClient, GabsError
logger = logging.getLogger(__name__)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _get_journal_path() -> Path:
    """Return the journal file location derived from ``settings``.

    The base directory is ``settings.repo_root`` when that attribute exists
    and is truthy; otherwise the current working directory is used.
    ``settings.gabs_journal_path`` is then joined onto that base.
    """
    base_dir = getattr(settings, "repo_root", None)
    if not base_dir:
        # No repo root configured — fall back to the process working directory.
        base_dir = os.getcwd()
    return Path(base_dir).joinpath(settings.gabs_journal_path)
def _format_journal_entry(
snapshot: dict[str, Any],
wall_ts: datetime,
entry_num: int,
) -> str:
"""Format a game-state snapshot as a Markdown journal entry.
Args:
snapshot: Merged dict of all GABS responses.
wall_ts: Wall-clock timestamp of the observation.
entry_num: Sequential entry counter.
Returns:
A Markdown string ready to append to the journal file.
"""
ts = wall_ts.strftime("%Y-%m-%d %H:%M:%S UTC")
# ── Game state fields ─────────────────────────────────────────────
game: dict[str, Any] = snapshot.get("game_state", {})
hero: dict[str, Any] = snapshot.get("player", {})
party: dict[str, Any] = snapshot.get("player_party", {})
kingdoms: list[dict[str, Any]] = snapshot.get("kingdoms", [])
in_game_day = game.get("day", "?")
in_game_season = game.get("season", "?")
campaign_phase = game.get("campaign_phase", "?")
hero_name = hero.get("name", "unknown")
hero_clan = hero.get("clan", "?")
hero_renown = hero.get("renown", "?")
hero_level = hero.get("level", "?")
hero_gold = hero.get("gold", "?")
hero_location = hero.get("current_settlement", hero.get("location", "?"))
party_size = party.get("size", "?")
party_morale = party.get("morale", "?")
party_food_days = party.get("food_days_left", "?")
# ── Kingdom summary ───────────────────────────────────────────────
kingdom_lines = []
for k in kingdoms[:6]: # cap at 6 to keep entries readable
name = k.get("name", "?")
ruler = k.get("ruler", "?")
strength = k.get("military_strength", "?")
kingdom_lines.append(f" - {name} (ruler: {ruler}, strength: {strength})")
kingdoms_section = "\n".join(kingdom_lines) if kingdom_lines else " - (no data)"
return f"""
---
## Entry #{entry_num:04d} — Day {in_game_day} / {in_game_season}
**Observed:** {ts}
**Campaign phase:** {campaign_phase}
### Hero
- **Name:** {hero_name} ({hero_clan})
- **Level:** {hero_level} | **Renown:** {hero_renown} | **Gold:** {hero_gold} d
- **Location:** {hero_location}
### Party
- **Size:** {party_size} troops | **Morale:** {party_morale} | **Food:** {party_food_days} days
### Kingdoms
{kingdoms_section}
"""
# ── Observer ──────────────────────────────────────────────────────────────────
class BannerlordObserver:
"""Poll GABS and journal Bannerlord game state to Markdown.
Args:
host: GABS VM host (defaults to ``settings.gabs_host``).
port: GABS port (defaults to ``settings.gabs_port``).
timeout: Socket timeout in seconds.
poll_interval: Seconds between polls (defaults to ``settings.gabs_poll_interval``).
journal_path: Override the output path (defaults to ``settings.gabs_journal_path``).
"""
def __init__(
self,
host: str | None = None,
port: int | None = None,
timeout: float | None = None,
poll_interval: int | None = None,
journal_path: str | None = None,
) -> None:
self._host = host or settings.gabs_host
self._port = port or settings.gabs_port
self._timeout = timeout if timeout is not None else settings.gabs_timeout
self._poll_interval = (
poll_interval if poll_interval is not None else settings.gabs_poll_interval
)
self._journal_path = Path(journal_path) if journal_path else _get_journal_path()
self._entry_count = 0
self._days_observed: set[str] = set()
# ── Public ────────────────────────────────────────────────────────
async def observe(self, days: int = 0) -> None:
"""Run the observer loop.
Args:
days: Stop after this many unique in-game days have been logged.
Pass ``0`` (default) to run indefinitely.
"""
logger.info(
"BannerlordObserver starting — target=%s:%d interval=%ds journal=%s",
self._host,
self._port,
self._poll_interval,
self._journal_path,
)
self._ensure_journal_header()
client = GabsClient(host=self._host, port=self._port, timeout=self._timeout)
while True:
snapshot = await asyncio.to_thread(self._poll_snapshot, client)
if snapshot is not None:
self._entry_count += 1
wall_ts = datetime.now(UTC)
entry = _format_journal_entry(snapshot, wall_ts, self._entry_count)
await asyncio.to_thread(self._append_to_journal, entry)
in_game_day = str(snapshot.get("game_state", {}).get("day", ""))
if in_game_day:
self._days_observed.add(in_game_day)
logger.info(
"Observer entry #%d — in-game day %s (%d unique days seen)",
self._entry_count,
in_game_day,
len(self._days_observed),
)
if days and len(self._days_observed) >= days:
logger.info(
"Observer goal reached: %d in-game days observed. Stopping.",
days,
)
return
await asyncio.sleep(self._poll_interval)
# ── Internal ──────────────────────────────────────────────────────
def _poll_snapshot(self, client: GabsClient) -> dict[str, Any] | None:
"""Synchronous: call GABS and return a merged snapshot dict.
Returns None on failure (GABS unreachable — degrade gracefully).
"""
snapshot: dict[str, Any] = {}
try:
snapshot["game_state"] = client.get_game_state()
except GabsError as exc:
logger.warning("GABS get_game_state failed: %s", exc)
return None
for method, key, fetcher in [
("hero/get_player", "player", client.get_player),
("party/get_player_party", "player_party", client.get_player_party),
("kingdom/list_kingdoms", "kingdoms", client.list_kingdoms),
]:
try:
snapshot[key] = fetcher()
except GabsError as exc:
logger.warning("GABS %s failed (partial snapshot): %s", method, exc)
snapshot[key] = {} if key != "kingdoms" else []
return snapshot
def _ensure_journal_header(self) -> None:
"""Create the journal file with a Markdown header if it doesn't exist."""
if self._journal_path.exists():
return
self._journal_path.parent.mkdir(parents=True, exist_ok=True)
header = (
"# Bannerlord Journal — Timmy's Campaign Observations\n\n"
"> Passive Lord (M1) — Observer mode. "
"Timmy watches, learns, and waits.\n\n"
"Epic: #1091 · M1: #1093\n"
)
self._journal_path.write_text(header, encoding="utf-8")
logger.info("Created journal at %s", self._journal_path)
def _append_to_journal(self, entry: str) -> None:
"""Append a formatted entry to the journal file."""
try:
with self._journal_path.open("a", encoding="utf-8") as fh:
fh.write(entry)
except OSError as exc:
logger.error("Failed to write journal entry: %s", exc)