[claude] feat: enforce 3-issue cap on Kimi delegation (#1304) (#1310)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled

This commit was merged in pull request #1310.
This commit is contained in:
2026-03-24 02:00:34 +00:00
parent 8304cf50da
commit 0b4ed1b756
2 changed files with 247 additions and 0 deletions

View File

@@ -28,6 +28,9 @@ KIMI_READY_LABEL = "kimi-ready"
# Label colour for the kimi-ready label (dark teal)
KIMI_LABEL_COLOR = "#006b75"
# Maximum number of concurrent active (open) Kimi-delegated issues
KIMI_MAX_ACTIVE_ISSUES = 3
# Keywords that suggest a task exceeds local capacity
_HEAVY_RESEARCH_KEYWORDS = frozenset(
{
@@ -176,6 +179,38 @@ async def _get_or_create_label(
return None
async def _count_active_kimi_issues(
client: Any,
base_url: str,
headers: dict[str, str],
repo: str,
) -> int:
"""Count open issues that carry the `kimi-ready` label.
Args:
client: httpx.AsyncClient instance.
base_url: Gitea API base URL.
headers: Auth headers.
repo: owner/repo string.
Returns:
Number of open kimi-ready issues, or 0 on error (fail-open to avoid
blocking delegation when Gitea is unreachable).
"""
try:
resp = await client.get(
f"{base_url}/repos/{repo}/issues",
headers=headers,
params={"state": "open", "type": "issues", "labels": KIMI_READY_LABEL, "limit": 50},
)
if resp.status_code == 200:
return len(resp.json())
logger.warning("count_active_kimi_issues: unexpected status %s", resp.status_code)
except Exception as exc:
logger.warning("count_active_kimi_issues failed: %s", exc)
return 0
async def create_kimi_research_issue(
task: str,
context: str,
@@ -217,6 +252,22 @@ async def create_kimi_research_issue(
async with httpx.AsyncClient(timeout=15) as client:
label_id = await _get_or_create_label(client, base_url, headers, repo)
active_count = await _count_active_kimi_issues(client, base_url, headers, repo)
if active_count >= KIMI_MAX_ACTIVE_ISSUES:
logger.warning(
"Kimi delegation cap reached (%d/%d active) — skipping: %s",
active_count,
KIMI_MAX_ACTIVE_ISSUES,
task[:60],
)
return {
"success": False,
"error": (
f"Kimi delegation cap reached: {active_count} active issues "
f"(max {KIMI_MAX_ACTIVE_ISSUES}). Resolve existing issues first."
),
}
body = _build_research_template(task, context, question, priority)
issue_payload: dict[str, Any] = {"title": task, "body": body}
if label_id is not None:

View File

@@ -6,8 +6,10 @@ import pytest
from timmy.kimi_delegation import (
KIMI_LABEL_COLOR,
KIMI_MAX_ACTIVE_ISSUES,
KIMI_READY_LABEL,
_build_research_template,
_count_active_kimi_issues,
_extract_action_items,
_slugify,
delegate_research_to_kimi,
@@ -458,3 +460,197 @@ class TestExtractAndCreateFollowups:
assert result["success"] is True
assert 200 in result["created"]
# ── KIMI_MAX_ACTIVE_ISSUES constant ───────────────────────────────────────────
def test_kimi_max_active_issues_value():
assert KIMI_MAX_ACTIVE_ISSUES == 3
# ── _count_active_kimi_issues ─────────────────────────────────────────────────
class TestCountActiveKimiIssues:
@pytest.mark.asyncio
async def test_returns_count_from_api(self):
mock_client = AsyncMock()
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = [{"number": 1}, {"number": 2}]
mock_client.get.return_value = resp
count = await _count_active_kimi_issues(
mock_client, "http://gitea.local/api/v1", {}, "owner/repo"
)
assert count == 2
@pytest.mark.asyncio
async def test_returns_zero_on_api_error(self):
mock_client = AsyncMock()
resp = MagicMock()
resp.status_code = 500
mock_client.get.return_value = resp
count = await _count_active_kimi_issues(
mock_client, "http://gitea.local/api/v1", {}, "owner/repo"
)
assert count == 0
@pytest.mark.asyncio
async def test_returns_zero_on_exception(self):
mock_client = AsyncMock()
mock_client.get.side_effect = Exception("network error")
count = await _count_active_kimi_issues(
mock_client, "http://gitea.local/api/v1", {}, "owner/repo"
)
assert count == 0
@pytest.mark.asyncio
async def test_queries_open_issues_with_kimi_label(self):
mock_client = AsyncMock()
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = []
mock_client.get.return_value = resp
await _count_active_kimi_issues(
mock_client, "http://gitea.local/api/v1", {}, "owner/repo"
)
call_kwargs = mock_client.get.call_args.kwargs
assert call_kwargs["params"]["state"] == "open"
assert call_kwargs["params"]["labels"] == KIMI_READY_LABEL
# ── Cap enforcement in create_kimi_research_issue ─────────────────────────────
class TestKimiCapEnforcement:
def _make_settings(self):
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://gitea.local"
mock_settings.gitea_repo = "owner/repo"
return mock_settings
def _make_async_client(self, label_json, issue_count):
label_resp = MagicMock()
label_resp.status_code = 200
label_resp.json.return_value = label_json
count_resp = MagicMock()
count_resp.status_code = 200
count_resp.json.return_value = [{"number": i} for i in range(issue_count)]
mock_client = AsyncMock()
mock_client.get.side_effect = [label_resp, count_resp]
async_ctx = AsyncMock()
async_ctx.__aenter__.return_value = mock_client
async_ctx.__aexit__.return_value = False
return async_ctx
@pytest.mark.asyncio
async def test_cap_reached_returns_failure(self):
from timmy.kimi_delegation import create_kimi_research_issue
async_ctx = self._make_async_client(
[{"name": "kimi-ready", "id": 7}], issue_count=3
)
with (
patch("config.settings", self._make_settings()),
patch("httpx.AsyncClient", return_value=async_ctx),
):
result = await create_kimi_research_issue("Task", "ctx", "Q?")
assert result["success"] is False
assert "cap" in result["error"].lower()
assert "3" in result["error"]
@pytest.mark.asyncio
async def test_cap_exceeded_returns_failure(self):
from timmy.kimi_delegation import create_kimi_research_issue
async_ctx = self._make_async_client(
[{"name": "kimi-ready", "id": 7}], issue_count=5
)
with (
patch("config.settings", self._make_settings()),
patch("httpx.AsyncClient", return_value=async_ctx),
):
result = await create_kimi_research_issue("Task", "ctx", "Q?")
assert result["success"] is False
@pytest.mark.asyncio
async def test_below_cap_proceeds_to_create(self):
from timmy.kimi_delegation import create_kimi_research_issue
label_resp = MagicMock()
label_resp.status_code = 200
label_resp.json.return_value = [{"name": "kimi-ready", "id": 7}]
count_resp = MagicMock()
count_resp.status_code = 200
count_resp.json.return_value = [{"number": 1}, {"number": 2}] # 2 active < cap of 3
issue_resp = MagicMock()
issue_resp.status_code = 201
issue_resp.json.return_value = {
"number": 99,
"html_url": "http://gitea.local/issues/99",
}
mock_client = AsyncMock()
mock_client.get.side_effect = [label_resp, count_resp]
mock_client.post.return_value = issue_resp
async_ctx = AsyncMock()
async_ctx.__aenter__.return_value = mock_client
async_ctx.__aexit__.return_value = False
with (
patch("config.settings", self._make_settings()),
patch("httpx.AsyncClient", return_value=async_ctx),
):
result = await create_kimi_research_issue("Task", "ctx", "Q?")
assert result["success"] is True
assert result["issue_number"] == 99
@pytest.mark.asyncio
async def test_zero_active_issues_proceeds(self):
from timmy.kimi_delegation import create_kimi_research_issue
label_resp = MagicMock()
label_resp.status_code = 200
label_resp.json.return_value = [{"name": "kimi-ready", "id": 7}]
count_resp = MagicMock()
count_resp.status_code = 200
count_resp.json.return_value = []
issue_resp = MagicMock()
issue_resp.status_code = 201
issue_resp.json.return_value = {"number": 50, "html_url": "http://gitea.local/issues/50"}
mock_client = AsyncMock()
mock_client.get.side_effect = [label_resp, count_resp]
mock_client.post.return_value = issue_resp
async_ctx = AsyncMock()
async_ctx.__aenter__.return_value = mock_client
async_ctx.__aexit__.return_value = False
with (
patch("config.settings", self._make_settings()),
patch("httpx.AsyncClient", return_value=async_ctx),
):
result = await create_kimi_research_issue("Task", "ctx", "Q?")
assert result["success"] is True