[claude] Implement Kimi delegation for heavy research via Gitea labels (#979) #1085
490
src/timmy/kimi_delegation.py
Normal file
490
src/timmy/kimi_delegation.py
Normal file
@@ -0,0 +1,490 @@
|
|||||||
|
"""Kimi delegation for heavy research via Gitea labels.
|
||||||
|
|
||||||
|
When research exceeds local + Groq capacity, Timmy delegates to Kimi by:
|
||||||
|
1. Filling a research template with full context
|
||||||
|
2. Creating a Gitea issue labeled `kimi-ready`
|
||||||
|
3. Monitoring for Kimi's completion (issue closed + artifact committed)
|
||||||
|
4. Indexing Kimi's artifact into semantic memory
|
||||||
|
5. Extracting action items and creating follow-up issues
|
||||||
|
|
||||||
|
Delegation flow:
|
||||||
|
Timmy detects capacity exceeded
|
||||||
|
→ Fills template with context
|
||||||
|
→ Creates `kimi-ready` Gitea issue
|
||||||
|
→ Kimi picks up, executes, commits artifact, closes issue
|
||||||
|
→ Timmy indexes artifact + creates follow-ups
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Label applied to issues that Kimi should pick up
|
||||||
|
KIMI_READY_LABEL = "kimi-ready"
|
||||||
|
|
||||||
|
# Label colour for the kimi-ready label (dark teal)
|
||||||
|
KIMI_LABEL_COLOR = "#006b75"
|
||||||
|
|
||||||
|
# Keywords that suggest a task exceeds local capacity
|
||||||
|
_HEAVY_RESEARCH_KEYWORDS = frozenset(
|
||||||
|
{
|
||||||
|
"comprehensive",
|
||||||
|
"exhaustive",
|
||||||
|
"systematic review",
|
||||||
|
"literature review",
|
||||||
|
"benchmark",
|
||||||
|
"comparative analysis",
|
||||||
|
"large-scale",
|
||||||
|
"survey",
|
||||||
|
"meta-analysis",
|
||||||
|
"deep research",
|
||||||
|
"extensive",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Minimum word count that hints at a heavy task
|
||||||
|
_HEAVY_WORD_THRESHOLD = 50
|
||||||
|
|
||||||
|
|
||||||
|
def exceeds_local_capacity(task_description: str) -> bool:
|
||||||
|
"""Heuristic: does this research task exceed local + Groq capacity?
|
||||||
|
|
||||||
|
Returns True when the task description signals heavy or broad research
|
||||||
|
that benefits from Kimi's 262K context and long-running processing.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
task_description: Free-text description of the research task.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if the task should be delegated to Kimi.
|
||||||
|
"""
|
||||||
|
lower = task_description.lower()
|
||||||
|
word_count = len(task_description.split())
|
||||||
|
|
||||||
|
has_heavy_keyword = any(kw in lower for kw in _HEAVY_RESEARCH_KEYWORDS)
|
||||||
|
is_long_task = word_count >= _HEAVY_WORD_THRESHOLD
|
||||||
|
|
||||||
|
return has_heavy_keyword or is_long_task
|
||||||
|
|
||||||
|
|
||||||
|
def _build_research_template(
|
||||||
|
task: str,
|
||||||
|
context: str,
|
||||||
|
question: str,
|
||||||
|
priority: str = "normal",
|
||||||
|
) -> str:
|
||||||
|
"""Fill the standard Kimi research template with task context.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
task: Short title for the research task.
|
||||||
|
context: Background information and relevant project context.
|
||||||
|
question: The specific research question to answer.
|
||||||
|
priority: Task priority — "low", "normal", or "high".
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Markdown-formatted issue body ready for Gitea.
|
||||||
|
"""
|
||||||
|
return f"""\
|
||||||
|
## Research Request
|
||||||
|
|
||||||
|
**Priority:** {priority}
|
||||||
|
|
||||||
|
### Research Question
|
||||||
|
|
||||||
|
{question}
|
||||||
|
|
||||||
|
### Background / Context
|
||||||
|
|
||||||
|
{context}
|
||||||
|
|
||||||
|
### Scope
|
||||||
|
|
||||||
|
Please produce a thorough, well-structured research report covering:
|
||||||
|
|
||||||
|
- Direct answer to the research question above
|
||||||
|
- Supporting evidence and sources where applicable
|
||||||
|
- Trade-offs, limitations, or caveats
|
||||||
|
- Concrete recommendations or next steps
|
||||||
|
|
||||||
|
### Deliverables
|
||||||
|
|
||||||
|
Commit your findings as a markdown artifact (e.g. `memory/research/{_slugify(task)}.md`)
|
||||||
|
and close this issue when complete.
|
||||||
|
|
||||||
|
### Task
|
||||||
|
|
||||||
|
{task}
|
||||||
|
|
||||||
|
---
|
||||||
|
*Delegated by Timmy via Kimi delegation pipeline. Label: `{KIMI_READY_LABEL}`*
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def _slugify(text: str) -> str:
|
||||||
|
"""Convert text to a safe filename slug."""
|
||||||
|
slug = re.sub(r"[^\w\s-]", "", text.lower())
|
||||||
|
slug = re.sub(r"[\s_]+", "-", slug)
|
||||||
|
return slug[:60].strip("-")
|
||||||
|
|
||||||
|
|
||||||
|
async def _get_or_create_label(
|
||||||
|
client: Any,
|
||||||
|
base_url: str,
|
||||||
|
headers: dict[str, str],
|
||||||
|
repo: str,
|
||||||
|
) -> int | None:
|
||||||
|
"""Ensure the `kimi-ready` label exists; return its ID or None on error.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
client: httpx.AsyncClient instance.
|
||||||
|
base_url: Gitea API base URL.
|
||||||
|
headers: Auth headers.
|
||||||
|
repo: owner/repo string.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Label ID, or None if the operation failed.
|
||||||
|
"""
|
||||||
|
labels_url = f"{base_url}/repos/{repo}/labels"
|
||||||
|
|
||||||
|
# Check for existing label
|
||||||
|
try:
|
||||||
|
resp = await client.get(labels_url, headers=headers)
|
||||||
|
if resp.status_code == 200:
|
||||||
|
for label in resp.json():
|
||||||
|
if label.get("name") == KIMI_READY_LABEL:
|
||||||
|
return label["id"]
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("Failed to list Gitea labels: %s", exc)
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Create the label
|
||||||
|
try:
|
||||||
|
resp = await client.post(
|
||||||
|
labels_url,
|
||||||
|
headers=headers,
|
||||||
|
json={"name": KIMI_READY_LABEL, "color": KIMI_LABEL_COLOR},
|
||||||
|
)
|
||||||
|
if resp.status_code in (200, 201):
|
||||||
|
return resp.json().get("id")
|
||||||
|
logger.warning("Label creation returned %s: %s", resp.status_code, resp.text[:200])
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("Failed to create Gitea label: %s", exc)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
async def create_kimi_research_issue(
|
||||||
|
task: str,
|
||||||
|
context: str,
|
||||||
|
question: str,
|
||||||
|
priority: str = "normal",
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Create a Gitea issue labeled `kimi-ready` for Kimi to pick up.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
task: Short title for the research task (used as issue title).
|
||||||
|
context: Background information and project context.
|
||||||
|
question: The specific research question.
|
||||||
|
priority: Task priority — "low", "normal", or "high".
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict with `success`, `issue_number`, `issue_url`, and `error` keys.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
from config import settings
|
||||||
|
except ImportError as exc:
|
||||||
|
return {"success": False, "error": f"Missing dependency: {exc}"}
|
||||||
|
|
||||||
|
if not settings.gitea_enabled or not settings.gitea_token:
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": "Gitea integration not configured (no token or disabled).",
|
||||||
|
}
|
||||||
|
|
||||||
|
base_url = f"{settings.gitea_url}/api/v1"
|
||||||
|
repo = settings.gitea_repo
|
||||||
|
headers = {
|
||||||
|
"Authorization": f"token {settings.gitea_token}",
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=15) as client:
|
||||||
|
label_id = await _get_or_create_label(client, base_url, headers, repo)
|
||||||
|
|
||||||
|
body = _build_research_template(task, context, question, priority)
|
||||||
|
issue_payload: dict[str, Any] = {"title": task, "body": body}
|
||||||
|
if label_id is not None:
|
||||||
|
issue_payload["labels"] = [label_id]
|
||||||
|
|
||||||
|
resp = await client.post(
|
||||||
|
f"{base_url}/repos/{repo}/issues",
|
||||||
|
headers=headers,
|
||||||
|
json=issue_payload,
|
||||||
|
)
|
||||||
|
|
||||||
|
if resp.status_code in (200, 201):
|
||||||
|
data = resp.json()
|
||||||
|
number = data.get("number")
|
||||||
|
url = data.get("html_url", "")
|
||||||
|
logger.info("Created kimi-ready issue #%s: %s", number, task[:60])
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"issue_number": number,
|
||||||
|
"issue_url": url,
|
||||||
|
"error": None,
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.warning("Issue creation failed (%s): %s", resp.status_code, resp.text[:200])
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": f"Gitea API error {resp.status_code}: {resp.text[:200]}",
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("create_kimi_research_issue failed: %s", exc)
|
||||||
|
return {"success": False, "error": str(exc)}
|
||||||
|
|
||||||
|
|
||||||
|
async def poll_kimi_issue(
|
||||||
|
issue_number: int,
|
||||||
|
poll_interval: int = 60,
|
||||||
|
max_wait: int = 3600,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Poll a Gitea issue until it is closed (Kimi completed) or timeout.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
issue_number: The Gitea issue number to watch.
|
||||||
|
poll_interval: Seconds between polls. Default 60.
|
||||||
|
max_wait: Maximum total seconds to wait. Default 3600 (1 hour).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict with `completed` bool, `state`, `body`, and `error` keys.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
from config import settings
|
||||||
|
except ImportError as exc:
|
||||||
|
return {"completed": False, "error": f"Missing dependency: {exc}"}
|
||||||
|
|
||||||
|
if not settings.gitea_enabled or not settings.gitea_token:
|
||||||
|
return {"completed": False, "error": "Gitea not configured."}
|
||||||
|
|
||||||
|
base_url = f"{settings.gitea_url}/api/v1"
|
||||||
|
repo = settings.gitea_repo
|
||||||
|
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||||
|
issue_url = f"{base_url}/repos/{repo}/issues/{issue_number}"
|
||||||
|
|
||||||
|
elapsed = 0
|
||||||
|
while elapsed < max_wait:
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=10) as client:
|
||||||
|
resp = await client.get(issue_url, headers=headers)
|
||||||
|
|
||||||
|
if resp.status_code == 200:
|
||||||
|
data = resp.json()
|
||||||
|
state = data.get("state", "open")
|
||||||
|
if state == "closed":
|
||||||
|
logger.info("Kimi completed issue #%s", issue_number)
|
||||||
|
return {
|
||||||
|
"completed": True,
|
||||||
|
"state": state,
|
||||||
|
"body": data.get("body", ""),
|
||||||
|
"error": None,
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
logger.warning(
|
||||||
|
"Poll issue #%s returned %s", issue_number, resp.status_code
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("Poll error for issue #%s: %s", issue_number, exc)
|
||||||
|
|
||||||
|
await asyncio.sleep(poll_interval)
|
||||||
|
elapsed += poll_interval
|
||||||
|
|
||||||
|
return {
|
||||||
|
"completed": False,
|
||||||
|
"state": "timeout",
|
||||||
|
"body": "",
|
||||||
|
"error": f"Timed out after {max_wait}s waiting for issue #{issue_number}",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_action_items(text: str) -> list[str]:
|
||||||
|
"""Extract action items from markdown text.
|
||||||
|
|
||||||
|
Looks for lines that start with checklist markers, numbered items,
|
||||||
|
or explicit "Action:" / "TODO:" prefixes.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
text: Markdown text from Kimi's artifact.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of action item strings (deduplicated, whitespace-stripped).
|
||||||
|
"""
|
||||||
|
items: list[str] = []
|
||||||
|
patterns = [
|
||||||
|
re.compile(r"^[-*]\s+\[ \]\s+(.+)", re.MULTILINE), # - [ ] checkbox
|
||||||
|
re.compile(r"^\d+\.\s+(.+)", re.MULTILINE), # 1. numbered list
|
||||||
|
re.compile(r"^(?:Action|TODO|Next step):\s*(.+)", re.MULTILINE | re.IGNORECASE),
|
||||||
|
]
|
||||||
|
seen: set[str] = set()
|
||||||
|
for pat in patterns:
|
||||||
|
for m in pat.finditer(text):
|
||||||
|
item = m.group(1).strip()
|
||||||
|
if item and item not in seen:
|
||||||
|
items.append(item)
|
||||||
|
seen.add(item)
|
||||||
|
return items
|
||||||
|
|
||||||
|
|
||||||
|
async def index_kimi_artifact(
|
||||||
|
issue_number: int,
|
||||||
|
title: str,
|
||||||
|
artifact_content: str,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Index Kimi's research artifact into Timmy's semantic memory.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
issue_number: Source Gitea issue number (used as task_id).
|
||||||
|
title: Human-readable title for the memory entry.
|
||||||
|
artifact_content: The research artifact text to index.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict with `success` bool and `memory_id` or `error`.
|
||||||
|
"""
|
||||||
|
if not artifact_content.strip():
|
||||||
|
return {"success": False, "error": "Empty artifact — nothing to index."}
|
||||||
|
|
||||||
|
try:
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
from timmy.memory_system import store_memory
|
||||||
|
|
||||||
|
# store_memory is synchronous — wrap in thread to avoid blocking event loop
|
||||||
|
entry = await asyncio.to_thread(
|
||||||
|
store_memory,
|
||||||
|
content=artifact_content,
|
||||||
|
source="kimi",
|
||||||
|
context_type="document",
|
||||||
|
task_id=str(issue_number),
|
||||||
|
metadata={"issue_number": issue_number, "title": title},
|
||||||
|
)
|
||||||
|
logger.info("Indexed Kimi artifact for issue #%s (id=%s)", issue_number, entry.id)
|
||||||
|
return {"success": True, "memory_id": entry.id}
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("Failed to index Kimi artifact for issue #%s: %s", issue_number, exc)
|
||||||
|
return {"success": False, "error": str(exc)}
|
||||||
|
|
||||||
|
|
||||||
|
async def extract_and_create_followups(
|
||||||
|
artifact_content: str,
|
||||||
|
source_issue_number: int,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Extract action items from artifact and create follow-up Gitea issues.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
artifact_content: Text of Kimi's research artifact.
|
||||||
|
source_issue_number: Issue number that produced the artifact (for cross-links).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict with `success`, `created` (list of issue numbers), and `error`.
|
||||||
|
"""
|
||||||
|
items = _extract_action_items(artifact_content)
|
||||||
|
if not items:
|
||||||
|
logger.info("No action items found in artifact for issue #%s", source_issue_number)
|
||||||
|
return {"success": True, "created": [], "error": None}
|
||||||
|
|
||||||
|
try:
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
from config import settings
|
||||||
|
except ImportError as exc:
|
||||||
|
return {"success": False, "created": [], "error": str(exc)}
|
||||||
|
|
||||||
|
if not settings.gitea_enabled or not settings.gitea_token:
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"created": [],
|
||||||
|
"error": "Gitea not configured.",
|
||||||
|
}
|
||||||
|
|
||||||
|
base_url = f"{settings.gitea_url}/api/v1"
|
||||||
|
repo = settings.gitea_repo
|
||||||
|
headers = {
|
||||||
|
"Authorization": f"token {settings.gitea_token}",
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
}
|
||||||
|
created: list[int] = []
|
||||||
|
|
||||||
|
for item in items:
|
||||||
|
body = (
|
||||||
|
f"Follow-up from Kimi research artifact in #{source_issue_number}.\n\n"
|
||||||
|
f"**Action item:** {item}"
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=10) as client:
|
||||||
|
resp = await client.post(
|
||||||
|
f"{base_url}/repos/{repo}/issues",
|
||||||
|
headers=headers,
|
||||||
|
json={"title": item[:120], "body": body},
|
||||||
|
)
|
||||||
|
if resp.status_code in (200, 201):
|
||||||
|
num = resp.json().get("number")
|
||||||
|
if num:
|
||||||
|
created.append(num)
|
||||||
|
logger.info(
|
||||||
|
"Created follow-up issue #%s from kimi artifact #%s",
|
||||||
|
num,
|
||||||
|
source_issue_number,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.warning(
|
||||||
|
"Follow-up issue creation returned %s for item: %s",
|
||||||
|
resp.status_code,
|
||||||
|
item[:60],
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("Failed to create follow-up for item '%s': %s", item[:60], exc)
|
||||||
|
|
||||||
|
return {"success": True, "created": created, "error": None}
|
||||||
|
|
||||||
|
|
||||||
|
async def delegate_research_to_kimi(
|
||||||
|
task: str,
|
||||||
|
context: str,
|
||||||
|
question: str,
|
||||||
|
priority: str = "normal",
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Top-level entry point: delegate a heavy research task to Kimi.
|
||||||
|
|
||||||
|
Creates the `kimi-ready` Gitea issue and returns immediately.
|
||||||
|
Monitoring, artifact indexing, and follow-up creation happen
|
||||||
|
separately via `poll_kimi_issue`, `index_kimi_artifact`, and
|
||||||
|
`extract_and_create_followups`.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
task: Short title (becomes the issue title).
|
||||||
|
context: Background / project context.
|
||||||
|
question: The specific research question Kimi should answer.
|
||||||
|
priority: "low", "normal", or "high".
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict with `success`, `issue_number`, `issue_url`, and `error`.
|
||||||
|
"""
|
||||||
|
if not task.strip() or not question.strip():
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": "Both `task` and `question` are required.",
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info("Delegating research to Kimi: %s", task[:80])
|
||||||
|
return await create_kimi_research_issue(task, context, question, priority)
|
||||||
463
tests/unit/test_kimi_delegation.py
Normal file
463
tests/unit/test_kimi_delegation.py
Normal file
@@ -0,0 +1,463 @@
|
|||||||
|
"""Unit tests for timmy.kimi_delegation — Kimi research delegation via Gitea labels."""
|
||||||
|
|
||||||
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from timmy.kimi_delegation import (
|
||||||
|
KIMI_LABEL_COLOR,
|
||||||
|
KIMI_READY_LABEL,
|
||||||
|
_build_research_template,
|
||||||
|
_extract_action_items,
|
||||||
|
_slugify,
|
||||||
|
delegate_research_to_kimi,
|
||||||
|
exceeds_local_capacity,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Constants ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_kimi_ready_label():
|
||||||
|
assert KIMI_READY_LABEL == "kimi-ready"
|
||||||
|
|
||||||
|
|
||||||
|
def test_kimi_label_color_is_hex():
|
||||||
|
assert KIMI_LABEL_COLOR.startswith("#")
|
||||||
|
assert len(KIMI_LABEL_COLOR) == 7
|
||||||
|
|
||||||
|
|
||||||
|
# ── exceeds_local_capacity ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestExceedsLocalCapacity:
|
||||||
|
def test_keyword_comprehensive(self):
|
||||||
|
assert exceeds_local_capacity("Do a comprehensive review of X") is True
|
||||||
|
|
||||||
|
def test_keyword_deep_research(self):
|
||||||
|
assert exceeds_local_capacity("deep research into neural networks") is True
|
||||||
|
|
||||||
|
def test_keyword_benchmark(self):
|
||||||
|
assert exceeds_local_capacity("benchmark these five models") is True
|
||||||
|
|
||||||
|
def test_keyword_exhaustive(self):
|
||||||
|
assert exceeds_local_capacity("exhaustive list of options") is True
|
||||||
|
|
||||||
|
def test_keyword_case_insensitive(self):
|
||||||
|
assert exceeds_local_capacity("COMPREHENSIVE analysis") is True
|
||||||
|
|
||||||
|
def test_keyword_survey(self):
|
||||||
|
assert exceeds_local_capacity("survey all available tools") is True
|
||||||
|
|
||||||
|
def test_keyword_extensive(self):
|
||||||
|
assert exceeds_local_capacity("extensive documentation needed") is True
|
||||||
|
|
||||||
|
def test_short_simple_task(self):
|
||||||
|
assert exceeds_local_capacity("fix the login bug") is False
|
||||||
|
|
||||||
|
def test_long_task_exceeds_word_threshold(self):
|
||||||
|
long_task = " ".join(["word"] * 55)
|
||||||
|
assert exceeds_local_capacity(long_task) is True
|
||||||
|
|
||||||
|
def test_exactly_at_threshold(self):
|
||||||
|
at_threshold = " ".join(["word"] * 50)
|
||||||
|
assert exceeds_local_capacity(at_threshold) is True
|
||||||
|
|
||||||
|
def test_just_below_threshold(self):
|
||||||
|
short = " ".join(["word"] * 49)
|
||||||
|
assert exceeds_local_capacity(short) is False
|
||||||
|
|
||||||
|
def test_empty_string(self):
|
||||||
|
assert exceeds_local_capacity("") is False
|
||||||
|
|
||||||
|
|
||||||
|
# ── _slugify ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestSlugify:
|
||||||
|
def test_simple_text(self):
|
||||||
|
assert _slugify("Hello World") == "hello-world"
|
||||||
|
|
||||||
|
def test_special_characters_removed(self):
|
||||||
|
assert _slugify("Hello, World!") == "hello-world"
|
||||||
|
|
||||||
|
def test_underscores_become_dashes(self):
|
||||||
|
assert _slugify("hello_world") == "hello-world"
|
||||||
|
|
||||||
|
def test_multiple_spaces(self):
|
||||||
|
assert _slugify("hello world") == "hello-world"
|
||||||
|
|
||||||
|
def test_truncates_to_60(self):
|
||||||
|
long = "a" * 80
|
||||||
|
result = _slugify(long)
|
||||||
|
assert len(result) <= 60
|
||||||
|
|
||||||
|
def test_no_leading_trailing_dashes(self):
|
||||||
|
result = _slugify(" hello ")
|
||||||
|
assert not result.startswith("-")
|
||||||
|
assert not result.endswith("-")
|
||||||
|
|
||||||
|
def test_empty_string(self):
|
||||||
|
assert _slugify("") == ""
|
||||||
|
|
||||||
|
|
||||||
|
# ── _build_research_template ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestBuildResearchTemplate:
|
||||||
|
def test_contains_task(self):
|
||||||
|
body = _build_research_template("My Task", "some context", "What is X?")
|
||||||
|
assert "My Task" in body
|
||||||
|
|
||||||
|
def test_contains_question(self):
|
||||||
|
body = _build_research_template("Task", "ctx", "What is the answer?")
|
||||||
|
assert "What is the answer?" in body
|
||||||
|
|
||||||
|
def test_contains_context(self):
|
||||||
|
body = _build_research_template("Task", "project background", "Q?")
|
||||||
|
assert "project background" in body
|
||||||
|
|
||||||
|
def test_contains_kimi_ready_label(self):
|
||||||
|
body = _build_research_template("Task", "ctx", "Q?")
|
||||||
|
assert KIMI_READY_LABEL in body
|
||||||
|
|
||||||
|
def test_default_priority_normal(self):
|
||||||
|
body = _build_research_template("Task", "ctx", "Q?")
|
||||||
|
assert "normal" in body
|
||||||
|
|
||||||
|
def test_custom_priority_high(self):
|
||||||
|
body = _build_research_template("Task", "ctx", "Q?", priority="high")
|
||||||
|
assert "high" in body
|
||||||
|
|
||||||
|
def test_contains_deliverables_section(self):
|
||||||
|
body = _build_research_template("Task", "ctx", "Q?")
|
||||||
|
assert "Deliverables" in body
|
||||||
|
|
||||||
|
def test_slug_in_artifact_path(self):
|
||||||
|
body = _build_research_template("My Research Task", "ctx", "Q?")
|
||||||
|
assert "my-research-task" in body
|
||||||
|
|
||||||
|
def test_contains_research_request_header(self):
|
||||||
|
body = _build_research_template("Task", "ctx", "Q?")
|
||||||
|
assert "## Research Request" in body
|
||||||
|
|
||||||
|
|
||||||
|
# ── _extract_action_items ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestExtractActionItems:
|
||||||
|
def test_checkbox_items(self):
|
||||||
|
text = "- [ ] Do thing A\n- [ ] Do thing B"
|
||||||
|
items = _extract_action_items(text)
|
||||||
|
assert "Do thing A" in items
|
||||||
|
assert "Do thing B" in items
|
||||||
|
|
||||||
|
def test_numbered_list(self):
|
||||||
|
text = "1. First step\n2. Second step\n3. Third step"
|
||||||
|
items = _extract_action_items(text)
|
||||||
|
assert "First step" in items
|
||||||
|
assert "Second step" in items
|
||||||
|
assert "Third step" in items
|
||||||
|
|
||||||
|
def test_action_prefix(self):
|
||||||
|
text = "Action: Implement caching layer"
|
||||||
|
items = _extract_action_items(text)
|
||||||
|
assert "Implement caching layer" in items
|
||||||
|
|
||||||
|
def test_todo_prefix(self):
|
||||||
|
text = "TODO: Write tests"
|
||||||
|
items = _extract_action_items(text)
|
||||||
|
assert "Write tests" in items
|
||||||
|
|
||||||
|
def test_next_step_prefix(self):
|
||||||
|
text = "Next step: Deploy to staging"
|
||||||
|
items = _extract_action_items(text)
|
||||||
|
assert "Deploy to staging" in items
|
||||||
|
|
||||||
|
def test_case_insensitive_prefixes(self):
|
||||||
|
text = "TODO: Upper\ntodo: lower\nTodo: Mixed"
|
||||||
|
items = _extract_action_items(text)
|
||||||
|
assert len(items) == 3
|
||||||
|
|
||||||
|
def test_deduplication(self):
|
||||||
|
text = "1. Do the thing\n2. Do the thing"
|
||||||
|
items = _extract_action_items(text)
|
||||||
|
assert items.count("Do the thing") == 1
|
||||||
|
|
||||||
|
def test_empty_text(self):
|
||||||
|
assert _extract_action_items("") == []
|
||||||
|
|
||||||
|
def test_no_action_items(self):
|
||||||
|
text = "This is just a paragraph with no action items."
|
||||||
|
assert _extract_action_items(text) == []
|
||||||
|
|
||||||
|
def test_returns_list(self):
|
||||||
|
assert isinstance(_extract_action_items("1. Item"), list)
|
||||||
|
|
||||||
|
|
||||||
|
# ── delegate_research_to_kimi ─────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestDelegateResearchToKimi:
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_empty_task_returns_error(self):
|
||||||
|
result = await delegate_research_to_kimi("", "context", "question?")
|
||||||
|
assert result["success"] is False
|
||||||
|
assert "task" in result["error"].lower()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_whitespace_task_returns_error(self):
|
||||||
|
result = await delegate_research_to_kimi(" ", "context", "question?")
|
||||||
|
assert result["success"] is False
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_empty_question_returns_error(self):
|
||||||
|
result = await delegate_research_to_kimi("Task title", "context", "")
|
||||||
|
assert result["success"] is False
|
||||||
|
assert "question" in result["error"].lower()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_whitespace_question_returns_error(self):
|
||||||
|
result = await delegate_research_to_kimi("Task", "ctx", " ")
|
||||||
|
assert result["success"] is False
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_delegates_to_create_issue(self):
|
||||||
|
with patch(
|
||||||
|
"timmy.kimi_delegation.create_kimi_research_issue",
|
||||||
|
new_callable=AsyncMock,
|
||||||
|
return_value={
|
||||||
|
"success": True,
|
||||||
|
"issue_number": 42,
|
||||||
|
"issue_url": "http://x/42",
|
||||||
|
"error": None,
|
||||||
|
},
|
||||||
|
) as mock_create:
|
||||||
|
result = await delegate_research_to_kimi("Task", "ctx", "What is X?", "high")
|
||||||
|
mock_create.assert_awaited_once_with("Task", "ctx", "What is X?", "high")
|
||||||
|
assert result["success"] is True
|
||||||
|
assert result["issue_number"] == 42
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_passes_default_priority(self):
|
||||||
|
with patch(
|
||||||
|
"timmy.kimi_delegation.create_kimi_research_issue",
|
||||||
|
new_callable=AsyncMock,
|
||||||
|
return_value={"success": True, "issue_number": 1, "issue_url": "", "error": None},
|
||||||
|
) as mock_create:
|
||||||
|
await delegate_research_to_kimi("Task", "ctx", "Q?")
|
||||||
|
_, _, _, priority = mock_create.call_args.args
|
||||||
|
assert priority == "normal"
|
||||||
|
|
||||||
|
|
||||||
|
# ── create_kimi_research_issue ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestCreateKimiResearchIssue:
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_no_gitea_token_returns_error(self):
|
||||||
|
from timmy.kimi_delegation import create_kimi_research_issue
|
||||||
|
|
||||||
|
mock_settings = MagicMock()
|
||||||
|
mock_settings.gitea_enabled = True
|
||||||
|
mock_settings.gitea_token = ""
|
||||||
|
|
||||||
|
with patch("config.settings", mock_settings):
|
||||||
|
result = await create_kimi_research_issue("Task", "ctx", "Q?")
|
||||||
|
assert result["success"] is False
|
||||||
|
assert "not configured" in result["error"]
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_gitea_disabled_returns_error(self):
|
||||||
|
from timmy.kimi_delegation import create_kimi_research_issue
|
||||||
|
|
||||||
|
mock_settings = MagicMock()
|
||||||
|
mock_settings.gitea_enabled = False
|
||||||
|
mock_settings.gitea_token = "tok"
|
||||||
|
|
||||||
|
with patch("config.settings", mock_settings):
|
||||||
|
result = await create_kimi_research_issue("Task", "ctx", "Q?")
|
||||||
|
assert result["success"] is False
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_successful_issue_creation(self):
|
||||||
|
from timmy.kimi_delegation import create_kimi_research_issue
|
||||||
|
|
||||||
|
mock_settings = MagicMock()
|
||||||
|
mock_settings.gitea_enabled = True
|
||||||
|
mock_settings.gitea_token = "fake-token"
|
||||||
|
mock_settings.gitea_url = "http://gitea.local"
|
||||||
|
mock_settings.gitea_repo = "owner/repo"
|
||||||
|
|
||||||
|
label_resp = MagicMock()
|
||||||
|
label_resp.status_code = 200
|
||||||
|
label_resp.json.return_value = [{"name": "kimi-ready", "id": 7}]
|
||||||
|
|
||||||
|
issue_resp = MagicMock()
|
||||||
|
issue_resp.status_code = 201
|
||||||
|
issue_resp.json.return_value = {
|
||||||
|
"number": 101,
|
||||||
|
"html_url": "http://gitea.local/issues/101",
|
||||||
|
}
|
||||||
|
|
||||||
|
mock_client = AsyncMock()
|
||||||
|
mock_client.get.return_value = label_resp
|
||||||
|
mock_client.post.return_value = issue_resp
|
||||||
|
|
||||||
|
async_ctx = AsyncMock()
|
||||||
|
async_ctx.__aenter__.return_value = mock_client
|
||||||
|
async_ctx.__aexit__.return_value = False
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch("config.settings", mock_settings),
|
||||||
|
patch("httpx.AsyncClient", return_value=async_ctx),
|
||||||
|
):
|
||||||
|
result = await create_kimi_research_issue("Task", "ctx", "Q?")
|
||||||
|
|
||||||
|
assert result["success"] is True
|
||||||
|
assert result["issue_number"] == 101
|
||||||
|
assert result["error"] is None
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_api_error_returns_failure(self):
|
||||||
|
from timmy.kimi_delegation import create_kimi_research_issue
|
||||||
|
|
||||||
|
mock_settings = MagicMock()
|
||||||
|
mock_settings.gitea_enabled = True
|
||||||
|
mock_settings.gitea_token = "tok"
|
||||||
|
mock_settings.gitea_url = "http://gitea.local"
|
||||||
|
mock_settings.gitea_repo = "owner/repo"
|
||||||
|
|
||||||
|
label_resp = MagicMock()
|
||||||
|
label_resp.status_code = 200
|
||||||
|
label_resp.json.return_value = [{"name": "kimi-ready", "id": 7}]
|
||||||
|
|
||||||
|
issue_resp = MagicMock()
|
||||||
|
issue_resp.status_code = 500
|
||||||
|
issue_resp.text = "Internal Server Error"
|
||||||
|
|
||||||
|
mock_client = AsyncMock()
|
||||||
|
mock_client.get.return_value = label_resp
|
||||||
|
mock_client.post.return_value = issue_resp
|
||||||
|
|
||||||
|
async_ctx = AsyncMock()
|
||||||
|
async_ctx.__aenter__.return_value = mock_client
|
||||||
|
async_ctx.__aexit__.return_value = False
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch("config.settings", mock_settings),
|
||||||
|
patch("httpx.AsyncClient", return_value=async_ctx),
|
||||||
|
):
|
||||||
|
result = await create_kimi_research_issue("Task", "ctx", "Q?")
|
||||||
|
|
||||||
|
assert result["success"] is False
|
||||||
|
assert "500" in result["error"]
|
||||||
|
|
||||||
|
|
||||||
|
# ── index_kimi_artifact ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestIndexKimiArtifact:
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_empty_artifact_returns_error(self):
|
||||||
|
from timmy.kimi_delegation import index_kimi_artifact
|
||||||
|
|
||||||
|
result = await index_kimi_artifact(42, "Title", "")
|
||||||
|
assert result["success"] is False
|
||||||
|
assert "Empty" in result["error"]
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_whitespace_only_artifact_returns_error(self):
|
||||||
|
from timmy.kimi_delegation import index_kimi_artifact
|
||||||
|
|
||||||
|
result = await index_kimi_artifact(42, "Title", " \n ")
|
||||||
|
assert result["success"] is False
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_successful_indexing(self):
|
||||||
|
from timmy.kimi_delegation import index_kimi_artifact
|
||||||
|
|
||||||
|
mock_entry = MagicMock()
|
||||||
|
mock_entry.id = "mem-abc-123"
|
||||||
|
|
||||||
|
with patch("timmy.memory_system.store_memory", return_value=mock_entry) as mock_store:
|
||||||
|
result = await index_kimi_artifact(55, "Research Title", "Artifact content here.")
|
||||||
|
|
||||||
|
assert result["success"] is True
|
||||||
|
assert result["memory_id"] == "mem-abc-123"
|
||||||
|
mock_store.assert_called_once()
|
||||||
|
call_kwargs = mock_store.call_args.kwargs
|
||||||
|
assert call_kwargs["source"] == "kimi"
|
||||||
|
assert call_kwargs["context_type"] == "document"
|
||||||
|
assert call_kwargs["task_id"] == "55"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_store_memory_exception_returns_error(self):
|
||||||
|
from timmy.kimi_delegation import index_kimi_artifact
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"timmy.memory_system.store_memory",
|
||||||
|
side_effect=RuntimeError("DB error"),
|
||||||
|
):
|
||||||
|
result = await index_kimi_artifact(1, "T", "Some content")
|
||||||
|
assert result["success"] is False
|
||||||
|
assert "DB error" in result["error"]
|
||||||
|
|
||||||
|
|
||||||
|
# ── extract_and_create_followups ──────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestExtractAndCreateFollowups:
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_no_action_items_returns_empty_list(self):
|
||||||
|
from timmy.kimi_delegation import extract_and_create_followups
|
||||||
|
|
||||||
|
result = await extract_and_create_followups("No action items here.", 10)
|
||||||
|
assert result["success"] is True
|
||||||
|
assert result["created"] == []
|
||||||
|
assert result["error"] is None
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_gitea_not_configured(self):
|
||||||
|
from timmy.kimi_delegation import extract_and_create_followups
|
||||||
|
|
||||||
|
mock_settings = MagicMock()
|
||||||
|
mock_settings.gitea_enabled = False
|
||||||
|
mock_settings.gitea_token = ""
|
||||||
|
|
||||||
|
with patch("config.settings", mock_settings):
|
||||||
|
result = await extract_and_create_followups("1. Do the thing", 10)
|
||||||
|
assert result["success"] is False
|
||||||
|
assert result["created"] == []
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_creates_followup_issues(self):
|
||||||
|
from timmy.kimi_delegation import extract_and_create_followups
|
||||||
|
|
||||||
|
mock_settings = MagicMock()
|
||||||
|
mock_settings.gitea_enabled = True
|
||||||
|
mock_settings.gitea_token = "tok"
|
||||||
|
mock_settings.gitea_url = "http://gitea.local"
|
||||||
|
mock_settings.gitea_repo = "owner/repo"
|
||||||
|
|
||||||
|
issue_resp = MagicMock()
|
||||||
|
issue_resp.status_code = 201
|
||||||
|
issue_resp.json.return_value = {"number": 200}
|
||||||
|
|
||||||
|
mock_client = AsyncMock()
|
||||||
|
mock_client.post.return_value = issue_resp
|
||||||
|
|
||||||
|
async_ctx = AsyncMock()
|
||||||
|
async_ctx.__aenter__.return_value = mock_client
|
||||||
|
async_ctx.__aexit__.return_value = False
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch("config.settings", mock_settings),
|
||||||
|
patch("httpx.AsyncClient", return_value=async_ctx),
|
||||||
|
):
|
||||||
|
result = await extract_and_create_followups(
|
||||||
|
"1. Do the thing\n2. Do another thing", 10
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result["success"] is True
|
||||||
|
assert 200 in result["created"]
|
||||||
Reference in New Issue
Block a user