fix: resolve pre-existing lint errors blocking PRs
- Replace asyncio.TimeoutError with builtin TimeoutError (UP041) in bannerlord/agents/companions.py, gabs_client.py, test_gabs_client.py - Remove unused asynccontextmanager import (F401) from gabs_client.py - Remove quoted return type annotation (UP037) from gabs_client.py - Migrate typing.Iterator to collections.abc.Iterator (UP035) in ledger.py - Migrate typing.List to list (UP035/UP006) in perception_cache.py - Remove unused imports in test_agents.py and test_sovereignty_metrics.py - Move module-level import above pytestmark to fix E402 in test_sovereignty_metrics.py - Run tox -e format to auto-fix remaining I001/formatting issues Fixes #1176 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -56,7 +56,7 @@ class BaseCompanion:
|
||||
while self._running:
|
||||
try:
|
||||
task = await asyncio.wait_for(self._task_queue.get(), timeout=1.0)
|
||||
except asyncio.TimeoutError:
|
||||
except TimeoutError:
|
||||
continue
|
||||
|
||||
if task.to_agent != self.name:
|
||||
@@ -82,9 +82,7 @@ class BaseCompanion:
|
||||
"""Dispatch *task.primitive* to its handler method."""
|
||||
handler = getattr(self, f"_prim_{task.primitive}", None)
|
||||
if handler is None:
|
||||
logger.warning(
|
||||
"%s: unknown primitive %r — skipping", self.name, task.primitive
|
||||
)
|
||||
logger.warning("%s: unknown primitive %r — skipping", self.name, task.primitive)
|
||||
return ResultMessage(
|
||||
from_agent=self.name,
|
||||
to_agent=task.from_agent,
|
||||
@@ -128,7 +126,14 @@ class LogisticsCompanion(BaseCompanion):
|
||||
|
||||
name = "logistics_companion"
|
||||
primitives = frozenset(
|
||||
{"recruit_troop", "buy_supplies", "rest_party", "sell_prisoners", "upgrade_troops", "build_project"}
|
||||
{
|
||||
"recruit_troop",
|
||||
"buy_supplies",
|
||||
"rest_party",
|
||||
"sell_prisoners",
|
||||
"upgrade_troops",
|
||||
"build_project",
|
||||
}
|
||||
)
|
||||
|
||||
async def _prim_recruit_troop(self, args: dict[str, Any]) -> dict[str, Any]:
|
||||
@@ -230,9 +235,7 @@ class ScoutCompanion(BaseCompanion):
|
||||
"""
|
||||
|
||||
name = "scout_companion"
|
||||
primitives = frozenset(
|
||||
{"track_lord", "assess_garrison", "map_patrol_routes", "report_intel"}
|
||||
)
|
||||
primitives = frozenset({"track_lord", "assess_garrison", "map_patrol_routes", "report_intel"})
|
||||
|
||||
async def _prim_track_lord(self, args: dict[str, Any]) -> dict[str, Any]:
|
||||
lord_name = args.get("name", "")
|
||||
|
||||
@@ -18,7 +18,6 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -81,7 +80,7 @@ class GABSClient:
|
||||
)
|
||||
self._connected = True
|
||||
logger.info("GABS connected at %s:%s", self._host, self._port)
|
||||
except (OSError, asyncio.TimeoutError) as exc:
|
||||
except (TimeoutError, OSError) as exc:
|
||||
logger.warning(
|
||||
"GABS unavailable at %s:%s — Bannerlord agent will degrade: %s",
|
||||
self._host,
|
||||
@@ -100,7 +99,7 @@ class GABSClient:
|
||||
self._connected = False
|
||||
logger.debug("GABS connection closed")
|
||||
|
||||
async def __aenter__(self) -> "GABSClient":
|
||||
async def __aenter__(self) -> GABSClient:
|
||||
await self.connect()
|
||||
return self
|
||||
|
||||
@@ -139,10 +138,8 @@ class GABSClient:
|
||||
self._writer.write(payload.encode())
|
||||
await asyncio.wait_for(self._writer.drain(), timeout=self._timeout)
|
||||
|
||||
raw = await asyncio.wait_for(
|
||||
self._reader.readline(), timeout=self._timeout
|
||||
)
|
||||
except (OSError, asyncio.TimeoutError) as exc:
|
||||
raw = await asyncio.wait_for(self._reader.readline(), timeout=self._timeout)
|
||||
except (TimeoutError, OSError) as exc:
|
||||
self._connected = False
|
||||
raise GABSUnavailable(f"GABS connection lost during {method!r}: {exc}") from exc
|
||||
|
||||
|
||||
@@ -15,10 +15,10 @@ from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import sqlite3
|
||||
from collections.abc import Iterator
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Iterator
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -100,9 +100,7 @@ class Ledger:
|
||||
if amount < 0:
|
||||
raise ValueError("Use withdraw() for negative amounts")
|
||||
with self._conn() as conn:
|
||||
conn.execute(
|
||||
"UPDATE treasury SET balance = balance + ? WHERE id = 1", (amount,)
|
||||
)
|
||||
conn.execute("UPDATE treasury SET balance = balance + ? WHERE id = 1", (amount,))
|
||||
bal = self.balance()
|
||||
logger.info("Treasury +%.0f denars (%s) → balance %.0f", amount, reason, bal)
|
||||
return bal
|
||||
@@ -117,9 +115,7 @@ class Ledger:
|
||||
f"Cannot withdraw {amount:.0f} denars — treasury balance is only {bal:.0f}"
|
||||
)
|
||||
with self._conn() as conn:
|
||||
conn.execute(
|
||||
"UPDATE treasury SET balance = balance - ? WHERE id = 1", (amount,)
|
||||
)
|
||||
conn.execute("UPDATE treasury SET balance = balance - ? WHERE id = 1", (amount,))
|
||||
new_bal = self.balance()
|
||||
logger.info("Treasury -%.0f denars (%s) → balance %.0f", amount, reason, new_bal)
|
||||
return new_bal
|
||||
|
||||
@@ -44,25 +44,16 @@ class KingSubgoal(BaseModel):
|
||||
"""
|
||||
|
||||
token: str = Field(..., description="One of SUBGOAL_TOKENS")
|
||||
target: str | None = Field(
|
||||
None, description="Named target (settlement, lord, faction)"
|
||||
)
|
||||
target: str | None = Field(None, description="Named target (settlement, lord, faction)")
|
||||
quantity: int | None = Field(None, description="For RECRUIT, TRADE tokens", ge=1)
|
||||
priority: float = Field(
|
||||
1.0, ge=0.0, le=2.0, description="Scales vassal reward weighting"
|
||||
)
|
||||
deadline_days: int | None = Field(
|
||||
None, ge=1, description="Campaign-map days to complete"
|
||||
)
|
||||
context: str | None = Field(
|
||||
None, description="Free-text hint; not parsed by workers"
|
||||
)
|
||||
priority: float = Field(1.0, ge=0.0, le=2.0, description="Scales vassal reward weighting")
|
||||
deadline_days: int | None = Field(None, ge=1, description="Campaign-map days to complete")
|
||||
context: str | None = Field(None, description="Free-text hint; not parsed by workers")
|
||||
|
||||
def model_post_init(self, __context: Any) -> None: # noqa: ANN401
|
||||
if self.token not in SUBGOAL_TOKENS:
|
||||
raise ValueError(
|
||||
f"Unknown subgoal token {self.token!r}. "
|
||||
f"Must be one of: {sorted(SUBGOAL_TOKENS)}"
|
||||
f"Unknown subgoal token {self.token!r}. Must be one of: {sorted(SUBGOAL_TOKENS)}"
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ from .database import Base # Assuming a shared Base in models/database.py
|
||||
|
||||
class TaskState(StrEnum):
|
||||
"""Enumeration of possible task lifecycle states."""
|
||||
|
||||
LATER = "LATER"
|
||||
NEXT = "NEXT"
|
||||
NOW = "NOW"
|
||||
@@ -18,6 +19,7 @@ class TaskState(StrEnum):
|
||||
|
||||
class TaskCertainty(StrEnum):
|
||||
"""Enumeration of task time-certainty levels."""
|
||||
|
||||
FUZZY = "FUZZY" # An intention without a time
|
||||
SOFT = "SOFT" # A flexible task with a time
|
||||
HARD = "HARD" # A fixed meeting/appointment
|
||||
@@ -25,6 +27,7 @@ class TaskCertainty(StrEnum):
|
||||
|
||||
class Task(Base):
|
||||
"""SQLAlchemy model representing a CALM task."""
|
||||
|
||||
__tablename__ = "tasks"
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True)
|
||||
@@ -56,6 +59,7 @@ class Task(Base):
|
||||
|
||||
class JournalEntry(Base):
|
||||
"""SQLAlchemy model for a daily journal entry with MITs and reflections."""
|
||||
|
||||
__tablename__ = "journal_entries"
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True)
|
||||
|
||||
@@ -15,6 +15,7 @@ router = APIRouter(prefix="/discord", tags=["discord"])
|
||||
|
||||
class TokenPayload(BaseModel):
|
||||
"""Request payload containing a Discord bot token."""
|
||||
|
||||
token: str
|
||||
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ router = APIRouter(prefix="/telegram", tags=["telegram"])
|
||||
|
||||
class TokenPayload(BaseModel):
|
||||
"""Request payload containing a Telegram bot token."""
|
||||
|
||||
token: str
|
||||
|
||||
|
||||
|
||||
@@ -52,6 +52,7 @@ def _get_db() -> Generator[sqlite3.Connection, None, None]:
|
||||
|
||||
class _EnumLike:
|
||||
"""Lightweight enum-like wrapper for string values used in templates."""
|
||||
|
||||
def __init__(self, v: str):
|
||||
self.value = v
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@ TRACKED_AGENTS = frozenset({"hermes", "kimi", "manus", "claude", "gemini"})
|
||||
|
||||
class PeriodType(StrEnum):
|
||||
"""Scorecard reporting period type."""
|
||||
|
||||
daily = "daily"
|
||||
weekly = "weekly"
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ MAX_MESSAGES: int = 500
|
||||
@dataclass
|
||||
class Message:
|
||||
"""A single chat message with role, content, timestamp, and source."""
|
||||
|
||||
role: str # "user" | "agent" | "error"
|
||||
content: str
|
||||
timestamp: str
|
||||
|
||||
@@ -22,6 +22,7 @@ logger = logging.getLogger(__name__)
|
||||
@dataclass
|
||||
class Notification:
|
||||
"""A push notification with title, message, category, and read status."""
|
||||
|
||||
id: int
|
||||
title: str
|
||||
message: str
|
||||
|
||||
@@ -48,21 +48,67 @@ class ModelTier(StrEnum):
|
||||
# T1: single-action navigation and binary-choice words
|
||||
_T1_KEYWORDS = frozenset(
|
||||
{
|
||||
"go", "move", "walk", "run", "north", "south", "east", "west",
|
||||
"up", "down", "left", "right", "yes", "no", "ok", "okay",
|
||||
"open", "close", "take", "drop", "look", "pick", "use",
|
||||
"wait", "rest", "save", "attack", "flee", "jump", "crouch",
|
||||
"go",
|
||||
"move",
|
||||
"walk",
|
||||
"run",
|
||||
"north",
|
||||
"south",
|
||||
"east",
|
||||
"west",
|
||||
"up",
|
||||
"down",
|
||||
"left",
|
||||
"right",
|
||||
"yes",
|
||||
"no",
|
||||
"ok",
|
||||
"okay",
|
||||
"open",
|
||||
"close",
|
||||
"take",
|
||||
"drop",
|
||||
"look",
|
||||
"pick",
|
||||
"use",
|
||||
"wait",
|
||||
"rest",
|
||||
"save",
|
||||
"attack",
|
||||
"flee",
|
||||
"jump",
|
||||
"crouch",
|
||||
}
|
||||
)
|
||||
|
||||
# T3: planning, optimisation, or recovery signals
|
||||
_T3_KEYWORDS = frozenset(
|
||||
{
|
||||
"plan", "strategy", "optimize", "optimise", "quest", "stuck",
|
||||
"recover", "multi-step", "long-term", "negotiate", "persuade",
|
||||
"faction", "reputation", "best", "optimal", "recommend",
|
||||
"analyze", "analyse", "evaluate", "decide", "complex", "how do i",
|
||||
"what should i do", "help me figure", "what is the best",
|
||||
"plan",
|
||||
"strategy",
|
||||
"optimize",
|
||||
"optimise",
|
||||
"quest",
|
||||
"stuck",
|
||||
"recover",
|
||||
"multi-step",
|
||||
"long-term",
|
||||
"negotiate",
|
||||
"persuade",
|
||||
"faction",
|
||||
"reputation",
|
||||
"best",
|
||||
"optimal",
|
||||
"recommend",
|
||||
"analyze",
|
||||
"analyse",
|
||||
"evaluate",
|
||||
"decide",
|
||||
"complex",
|
||||
"how do i",
|
||||
"what should i do",
|
||||
"help me figure",
|
||||
"what is the best",
|
||||
}
|
||||
)
|
||||
|
||||
@@ -164,9 +210,7 @@ def build_prompt(
|
||||
|
||||
active_quests = state.get("active_quests", [])
|
||||
if active_quests:
|
||||
names = [
|
||||
q if isinstance(q, str) else q.get("name", str(q)) for q in active_quests[:5]
|
||||
]
|
||||
names = [q if isinstance(q, str) else q.get("name", str(q)) for q in active_quests[:5]]
|
||||
context_lines.append(f"Active quests: {', '.join(names)}")
|
||||
|
||||
if state.get("stuck"):
|
||||
@@ -187,8 +231,7 @@ def build_prompt(
|
||||
|
||||
system_content = (
|
||||
"You are Timmy, an AI game agent. "
|
||||
"Respond with valid game commands only.\n\n"
|
||||
+ "\n".join(context_lines)
|
||||
"Respond with valid game commands only.\n\n" + "\n".join(context_lines)
|
||||
)
|
||||
|
||||
return [
|
||||
|
||||
@@ -25,6 +25,7 @@ logger = logging.getLogger(__name__)
|
||||
@dataclass
|
||||
class Intent:
|
||||
"""A classified user intent with confidence score and extracted entities."""
|
||||
|
||||
name: str
|
||||
confidence: float # 0.0 to 1.0
|
||||
entities: dict
|
||||
|
||||
@@ -18,12 +18,14 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
class TxType(StrEnum):
|
||||
"""Lightning transaction direction type."""
|
||||
|
||||
incoming = "incoming"
|
||||
outgoing = "outgoing"
|
||||
|
||||
|
||||
class TxStatus(StrEnum):
|
||||
"""Lightning transaction settlement status."""
|
||||
|
||||
pending = "pending"
|
||||
settled = "settled"
|
||||
failed = "failed"
|
||||
|
||||
@@ -37,6 +37,7 @@ _EXPIRY_DAYS = 7
|
||||
@dataclass
|
||||
class ApprovalItem:
|
||||
"""A proposed autonomous action requiring owner approval."""
|
||||
|
||||
id: str
|
||||
title: str
|
||||
description: str
|
||||
|
||||
@@ -47,6 +47,7 @@ class ApprovalItem:
|
||||
@dataclass
|
||||
class Briefing:
|
||||
"""A generated morning briefing summarizing recent activity and pending approvals."""
|
||||
|
||||
generated_at: datetime
|
||||
summary: str # 150-300 words
|
||||
approval_items: list[ApprovalItem] = field(default_factory=list)
|
||||
|
||||
@@ -33,9 +33,11 @@ def get_llm_client() -> Any:
|
||||
# model.
|
||||
class MockLLMClient:
|
||||
"""Stub LLM client for testing without a real language model."""
|
||||
|
||||
async def completion(self, prompt: str, max_tokens: int) -> Any:
|
||||
class MockCompletion:
|
||||
"""Stub completion response returned by MockLLMClient."""
|
||||
|
||||
def __init__(self, text: str) -> None:
|
||||
self.text = text
|
||||
|
||||
|
||||
@@ -145,7 +145,9 @@ class SovereigntyMetricsStore:
|
||||
|
||||
# ── public API ────────────────────────────────────────────────────────────
|
||||
|
||||
def record(self, event_type: str, metadata: dict[str, Any] | None = None, *, session_id: str = "") -> None:
|
||||
def record(
|
||||
self, event_type: str, metadata: dict[str, Any] | None = None, *, session_id: str = ""
|
||||
) -> None:
|
||||
"""Record a sovereignty event.
|
||||
|
||||
Parameters
|
||||
@@ -326,8 +328,7 @@ class SovereigntyMetricsStore:
|
||||
"""Return a real-time metrics snapshot suitable for dashboard widgets."""
|
||||
return {
|
||||
"sovereignty": {
|
||||
layer: self.get_sovereignty_pct(layer, time_window=3600)
|
||||
for layer in _LAYER_EVENTS
|
||||
layer: self.get_sovereignty_pct(layer, time_window=3600) for layer in _LAYER_EVENTS
|
||||
},
|
||||
"cost_per_hour": self.get_cost_per_hour(),
|
||||
"skills_crystallized": self.get_skills_crystallized(),
|
||||
@@ -350,7 +351,9 @@ def get_metrics_store() -> SovereigntyMetricsStore:
|
||||
# ── Convenience helpers ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def record(event_type: str, metadata: dict[str, Any] | None = None, *, session_id: str = "") -> None:
|
||||
def record(
|
||||
event_type: str, metadata: dict[str, Any] | None = None, *, session_id: str = ""
|
||||
) -> None:
|
||||
"""Module-level shortcut: ``metrics.record("perception_cache_hit")``."""
|
||||
get_metrics_store().record(event_type, metadata=metadata, session_id=session_id)
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ from __future__ import annotations
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any, List
|
||||
from typing import Any
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
@@ -25,7 +25,7 @@ class CacheResult:
|
||||
class PerceptionCache:
|
||||
def __init__(self, templates_path: Path | str = "data/templates.json"):
|
||||
self.templates_path = Path(templates_path)
|
||||
self.templates: List[Template] = []
|
||||
self.templates: list[Template] = []
|
||||
self.load()
|
||||
|
||||
def match(self, screenshot: np.ndarray) -> CacheResult:
|
||||
@@ -43,12 +43,14 @@ class PerceptionCache:
|
||||
best_match_confidence = max_val
|
||||
best_match_name = template.name
|
||||
|
||||
if best_match_confidence > 0.85: # TODO: Make this configurable per template
|
||||
return CacheResult(confidence=best_match_confidence, state={"template_name": best_match_name})
|
||||
if best_match_confidence > 0.85: # TODO: Make this configurable per template
|
||||
return CacheResult(
|
||||
confidence=best_match_confidence, state={"template_name": best_match_name}
|
||||
)
|
||||
else:
|
||||
return CacheResult(confidence=best_match_confidence, state=None)
|
||||
|
||||
def add(self, templates: List[Template]):
|
||||
def add(self, templates: list[Template]):
|
||||
self.templates.extend(templates)
|
||||
|
||||
def persist(self):
|
||||
@@ -56,7 +58,9 @@ class PerceptionCache:
|
||||
# Note: This is a simplified persistence mechanism.
|
||||
# A more robust solution would store templates as images and metadata in JSON.
|
||||
with self.templates_path.open("w") as f:
|
||||
json.dump([{"name": t.name, "threshold": t.threshold} for t in self.templates], f, indent=2)
|
||||
json.dump(
|
||||
[{"name": t.name, "threshold": t.threshold} for t in self.templates], f, indent=2
|
||||
)
|
||||
|
||||
def load(self):
|
||||
if self.templates_path.exists():
|
||||
@@ -64,10 +68,13 @@ class PerceptionCache:
|
||||
templates_data = json.load(f)
|
||||
# This is a simplified loading mechanism and assumes template images are stored elsewhere.
|
||||
# For now, we are not loading the actual images.
|
||||
self.templates = [Template(name=t["name"], image=np.array([]), threshold=t["threshold"]) for t in templates_data]
|
||||
self.templates = [
|
||||
Template(name=t["name"], image=np.array([]), threshold=t["threshold"])
|
||||
for t in templates_data
|
||||
]
|
||||
|
||||
|
||||
def crystallize_perception(screenshot: np.ndarray, vlm_response: Any) -> List[Template]:
|
||||
def crystallize_perception(screenshot: np.ndarray, vlm_response: Any) -> list[Template]:
|
||||
"""
|
||||
Extracts reusable patterns from VLM output and generates OpenCV templates.
|
||||
This is a placeholder and needs to be implemented based on the actual VLM response format.
|
||||
|
||||
@@ -26,17 +26,20 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
class ChatRequest(BaseModel):
|
||||
"""Incoming chat request payload for the Timmy Serve API."""
|
||||
|
||||
message: str
|
||||
stream: bool = False
|
||||
|
||||
|
||||
class ChatResponse(BaseModel):
|
||||
"""Chat response payload returned by the Timmy Serve API."""
|
||||
|
||||
response: str
|
||||
|
||||
|
||||
class StatusResponse(BaseModel):
|
||||
"""Service status response with backend information."""
|
||||
|
||||
status: str
|
||||
backend: str
|
||||
|
||||
|
||||
@@ -191,9 +191,7 @@ def daily_run(
|
||||
if result.stdout.strip():
|
||||
console.print(result.stdout.strip())
|
||||
if result.returncode != 0:
|
||||
console.print(
|
||||
f"[red] ✗ {name} exited with code {result.returncode}[/red]"
|
||||
)
|
||||
console.print(f"[red] ✗ {name} exited with code {result.returncode}[/red]")
|
||||
if result.stderr.strip():
|
||||
console.print(f"[red]{result.stderr.strip()}[/red]")
|
||||
else:
|
||||
@@ -229,9 +227,7 @@ def log_run(
|
||||
|
||||
logbook_path = Path(".loop/logbook.jsonl")
|
||||
logbook_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
entry = json.dumps(
|
||||
{"timestamp": timestamp, "category": category, "message": message}
|
||||
)
|
||||
entry = json.dumps({"timestamp": timestamp, "category": category, "message": message})
|
||||
with open(logbook_path, "a", encoding="utf-8") as f:
|
||||
f.write(entry + "\n")
|
||||
console.print(f"[green]✓[/green] Entry logged to {logbook_path}")
|
||||
@@ -297,9 +293,7 @@ def inbox(
|
||||
else:
|
||||
pr_table.add_row("—", "[dim]No open PRs[/dim]", "—", "—")
|
||||
except Exception as exc:
|
||||
pr_table.add_row(
|
||||
"—", f"[red]Error fetching PRs: {exc}[/red]", "—", "—"
|
||||
)
|
||||
pr_table.add_row("—", f"[red]Error fetching PRs: {exc}[/red]", "—", "—")
|
||||
console.print(pr_table)
|
||||
console.print()
|
||||
|
||||
@@ -321,7 +315,11 @@ def inbox(
|
||||
"—",
|
||||
)
|
||||
issue_type = next(
|
||||
(lb for lb in labels if lb.lower() in ("bug", "feature", "refactor", "enhancement")),
|
||||
(
|
||||
lb
|
||||
for lb in labels
|
||||
if lb.lower() in ("bug", "feature", "refactor", "enhancement")
|
||||
),
|
||||
"—",
|
||||
)
|
||||
issue_table.add_row(
|
||||
@@ -333,9 +331,7 @@ def inbox(
|
||||
else:
|
||||
issue_table.add_row("—", "[dim]No open issues[/dim]", "—", "—")
|
||||
except Exception as exc:
|
||||
issue_table.add_row(
|
||||
"—", f"[red]Error fetching issues: {exc}[/red]", "—", "—"
|
||||
)
|
||||
issue_table.add_row("—", f"[red]Error fetching issues: {exc}[/red]", "—", "—")
|
||||
console.print(issue_table)
|
||||
console.print()
|
||||
|
||||
|
||||
@@ -3,8 +3,6 @@
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from bannerlord.agents.companions import (
|
||||
CaravanCompanion,
|
||||
LogisticsCompanion,
|
||||
@@ -16,13 +14,9 @@ from bannerlord.gabs_client import GABSClient, GABSUnavailable
|
||||
from bannerlord.ledger import Ledger
|
||||
from bannerlord.models import (
|
||||
KingSubgoal,
|
||||
ResultMessage,
|
||||
SubgoalMessage,
|
||||
TaskMessage,
|
||||
VictoryCondition,
|
||||
)
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
"""Unit tests for bannerlord.gabs_client — TCP JSON-RPC client."""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
@@ -8,7 +7,6 @@ import pytest
|
||||
|
||||
from bannerlord.gabs_client import GABSClient, GABSError, GABSUnavailable
|
||||
|
||||
|
||||
# ── Connection ────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@@ -42,7 +40,7 @@ class TestGABSClientConnection:
|
||||
async def test_connect_timeout_degrades_gracefully(self):
|
||||
with patch(
|
||||
"bannerlord.gabs_client.asyncio.open_connection",
|
||||
side_effect=asyncio.TimeoutError(),
|
||||
side_effect=TimeoutError(),
|
||||
):
|
||||
client = GABSClient()
|
||||
await client.connect()
|
||||
|
||||
@@ -7,19 +7,18 @@ from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
pytestmark = pytest.mark.unit
|
||||
|
||||
from timmy.sovereignty.metrics import (
|
||||
ALL_EVENT_TYPES,
|
||||
SovereigntyMetricsStore,
|
||||
emit_sovereignty_event,
|
||||
get_cost_per_hour,
|
||||
get_metrics_store,
|
||||
get_skills_crystallized,
|
||||
get_sovereignty_pct,
|
||||
record,
|
||||
)
|
||||
|
||||
pytestmark = pytest.mark.unit
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def store(tmp_path):
|
||||
@@ -130,7 +129,6 @@ class TestGetSovereigntyPct:
|
||||
def test_time_window_filters_old_events(self, store, tmp_path):
|
||||
"""Events outside the time window are excluded."""
|
||||
# Insert an event with a very old timestamp directly
|
||||
import json
|
||||
import sqlite3
|
||||
from contextlib import closing
|
||||
|
||||
@@ -230,24 +228,27 @@ class TestGetSnapshot:
|
||||
|
||||
class TestModuleLevelFunctions:
|
||||
def test_record_and_get_sovereignty_pct(self, tmp_path):
|
||||
with patch("timmy.sovereignty.metrics._store", None), patch(
|
||||
"timmy.sovereignty.metrics.DB_PATH", tmp_path / "fn_test.db"
|
||||
with (
|
||||
patch("timmy.sovereignty.metrics._store", None),
|
||||
patch("timmy.sovereignty.metrics.DB_PATH", tmp_path / "fn_test.db"),
|
||||
):
|
||||
record("decision_rule_hit")
|
||||
pct = get_sovereignty_pct("decision")
|
||||
assert pct == 100.0
|
||||
|
||||
def test_get_cost_per_hour_module_fn(self, tmp_path):
|
||||
with patch("timmy.sovereignty.metrics._store", None), patch(
|
||||
"timmy.sovereignty.metrics.DB_PATH", tmp_path / "fn_test2.db"
|
||||
with (
|
||||
patch("timmy.sovereignty.metrics._store", None),
|
||||
patch("timmy.sovereignty.metrics.DB_PATH", tmp_path / "fn_test2.db"),
|
||||
):
|
||||
record("api_cost", {"usd": 0.5})
|
||||
cost = get_cost_per_hour()
|
||||
assert cost > 0.0
|
||||
|
||||
def test_get_skills_crystallized_module_fn(self, tmp_path):
|
||||
with patch("timmy.sovereignty.metrics._store", None), patch(
|
||||
"timmy.sovereignty.metrics.DB_PATH", tmp_path / "fn_test3.db"
|
||||
with (
|
||||
patch("timmy.sovereignty.metrics._store", None),
|
||||
patch("timmy.sovereignty.metrics.DB_PATH", tmp_path / "fn_test3.db"),
|
||||
):
|
||||
record("skill_crystallized")
|
||||
count = get_skills_crystallized()
|
||||
|
||||
Reference in New Issue
Block a user