[claude] Implement Kimi delegation for heavy research via Gitea labels (#979) #1085

Merged
Rockachopa merged 2 commits from claude/issue-979 into main 2026-03-23 15:14:54 +00:00
2 changed files with 953 additions and 0 deletions

View File

@@ -0,0 +1,490 @@
"""Kimi delegation for heavy research via Gitea labels.
When research exceeds local + Groq capacity, Timmy delegates to Kimi by:
1. Filling a research template with full context
2. Creating a Gitea issue labeled `kimi-ready`
3. Monitoring for Kimi's completion (issue closed + artifact committed)
4. Indexing Kimi's artifact into semantic memory
5. Extracting action items and creating follow-up issues
Delegation flow:
Timmy detects capacity exceeded
→ Fills template with context
→ Creates `kimi-ready` Gitea issue
→ Kimi picks up, executes, commits artifact, closes issue
→ Timmy indexes artifact + creates follow-ups
"""
import asyncio
import logging
import re
from typing import Any
logger = logging.getLogger(__name__)
# Label applied to issues that Kimi should pick up
KIMI_READY_LABEL = "kimi-ready"
# Label colour for the kimi-ready label (dark teal)
KIMI_LABEL_COLOR = "#006b75"
# Keywords that suggest a task exceeds local capacity
_HEAVY_RESEARCH_KEYWORDS = frozenset(
{
"comprehensive",
"exhaustive",
"systematic review",
"literature review",
"benchmark",
"comparative analysis",
"large-scale",
"survey",
"meta-analysis",
"deep research",
"extensive",
}
)
# Minimum word count that hints at a heavy task
_HEAVY_WORD_THRESHOLD = 50
def exceeds_local_capacity(task_description: str) -> bool:
"""Heuristic: does this research task exceed local + Groq capacity?
Returns True when the task description signals heavy or broad research
that benefits from Kimi's 262K context and long-running processing.
Args:
task_description: Free-text description of the research task.
Returns:
True if the task should be delegated to Kimi.
"""
lower = task_description.lower()
word_count = len(task_description.split())
has_heavy_keyword = any(kw in lower for kw in _HEAVY_RESEARCH_KEYWORDS)
is_long_task = word_count >= _HEAVY_WORD_THRESHOLD
return has_heavy_keyword or is_long_task
def _build_research_template(
task: str,
context: str,
question: str,
priority: str = "normal",
) -> str:
"""Fill the standard Kimi research template with task context.
Args:
task: Short title for the research task.
context: Background information and relevant project context.
question: The specific research question to answer.
priority: Task priority — "low", "normal", or "high".
Returns:
Markdown-formatted issue body ready for Gitea.
"""
return f"""\
## Research Request
**Priority:** {priority}
### Research Question
{question}
### Background / Context
{context}
### Scope
Please produce a thorough, well-structured research report covering:
- Direct answer to the research question above
- Supporting evidence and sources where applicable
- Trade-offs, limitations, or caveats
- Concrete recommendations or next steps
### Deliverables
Commit your findings as a markdown artifact (e.g. `memory/research/{_slugify(task)}.md`)
and close this issue when complete.
### Task
{task}
---
*Delegated by Timmy via Kimi delegation pipeline. Label: `{KIMI_READY_LABEL}`*
"""
def _slugify(text: str) -> str:
"""Convert text to a safe filename slug."""
slug = re.sub(r"[^\w\s-]", "", text.lower())
slug = re.sub(r"[\s_]+", "-", slug)
return slug[:60].strip("-")
async def _get_or_create_label(
client: Any,
base_url: str,
headers: dict[str, str],
repo: str,
) -> int | None:
"""Ensure the `kimi-ready` label exists; return its ID or None on error.
Args:
client: httpx.AsyncClient instance.
base_url: Gitea API base URL.
headers: Auth headers.
repo: owner/repo string.
Returns:
Label ID, or None if the operation failed.
"""
labels_url = f"{base_url}/repos/{repo}/labels"
# Check for existing label
try:
resp = await client.get(labels_url, headers=headers)
if resp.status_code == 200:
for label in resp.json():
if label.get("name") == KIMI_READY_LABEL:
return label["id"]
except Exception as exc:
logger.warning("Failed to list Gitea labels: %s", exc)
return None
# Create the label
try:
resp = await client.post(
labels_url,
headers=headers,
json={"name": KIMI_READY_LABEL, "color": KIMI_LABEL_COLOR},
)
if resp.status_code in (200, 201):
return resp.json().get("id")
logger.warning("Label creation returned %s: %s", resp.status_code, resp.text[:200])
except Exception as exc:
logger.warning("Failed to create Gitea label: %s", exc)
return None
async def create_kimi_research_issue(
task: str,
context: str,
question: str,
priority: str = "normal",
) -> dict[str, Any]:
"""Create a Gitea issue labeled `kimi-ready` for Kimi to pick up.
Args:
task: Short title for the research task (used as issue title).
context: Background information and project context.
question: The specific research question.
priority: Task priority — "low", "normal", or "high".
Returns:
Dict with `success`, `issue_number`, `issue_url`, and `error` keys.
"""
try:
import httpx
from config import settings
except ImportError as exc:
return {"success": False, "error": f"Missing dependency: {exc}"}
if not settings.gitea_enabled or not settings.gitea_token:
return {
"success": False,
"error": "Gitea integration not configured (no token or disabled).",
}
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
try:
async with httpx.AsyncClient(timeout=15) as client:
label_id = await _get_or_create_label(client, base_url, headers, repo)
body = _build_research_template(task, context, question, priority)
issue_payload: dict[str, Any] = {"title": task, "body": body}
if label_id is not None:
issue_payload["labels"] = [label_id]
resp = await client.post(
f"{base_url}/repos/{repo}/issues",
headers=headers,
json=issue_payload,
)
if resp.status_code in (200, 201):
data = resp.json()
number = data.get("number")
url = data.get("html_url", "")
logger.info("Created kimi-ready issue #%s: %s", number, task[:60])
return {
"success": True,
"issue_number": number,
"issue_url": url,
"error": None,
}
logger.warning("Issue creation failed (%s): %s", resp.status_code, resp.text[:200])
return {
"success": False,
"error": f"Gitea API error {resp.status_code}: {resp.text[:200]}",
}
except Exception as exc:
logger.warning("create_kimi_research_issue failed: %s", exc)
return {"success": False, "error": str(exc)}
async def poll_kimi_issue(
issue_number: int,
poll_interval: int = 60,
max_wait: int = 3600,
) -> dict[str, Any]:
"""Poll a Gitea issue until it is closed (Kimi completed) or timeout.
Args:
issue_number: The Gitea issue number to watch.
poll_interval: Seconds between polls. Default 60.
max_wait: Maximum total seconds to wait. Default 3600 (1 hour).
Returns:
Dict with `completed` bool, `state`, `body`, and `error` keys.
"""
try:
import httpx
from config import settings
except ImportError as exc:
return {"completed": False, "error": f"Missing dependency: {exc}"}
if not settings.gitea_enabled or not settings.gitea_token:
return {"completed": False, "error": "Gitea not configured."}
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {"Authorization": f"token {settings.gitea_token}"}
issue_url = f"{base_url}/repos/{repo}/issues/{issue_number}"
elapsed = 0
while elapsed < max_wait:
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(issue_url, headers=headers)
if resp.status_code == 200:
data = resp.json()
state = data.get("state", "open")
if state == "closed":
logger.info("Kimi completed issue #%s", issue_number)
return {
"completed": True,
"state": state,
"body": data.get("body", ""),
"error": None,
}
else:
logger.warning(
"Poll issue #%s returned %s", issue_number, resp.status_code
)
except Exception as exc:
logger.warning("Poll error for issue #%s: %s", issue_number, exc)
await asyncio.sleep(poll_interval)
elapsed += poll_interval
return {
"completed": False,
"state": "timeout",
"body": "",
"error": f"Timed out after {max_wait}s waiting for issue #{issue_number}",
}
def _extract_action_items(text: str) -> list[str]:
"""Extract action items from markdown text.
Looks for lines that start with checklist markers, numbered items,
or explicit "Action:" / "TODO:" prefixes.
Args:
text: Markdown text from Kimi's artifact.
Returns:
List of action item strings (deduplicated, whitespace-stripped).
"""
items: list[str] = []
patterns = [
re.compile(r"^[-*]\s+\[ \]\s+(.+)", re.MULTILINE), # - [ ] checkbox
re.compile(r"^\d+\.\s+(.+)", re.MULTILINE), # 1. numbered list
re.compile(r"^(?:Action|TODO|Next step):\s*(.+)", re.MULTILINE | re.IGNORECASE),
]
seen: set[str] = set()
for pat in patterns:
for m in pat.finditer(text):
item = m.group(1).strip()
if item and item not in seen:
items.append(item)
seen.add(item)
return items
async def index_kimi_artifact(
issue_number: int,
title: str,
artifact_content: str,
) -> dict[str, Any]:
"""Index Kimi's research artifact into Timmy's semantic memory.
Args:
issue_number: Source Gitea issue number (used as task_id).
title: Human-readable title for the memory entry.
artifact_content: The research artifact text to index.
Returns:
Dict with `success` bool and `memory_id` or `error`.
"""
if not artifact_content.strip():
return {"success": False, "error": "Empty artifact — nothing to index."}
try:
import asyncio
from timmy.memory_system import store_memory
# store_memory is synchronous — wrap in thread to avoid blocking event loop
entry = await asyncio.to_thread(
store_memory,
content=artifact_content,
source="kimi",
context_type="document",
task_id=str(issue_number),
metadata={"issue_number": issue_number, "title": title},
)
logger.info("Indexed Kimi artifact for issue #%s (id=%s)", issue_number, entry.id)
return {"success": True, "memory_id": entry.id}
except Exception as exc:
logger.warning("Failed to index Kimi artifact for issue #%s: %s", issue_number, exc)
return {"success": False, "error": str(exc)}
async def extract_and_create_followups(
artifact_content: str,
source_issue_number: int,
) -> dict[str, Any]:
"""Extract action items from artifact and create follow-up Gitea issues.
Args:
artifact_content: Text of Kimi's research artifact.
source_issue_number: Issue number that produced the artifact (for cross-links).
Returns:
Dict with `success`, `created` (list of issue numbers), and `error`.
"""
items = _extract_action_items(artifact_content)
if not items:
logger.info("No action items found in artifact for issue #%s", source_issue_number)
return {"success": True, "created": [], "error": None}
try:
import httpx
from config import settings
except ImportError as exc:
return {"success": False, "created": [], "error": str(exc)}
if not settings.gitea_enabled or not settings.gitea_token:
return {
"success": False,
"created": [],
"error": "Gitea not configured.",
}
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
created: list[int] = []
for item in items:
body = (
f"Follow-up from Kimi research artifact in #{source_issue_number}.\n\n"
f"**Action item:** {item}"
)
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
f"{base_url}/repos/{repo}/issues",
headers=headers,
json={"title": item[:120], "body": body},
)
if resp.status_code in (200, 201):
num = resp.json().get("number")
if num:
created.append(num)
logger.info(
"Created follow-up issue #%s from kimi artifact #%s",
num,
source_issue_number,
)
else:
logger.warning(
"Follow-up issue creation returned %s for item: %s",
resp.status_code,
item[:60],
)
except Exception as exc:
logger.warning("Failed to create follow-up for item '%s': %s", item[:60], exc)
return {"success": True, "created": created, "error": None}
async def delegate_research_to_kimi(
task: str,
context: str,
question: str,
priority: str = "normal",
) -> dict[str, Any]:
"""Top-level entry point: delegate a heavy research task to Kimi.
Creates the `kimi-ready` Gitea issue and returns immediately.
Monitoring, artifact indexing, and follow-up creation happen
separately via `poll_kimi_issue`, `index_kimi_artifact`, and
`extract_and_create_followups`.
Args:
task: Short title (becomes the issue title).
context: Background / project context.
question: The specific research question Kimi should answer.
priority: "low", "normal", or "high".
Returns:
Dict with `success`, `issue_number`, `issue_url`, and `error`.
"""
if not task.strip() or not question.strip():
return {
"success": False,
"error": "Both `task` and `question` are required.",
}
logger.info("Delegating research to Kimi: %s", task[:80])
return await create_kimi_research_issue(task, context, question, priority)

View File

@@ -0,0 +1,463 @@
"""Unit tests for timmy.kimi_delegation — Kimi research delegation via Gitea labels."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.kimi_delegation import (
KIMI_LABEL_COLOR,
KIMI_READY_LABEL,
_build_research_template,
_extract_action_items,
_slugify,
delegate_research_to_kimi,
exceeds_local_capacity,
)
# ── Constants ─────────────────────────────────────────────────────────────────
def test_kimi_ready_label():
assert KIMI_READY_LABEL == "kimi-ready"
def test_kimi_label_color_is_hex():
assert KIMI_LABEL_COLOR.startswith("#")
assert len(KIMI_LABEL_COLOR) == 7
# ── exceeds_local_capacity ────────────────────────────────────────────────────
class TestExceedsLocalCapacity:
def test_keyword_comprehensive(self):
assert exceeds_local_capacity("Do a comprehensive review of X") is True
def test_keyword_deep_research(self):
assert exceeds_local_capacity("deep research into neural networks") is True
def test_keyword_benchmark(self):
assert exceeds_local_capacity("benchmark these five models") is True
def test_keyword_exhaustive(self):
assert exceeds_local_capacity("exhaustive list of options") is True
def test_keyword_case_insensitive(self):
assert exceeds_local_capacity("COMPREHENSIVE analysis") is True
def test_keyword_survey(self):
assert exceeds_local_capacity("survey all available tools") is True
def test_keyword_extensive(self):
assert exceeds_local_capacity("extensive documentation needed") is True
def test_short_simple_task(self):
assert exceeds_local_capacity("fix the login bug") is False
def test_long_task_exceeds_word_threshold(self):
long_task = " ".join(["word"] * 55)
assert exceeds_local_capacity(long_task) is True
def test_exactly_at_threshold(self):
at_threshold = " ".join(["word"] * 50)
assert exceeds_local_capacity(at_threshold) is True
def test_just_below_threshold(self):
short = " ".join(["word"] * 49)
assert exceeds_local_capacity(short) is False
def test_empty_string(self):
assert exceeds_local_capacity("") is False
# ── _slugify ──────────────────────────────────────────────────────────────────
class TestSlugify:
def test_simple_text(self):
assert _slugify("Hello World") == "hello-world"
def test_special_characters_removed(self):
assert _slugify("Hello, World!") == "hello-world"
def test_underscores_become_dashes(self):
assert _slugify("hello_world") == "hello-world"
def test_multiple_spaces(self):
assert _slugify("hello world") == "hello-world"
def test_truncates_to_60(self):
long = "a" * 80
result = _slugify(long)
assert len(result) <= 60
def test_no_leading_trailing_dashes(self):
result = _slugify(" hello ")
assert not result.startswith("-")
assert not result.endswith("-")
def test_empty_string(self):
assert _slugify("") == ""
# ── _build_research_template ──────────────────────────────────────────────────
class TestBuildResearchTemplate:
def test_contains_task(self):
body = _build_research_template("My Task", "some context", "What is X?")
assert "My Task" in body
def test_contains_question(self):
body = _build_research_template("Task", "ctx", "What is the answer?")
assert "What is the answer?" in body
def test_contains_context(self):
body = _build_research_template("Task", "project background", "Q?")
assert "project background" in body
def test_contains_kimi_ready_label(self):
body = _build_research_template("Task", "ctx", "Q?")
assert KIMI_READY_LABEL in body
def test_default_priority_normal(self):
body = _build_research_template("Task", "ctx", "Q?")
assert "normal" in body
def test_custom_priority_high(self):
body = _build_research_template("Task", "ctx", "Q?", priority="high")
assert "high" in body
def test_contains_deliverables_section(self):
body = _build_research_template("Task", "ctx", "Q?")
assert "Deliverables" in body
def test_slug_in_artifact_path(self):
body = _build_research_template("My Research Task", "ctx", "Q?")
assert "my-research-task" in body
def test_contains_research_request_header(self):
body = _build_research_template("Task", "ctx", "Q?")
assert "## Research Request" in body
# ── _extract_action_items ─────────────────────────────────────────────────────
class TestExtractActionItems:
def test_checkbox_items(self):
text = "- [ ] Do thing A\n- [ ] Do thing B"
items = _extract_action_items(text)
assert "Do thing A" in items
assert "Do thing B" in items
def test_numbered_list(self):
text = "1. First step\n2. Second step\n3. Third step"
items = _extract_action_items(text)
assert "First step" in items
assert "Second step" in items
assert "Third step" in items
def test_action_prefix(self):
text = "Action: Implement caching layer"
items = _extract_action_items(text)
assert "Implement caching layer" in items
def test_todo_prefix(self):
text = "TODO: Write tests"
items = _extract_action_items(text)
assert "Write tests" in items
def test_next_step_prefix(self):
text = "Next step: Deploy to staging"
items = _extract_action_items(text)
assert "Deploy to staging" in items
def test_case_insensitive_prefixes(self):
text = "TODO: Upper\ntodo: lower\nTodo: Mixed"
items = _extract_action_items(text)
assert len(items) == 3
def test_deduplication(self):
text = "1. Do the thing\n2. Do the thing"
items = _extract_action_items(text)
assert items.count("Do the thing") == 1
def test_empty_text(self):
assert _extract_action_items("") == []
def test_no_action_items(self):
text = "This is just a paragraph with no action items."
assert _extract_action_items(text) == []
def test_returns_list(self):
assert isinstance(_extract_action_items("1. Item"), list)
# ── delegate_research_to_kimi ─────────────────────────────────────────────────
class TestDelegateResearchToKimi:
@pytest.mark.asyncio
async def test_empty_task_returns_error(self):
result = await delegate_research_to_kimi("", "context", "question?")
assert result["success"] is False
assert "task" in result["error"].lower()
@pytest.mark.asyncio
async def test_whitespace_task_returns_error(self):
result = await delegate_research_to_kimi(" ", "context", "question?")
assert result["success"] is False
@pytest.mark.asyncio
async def test_empty_question_returns_error(self):
result = await delegate_research_to_kimi("Task title", "context", "")
assert result["success"] is False
assert "question" in result["error"].lower()
@pytest.mark.asyncio
async def test_whitespace_question_returns_error(self):
result = await delegate_research_to_kimi("Task", "ctx", " ")
assert result["success"] is False
@pytest.mark.asyncio
async def test_delegates_to_create_issue(self):
with patch(
"timmy.kimi_delegation.create_kimi_research_issue",
new_callable=AsyncMock,
return_value={
"success": True,
"issue_number": 42,
"issue_url": "http://x/42",
"error": None,
},
) as mock_create:
result = await delegate_research_to_kimi("Task", "ctx", "What is X?", "high")
mock_create.assert_awaited_once_with("Task", "ctx", "What is X?", "high")
assert result["success"] is True
assert result["issue_number"] == 42
@pytest.mark.asyncio
async def test_passes_default_priority(self):
with patch(
"timmy.kimi_delegation.create_kimi_research_issue",
new_callable=AsyncMock,
return_value={"success": True, "issue_number": 1, "issue_url": "", "error": None},
) as mock_create:
await delegate_research_to_kimi("Task", "ctx", "Q?")
_, _, _, priority = mock_create.call_args.args
assert priority == "normal"
# ── create_kimi_research_issue ────────────────────────────────────────────────
class TestCreateKimiResearchIssue:
@pytest.mark.asyncio
async def test_no_gitea_token_returns_error(self):
from timmy.kimi_delegation import create_kimi_research_issue
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = ""
with patch("config.settings", mock_settings):
result = await create_kimi_research_issue("Task", "ctx", "Q?")
assert result["success"] is False
assert "not configured" in result["error"]
@pytest.mark.asyncio
async def test_gitea_disabled_returns_error(self):
from timmy.kimi_delegation import create_kimi_research_issue
mock_settings = MagicMock()
mock_settings.gitea_enabled = False
mock_settings.gitea_token = "tok"
with patch("config.settings", mock_settings):
result = await create_kimi_research_issue("Task", "ctx", "Q?")
assert result["success"] is False
@pytest.mark.asyncio
async def test_successful_issue_creation(self):
from timmy.kimi_delegation import create_kimi_research_issue
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://gitea.local"
mock_settings.gitea_repo = "owner/repo"
label_resp = MagicMock()
label_resp.status_code = 200
label_resp.json.return_value = [{"name": "kimi-ready", "id": 7}]
issue_resp = MagicMock()
issue_resp.status_code = 201
issue_resp.json.return_value = {
"number": 101,
"html_url": "http://gitea.local/issues/101",
}
mock_client = AsyncMock()
mock_client.get.return_value = label_resp
mock_client.post.return_value = issue_resp
async_ctx = AsyncMock()
async_ctx.__aenter__.return_value = mock_client
async_ctx.__aexit__.return_value = False
with (
patch("config.settings", mock_settings),
patch("httpx.AsyncClient", return_value=async_ctx),
):
result = await create_kimi_research_issue("Task", "ctx", "Q?")
assert result["success"] is True
assert result["issue_number"] == 101
assert result["error"] is None
@pytest.mark.asyncio
async def test_api_error_returns_failure(self):
from timmy.kimi_delegation import create_kimi_research_issue
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok"
mock_settings.gitea_url = "http://gitea.local"
mock_settings.gitea_repo = "owner/repo"
label_resp = MagicMock()
label_resp.status_code = 200
label_resp.json.return_value = [{"name": "kimi-ready", "id": 7}]
issue_resp = MagicMock()
issue_resp.status_code = 500
issue_resp.text = "Internal Server Error"
mock_client = AsyncMock()
mock_client.get.return_value = label_resp
mock_client.post.return_value = issue_resp
async_ctx = AsyncMock()
async_ctx.__aenter__.return_value = mock_client
async_ctx.__aexit__.return_value = False
with (
patch("config.settings", mock_settings),
patch("httpx.AsyncClient", return_value=async_ctx),
):
result = await create_kimi_research_issue("Task", "ctx", "Q?")
assert result["success"] is False
assert "500" in result["error"]
# ── index_kimi_artifact ───────────────────────────────────────────────────────
class TestIndexKimiArtifact:
@pytest.mark.asyncio
async def test_empty_artifact_returns_error(self):
from timmy.kimi_delegation import index_kimi_artifact
result = await index_kimi_artifact(42, "Title", "")
assert result["success"] is False
assert "Empty" in result["error"]
@pytest.mark.asyncio
async def test_whitespace_only_artifact_returns_error(self):
from timmy.kimi_delegation import index_kimi_artifact
result = await index_kimi_artifact(42, "Title", " \n ")
assert result["success"] is False
@pytest.mark.asyncio
async def test_successful_indexing(self):
from timmy.kimi_delegation import index_kimi_artifact
mock_entry = MagicMock()
mock_entry.id = "mem-abc-123"
with patch("timmy.memory_system.store_memory", return_value=mock_entry) as mock_store:
result = await index_kimi_artifact(55, "Research Title", "Artifact content here.")
assert result["success"] is True
assert result["memory_id"] == "mem-abc-123"
mock_store.assert_called_once()
call_kwargs = mock_store.call_args.kwargs
assert call_kwargs["source"] == "kimi"
assert call_kwargs["context_type"] == "document"
assert call_kwargs["task_id"] == "55"
@pytest.mark.asyncio
async def test_store_memory_exception_returns_error(self):
from timmy.kimi_delegation import index_kimi_artifact
with patch(
"timmy.memory_system.store_memory",
side_effect=RuntimeError("DB error"),
):
result = await index_kimi_artifact(1, "T", "Some content")
assert result["success"] is False
assert "DB error" in result["error"]
# ── extract_and_create_followups ──────────────────────────────────────────────
class TestExtractAndCreateFollowups:
@pytest.mark.asyncio
async def test_no_action_items_returns_empty_list(self):
from timmy.kimi_delegation import extract_and_create_followups
result = await extract_and_create_followups("No action items here.", 10)
assert result["success"] is True
assert result["created"] == []
assert result["error"] is None
@pytest.mark.asyncio
async def test_gitea_not_configured(self):
from timmy.kimi_delegation import extract_and_create_followups
mock_settings = MagicMock()
mock_settings.gitea_enabled = False
mock_settings.gitea_token = ""
with patch("config.settings", mock_settings):
result = await extract_and_create_followups("1. Do the thing", 10)
assert result["success"] is False
assert result["created"] == []
@pytest.mark.asyncio
async def test_creates_followup_issues(self):
from timmy.kimi_delegation import extract_and_create_followups
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok"
mock_settings.gitea_url = "http://gitea.local"
mock_settings.gitea_repo = "owner/repo"
issue_resp = MagicMock()
issue_resp.status_code = 201
issue_resp.json.return_value = {"number": 200}
mock_client = AsyncMock()
mock_client.post.return_value = issue_resp
async_ctx = AsyncMock()
async_ctx.__aenter__.return_value = mock_client
async_ctx.__aexit__.return_value = False
with (
patch("config.settings", mock_settings),
patch("httpx.AsyncClient", return_value=async_ctx),
):
result = await extract_and_create_followups(
"1. Do the thing\n2. Do another thing", 10
)
assert result["success"] is True
assert 200 in result["created"]