[kimi] Break up _dispatch_via_gitea() into helper functions (#1136) (#1183)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled

This commit was merged in pull request #1183.
This commit is contained in:
2026-03-23 21:40:17 +00:00
parent 74bf0606a9
commit 7aa48b4e22
26 changed files with 195 additions and 115 deletions

View File

@@ -3,8 +3,6 @@
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from bannerlord.agents.companions import (
CaravanCompanion,
LogisticsCompanion,
@@ -16,13 +14,9 @@ from bannerlord.gabs_client import GABSClient, GABSUnavailable
from bannerlord.ledger import Ledger
from bannerlord.models import (
KingSubgoal,
ResultMessage,
SubgoalMessage,
TaskMessage,
VictoryCondition,
)
# ── Helpers ───────────────────────────────────────────────────────────────────

View File

@@ -1,6 +1,5 @@
"""Unit tests for bannerlord.gabs_client — TCP JSON-RPC client."""
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock, patch
@@ -8,7 +7,6 @@ import pytest
from bannerlord.gabs_client import GABSClient, GABSError, GABSUnavailable
# ── Connection ────────────────────────────────────────────────────────────────
@@ -42,7 +40,7 @@ class TestGABSClientConnection:
async def test_connect_timeout_degrades_gracefully(self):
with patch(
"bannerlord.gabs_client.asyncio.open_connection",
side_effect=asyncio.TimeoutError(),
side_effect=TimeoutError(),
):
client = GABSClient()
await client.connect()

View File

@@ -561,9 +561,8 @@ class TestModelPostInit:
env = {k: v for k, v in os.environ.items() if k != "GITEA_TOKEN"}
with patch.dict(os.environ, env, clear=True):
s = Settings()
# Override repo_root so post_init finds our temp file
original_compute = s._compute_repo_root
# Override repo_root so post_init finds our temp file
def _fake_root():
return str(tmp_path)
@@ -704,7 +703,6 @@ class TestGetEffectiveOllamaModel:
from config import get_effective_ollama_model, settings
# Make primary unavailable, but one fallback available
primary = settings.ollama_model
fallback_target = settings.fallback_models[0]
def side_effect(model):
@@ -804,9 +802,7 @@ class TestValidateStartup:
with patch.object(config.settings, "timmy_env", "production"):
with patch.object(config.settings, "l402_hmac_secret", "secret1"):
with patch.object(config.settings, "l402_macaroon_secret", "secret2"):
with patch.object(
config.settings, "cors_origins", ["http://localhost:3000"]
):
with patch.object(config.settings, "cors_origins", ["http://localhost:3000"]):
config.validate_startup(force=True)
assert config._startup_validated is True

View File

@@ -7,19 +7,18 @@ from unittest.mock import AsyncMock, patch
import pytest
pytestmark = pytest.mark.unit
from timmy.sovereignty.metrics import (
ALL_EVENT_TYPES,
SovereigntyMetricsStore,
emit_sovereignty_event,
get_cost_per_hour,
get_metrics_store,
get_skills_crystallized,
get_sovereignty_pct,
record,
)
pytestmark = pytest.mark.unit
@pytest.fixture
def store(tmp_path):
@@ -130,7 +129,6 @@ class TestGetSovereigntyPct:
def test_time_window_filters_old_events(self, store, tmp_path):
"""Events outside the time window are excluded."""
# Insert an event with a very old timestamp directly
import json
import sqlite3
from contextlib import closing
@@ -230,24 +228,27 @@ class TestGetSnapshot:
class TestModuleLevelFunctions:
def test_record_and_get_sovereignty_pct(self, tmp_path):
with patch("timmy.sovereignty.metrics._store", None), patch(
"timmy.sovereignty.metrics.DB_PATH", tmp_path / "fn_test.db"
with (
patch("timmy.sovereignty.metrics._store", None),
patch("timmy.sovereignty.metrics.DB_PATH", tmp_path / "fn_test.db"),
):
record("decision_rule_hit")
pct = get_sovereignty_pct("decision")
assert pct == 100.0
def test_get_cost_per_hour_module_fn(self, tmp_path):
with patch("timmy.sovereignty.metrics._store", None), patch(
"timmy.sovereignty.metrics.DB_PATH", tmp_path / "fn_test2.db"
with (
patch("timmy.sovereignty.metrics._store", None),
patch("timmy.sovereignty.metrics.DB_PATH", tmp_path / "fn_test2.db"),
):
record("api_cost", {"usd": 0.5})
cost = get_cost_per_hour()
assert cost > 0.0
def test_get_skills_crystallized_module_fn(self, tmp_path):
with patch("timmy.sovereignty.metrics._store", None), patch(
"timmy.sovereignty.metrics.DB_PATH", tmp_path / "fn_test3.db"
with (
patch("timmy.sovereignty.metrics._store", None),
patch("timmy.sovereignty.metrics.DB_PATH", tmp_path / "fn_test3.db"),
):
record("skill_crystallized")
count = get_skills_crystallized()