Automated salvage commit — agent session ended (exit 124). Work in progress, may need continuation.
237 lines
8.9 KiB
Python
237 lines
8.9 KiB
Python
"""King agent — Timmy as sovereign ruler of Calradia.
|
||
|
||
The King operates on the campaign-map timescale. Each campaign tick he:
|
||
1. Reads the full game state from GABS
|
||
2. Evaluates the victory condition
|
||
3. Issues a single KingSubgoal token to the vassal queue
|
||
4. Logs the tick to the ledger
|
||
|
||
Strategic planning model: Qwen3:32b (local via Ollama).
|
||
Decision budget: 5–15 seconds per tick.
|
||
|
||
Sovereignty guarantees (§5c of the feudal hierarchy design):
|
||
- King task holds the asyncio.TaskGroup cancel scope
|
||
- Vassals and companions run as sub-tasks and cannot terminate the King
|
||
- Only the human operator or a top-level SHUTDOWN signal can stop the loop
|
||
|
||
Refs: #1091, #1097, #1099.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
from typing import Any
|
||
|
||
from bannerlord.gabs_client import GABSClient, GABSUnavailable
|
||
from bannerlord.ledger import Ledger
|
||
from bannerlord.models import (
|
||
KingSubgoal,
|
||
StateUpdateMessage,
|
||
SubgoalMessage,
|
||
VictoryCondition,
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
_KING_MODEL = "qwen3:32b"
|
||
_KING_TICK_SECONDS = 5.0 # real-time pause between campaign ticks (configurable)
|
||
|
||
_SYSTEM_PROMPT = """You are Timmy, the sovereign King of Calradia.
|
||
Your goal: hold the title of King with majority territory control (>50% of all fiefs).
|
||
You think strategically over 100+ in-game days. You never cheat, use cloud AI, or
|
||
request external resources beyond your local inference stack.
|
||
|
||
Each turn you receive the full game state as JSON. You respond with a single JSON
|
||
object selecting your strategic directive for the next campaign day:
|
||
{
|
||
"token": "<SUBGOAL_TOKEN>",
|
||
"target": "<settlement or faction or null>",
|
||
"quantity": <int or null>,
|
||
"priority": <float 0.0-2.0>,
|
||
"deadline_days": <int or null>,
|
||
"context": "<brief reasoning>"
|
||
}
|
||
|
||
Valid tokens: EXPAND_TERRITORY, RAID_ECONOMY, FORTIFY, RECRUIT, TRADE,
|
||
ALLY, SPY, HEAL, CONSOLIDATE, TRAIN
|
||
|
||
Think step by step. Respond with JSON only — no prose outside the object.
|
||
"""
|
||
|
||
|
||
class KingAgent:
|
||
"""Sovereign campaign agent.
|
||
|
||
Parameters
|
||
----------
|
||
gabs_client:
|
||
Connected (or gracefully-degraded) GABS client.
|
||
ledger:
|
||
Asset ledger for persistence. Initialized automatically if not provided.
|
||
ollama_url:
|
||
Base URL of the Ollama inference server.
|
||
model:
|
||
Ollama model tag. Default: qwen3:32b.
|
||
tick_interval:
|
||
Real-time seconds between campaign ticks.
|
||
subgoal_queue:
|
||
asyncio.Queue where KingSubgoal messages are placed for vassals.
|
||
Created automatically if not provided.
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
gabs_client: GABSClient,
|
||
ledger: Ledger | None = None,
|
||
ollama_url: str = "http://localhost:11434",
|
||
model: str = _KING_MODEL,
|
||
tick_interval: float = _KING_TICK_SECONDS,
|
||
subgoal_queue: asyncio.Queue[SubgoalMessage] | None = None,
|
||
) -> None:
|
||
self._gabs = gabs_client
|
||
self._ledger = ledger or Ledger()
|
||
self._ollama_url = ollama_url
|
||
self._model = model
|
||
self._tick_interval = tick_interval
|
||
self._subgoal_queue: asyncio.Queue[SubgoalMessage] = subgoal_queue or asyncio.Queue()
|
||
self._tick = 0
|
||
self._running = False
|
||
|
||
@property
|
||
def subgoal_queue(self) -> asyncio.Queue[SubgoalMessage]:
|
||
return self._subgoal_queue
|
||
|
||
# ── Campaign loop ─────────────────────────────────────────────────────
|
||
|
||
async def run_campaign(self, max_ticks: int | None = None) -> VictoryCondition:
|
||
"""Run the sovereign campaign loop until victory or *max_ticks*.
|
||
|
||
Returns the final :class:`VictoryCondition` snapshot.
|
||
"""
|
||
self._ledger.initialize()
|
||
self._running = True
|
||
victory = VictoryCondition()
|
||
logger.info("King campaign started. Model: %s. Max ticks: %s", self._model, max_ticks)
|
||
|
||
try:
|
||
while self._running:
|
||
if max_ticks is not None and self._tick >= max_ticks:
|
||
logger.info("Max ticks (%d) reached — stopping campaign.", max_ticks)
|
||
break
|
||
|
||
state = await self._fetch_state()
|
||
victory = self._evaluate_victory(state)
|
||
|
||
if victory.achieved:
|
||
logger.info(
|
||
"SOVEREIGN VICTORY — King of Calradia! "
|
||
"Territory: %.1f%%, tick: %d",
|
||
victory.territory_control_pct,
|
||
self._tick,
|
||
)
|
||
break
|
||
|
||
subgoal = await self._decide(state)
|
||
await self._broadcast_subgoal(subgoal)
|
||
self._ledger.log_tick(
|
||
tick=self._tick,
|
||
campaign_day=state.get("campaign_day", self._tick),
|
||
subgoal=subgoal.token,
|
||
)
|
||
|
||
self._tick += 1
|
||
await asyncio.sleep(self._tick_interval)
|
||
|
||
except asyncio.CancelledError:
|
||
logger.info("King campaign task cancelled at tick %d", self._tick)
|
||
raise
|
||
finally:
|
||
self._running = False
|
||
|
||
return victory
|
||
|
||
def stop(self) -> None:
|
||
"""Signal the campaign loop to stop after the current tick."""
|
||
self._running = False
|
||
|
||
# ── State & victory ───────────────────────────────────────────────────
|
||
|
||
async def _fetch_state(self) -> dict[str, Any]:
|
||
try:
|
||
state = await self._gabs.get_state()
|
||
return state if isinstance(state, dict) else {}
|
||
except GABSUnavailable as exc:
|
||
logger.warning("GABS unavailable at tick %d: %s — using empty state", self._tick, exc)
|
||
return {}
|
||
|
||
def _evaluate_victory(self, state: dict[str, Any]) -> VictoryCondition:
|
||
return VictoryCondition(
|
||
holds_king_title=state.get("player_title") == "King",
|
||
territory_control_pct=float(state.get("territory_control_pct", 0.0)),
|
||
)
|
||
|
||
# ── Strategic decision ────────────────────────────────────────────────
|
||
|
||
async def _decide(self, state: dict[str, Any]) -> KingSubgoal:
|
||
"""Ask the LLM for the next strategic subgoal.
|
||
|
||
Falls back to RECRUIT (safe default) if the LLM is unavailable.
|
||
"""
|
||
try:
|
||
subgoal = await asyncio.to_thread(self._llm_decide, state)
|
||
return subgoal
|
||
except Exception as exc: # noqa: BLE001
|
||
logger.warning(
|
||
"King LLM decision failed at tick %d: %s — defaulting to RECRUIT", self._tick, exc
|
||
)
|
||
return KingSubgoal(token="RECRUIT", context="LLM unavailable — safe default")
|
||
|
||
def _llm_decide(self, state: dict[str, Any]) -> KingSubgoal:
|
||
"""Synchronous Ollama call (runs in a thread via asyncio.to_thread)."""
|
||
import urllib.request
|
||
|
||
prompt_state = json.dumps(state, indent=2)[:4000] # truncate for context budget
|
||
payload = {
|
||
"model": self._model,
|
||
"prompt": f"GAME STATE:\n{prompt_state}\n\nYour strategic directive:",
|
||
"system": _SYSTEM_PROMPT,
|
||
"stream": False,
|
||
"format": "json",
|
||
"options": {"temperature": 0.1},
|
||
}
|
||
data = json.dumps(payload).encode()
|
||
req = urllib.request.Request(
|
||
f"{self._ollama_url}/api/generate",
|
||
data=data,
|
||
headers={"Content-Type": "application/json"},
|
||
)
|
||
with urllib.request.urlopen(req, timeout=30) as resp: # noqa: S310
|
||
result = json.loads(resp.read())
|
||
|
||
raw = result.get("response", "{}")
|
||
parsed = json.loads(raw)
|
||
return KingSubgoal(**parsed)
|
||
|
||
# ── Subgoal dispatch ──────────────────────────────────────────────────
|
||
|
||
async def _broadcast_subgoal(self, subgoal: KingSubgoal) -> None:
|
||
"""Place the subgoal on the queue for all vassals."""
|
||
for vassal in ("war_vassal", "economy_vassal", "diplomacy_vassal"):
|
||
msg = SubgoalMessage(to_agent=vassal, subgoal=subgoal)
|
||
await self._subgoal_queue.put(msg)
|
||
logger.debug(
|
||
"Tick %d: subgoal %s → %s (priority=%.1f)",
|
||
self._tick,
|
||
subgoal.token,
|
||
subgoal.target or "—",
|
||
subgoal.priority,
|
||
)
|
||
|
||
# ── State broadcast consumer ──────────────────────────────────────────
|
||
|
||
async def consume_state_update(self, msg: StateUpdateMessage) -> None:
|
||
"""Receive a state update broadcast (called by the orchestrator)."""
|
||
logger.debug("King received state update tick=%d", msg.tick)
|