WIP: Salvage before loop restart for #979
This commit is contained in:
490
src/timmy/kimi_delegation.py
Normal file
490
src/timmy/kimi_delegation.py
Normal file
@@ -0,0 +1,490 @@
|
||||
"""Kimi delegation for heavy research via Gitea labels.
|
||||
|
||||
When research exceeds local + Groq capacity, Timmy delegates to Kimi by:
|
||||
1. Filling a research template with full context
|
||||
2. Creating a Gitea issue labeled `kimi-ready`
|
||||
3. Monitoring for Kimi's completion (issue closed + artifact committed)
|
||||
4. Indexing Kimi's artifact into semantic memory
|
||||
5. Extracting action items and creating follow-up issues
|
||||
|
||||
Delegation flow:
|
||||
Timmy detects capacity exceeded
|
||||
→ Fills template with context
|
||||
→ Creates `kimi-ready` Gitea issue
|
||||
→ Kimi picks up, executes, commits artifact, closes issue
|
||||
→ Timmy indexes artifact + creates follow-ups
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Label applied to issues that Kimi should pick up
|
||||
KIMI_READY_LABEL = "kimi-ready"
|
||||
|
||||
# Label colour for the kimi-ready label (dark teal)
|
||||
KIMI_LABEL_COLOR = "#006b75"
|
||||
|
||||
# Keywords that suggest a task exceeds local capacity
|
||||
_HEAVY_RESEARCH_KEYWORDS = frozenset(
|
||||
{
|
||||
"comprehensive",
|
||||
"exhaustive",
|
||||
"systematic review",
|
||||
"literature review",
|
||||
"benchmark",
|
||||
"comparative analysis",
|
||||
"large-scale",
|
||||
"survey",
|
||||
"meta-analysis",
|
||||
"deep research",
|
||||
"extensive",
|
||||
}
|
||||
)
|
||||
|
||||
# Minimum word count that hints at a heavy task
|
||||
_HEAVY_WORD_THRESHOLD = 50
|
||||
|
||||
|
||||
def exceeds_local_capacity(task_description: str) -> bool:
|
||||
"""Heuristic: does this research task exceed local + Groq capacity?
|
||||
|
||||
Returns True when the task description signals heavy or broad research
|
||||
that benefits from Kimi's 262K context and long-running processing.
|
||||
|
||||
Args:
|
||||
task_description: Free-text description of the research task.
|
||||
|
||||
Returns:
|
||||
True if the task should be delegated to Kimi.
|
||||
"""
|
||||
lower = task_description.lower()
|
||||
word_count = len(task_description.split())
|
||||
|
||||
has_heavy_keyword = any(kw in lower for kw in _HEAVY_RESEARCH_KEYWORDS)
|
||||
is_long_task = word_count >= _HEAVY_WORD_THRESHOLD
|
||||
|
||||
return has_heavy_keyword or is_long_task
|
||||
|
||||
|
||||
def _build_research_template(
|
||||
task: str,
|
||||
context: str,
|
||||
question: str,
|
||||
priority: str = "normal",
|
||||
) -> str:
|
||||
"""Fill the standard Kimi research template with task context.
|
||||
|
||||
Args:
|
||||
task: Short title for the research task.
|
||||
context: Background information and relevant project context.
|
||||
question: The specific research question to answer.
|
||||
priority: Task priority — "low", "normal", or "high".
|
||||
|
||||
Returns:
|
||||
Markdown-formatted issue body ready for Gitea.
|
||||
"""
|
||||
return f"""\
|
||||
## Research Request
|
||||
|
||||
**Priority:** {priority}
|
||||
|
||||
### Research Question
|
||||
|
||||
{question}
|
||||
|
||||
### Background / Context
|
||||
|
||||
{context}
|
||||
|
||||
### Scope
|
||||
|
||||
Please produce a thorough, well-structured research report covering:
|
||||
|
||||
- Direct answer to the research question above
|
||||
- Supporting evidence and sources where applicable
|
||||
- Trade-offs, limitations, or caveats
|
||||
- Concrete recommendations or next steps
|
||||
|
||||
### Deliverables
|
||||
|
||||
Commit your findings as a markdown artifact (e.g. `memory/research/{_slugify(task)}.md`)
|
||||
and close this issue when complete.
|
||||
|
||||
### Task
|
||||
|
||||
{task}
|
||||
|
||||
---
|
||||
*Delegated by Timmy via Kimi delegation pipeline. Label: `{KIMI_READY_LABEL}`*
|
||||
"""
|
||||
|
||||
|
||||
def _slugify(text: str) -> str:
|
||||
"""Convert text to a safe filename slug."""
|
||||
slug = re.sub(r"[^\w\s-]", "", text.lower())
|
||||
slug = re.sub(r"[\s_]+", "-", slug)
|
||||
return slug[:60].strip("-")
|
||||
|
||||
|
||||
async def _get_or_create_label(
|
||||
client: Any,
|
||||
base_url: str,
|
||||
headers: dict[str, str],
|
||||
repo: str,
|
||||
) -> int | None:
|
||||
"""Ensure the `kimi-ready` label exists; return its ID or None on error.
|
||||
|
||||
Args:
|
||||
client: httpx.AsyncClient instance.
|
||||
base_url: Gitea API base URL.
|
||||
headers: Auth headers.
|
||||
repo: owner/repo string.
|
||||
|
||||
Returns:
|
||||
Label ID, or None if the operation failed.
|
||||
"""
|
||||
labels_url = f"{base_url}/repos/{repo}/labels"
|
||||
|
||||
# Check for existing label
|
||||
try:
|
||||
resp = await client.get(labels_url, headers=headers)
|
||||
if resp.status_code == 200:
|
||||
for label in resp.json():
|
||||
if label.get("name") == KIMI_READY_LABEL:
|
||||
return label["id"]
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to list Gitea labels: %s", exc)
|
||||
return None
|
||||
|
||||
# Create the label
|
||||
try:
|
||||
resp = await client.post(
|
||||
labels_url,
|
||||
headers=headers,
|
||||
json={"name": KIMI_READY_LABEL, "color": KIMI_LABEL_COLOR},
|
||||
)
|
||||
if resp.status_code in (200, 201):
|
||||
return resp.json().get("id")
|
||||
logger.warning("Label creation returned %s: %s", resp.status_code, resp.text[:200])
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to create Gitea label: %s", exc)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
async def create_kimi_research_issue(
|
||||
task: str,
|
||||
context: str,
|
||||
question: str,
|
||||
priority: str = "normal",
|
||||
) -> dict[str, Any]:
|
||||
"""Create a Gitea issue labeled `kimi-ready` for Kimi to pick up.
|
||||
|
||||
Args:
|
||||
task: Short title for the research task (used as issue title).
|
||||
context: Background information and project context.
|
||||
question: The specific research question.
|
||||
priority: Task priority — "low", "normal", or "high".
|
||||
|
||||
Returns:
|
||||
Dict with `success`, `issue_number`, `issue_url`, and `error` keys.
|
||||
"""
|
||||
try:
|
||||
import httpx
|
||||
|
||||
from config import settings
|
||||
except ImportError as exc:
|
||||
return {"success": False, "error": f"Missing dependency: {exc}"}
|
||||
|
||||
if not settings.gitea_enabled or not settings.gitea_token:
|
||||
return {
|
||||
"success": False,
|
||||
"error": "Gitea integration not configured (no token or disabled).",
|
||||
}
|
||||
|
||||
base_url = f"{settings.gitea_url}/api/v1"
|
||||
repo = settings.gitea_repo
|
||||
headers = {
|
||||
"Authorization": f"token {settings.gitea_token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
label_id = await _get_or_create_label(client, base_url, headers, repo)
|
||||
|
||||
body = _build_research_template(task, context, question, priority)
|
||||
issue_payload: dict[str, Any] = {"title": task, "body": body}
|
||||
if label_id is not None:
|
||||
issue_payload["labels"] = [label_id]
|
||||
|
||||
resp = await client.post(
|
||||
f"{base_url}/repos/{repo}/issues",
|
||||
headers=headers,
|
||||
json=issue_payload,
|
||||
)
|
||||
|
||||
if resp.status_code in (200, 201):
|
||||
data = resp.json()
|
||||
number = data.get("number")
|
||||
url = data.get("html_url", "")
|
||||
logger.info("Created kimi-ready issue #%s: %s", number, task[:60])
|
||||
return {
|
||||
"success": True,
|
||||
"issue_number": number,
|
||||
"issue_url": url,
|
||||
"error": None,
|
||||
}
|
||||
|
||||
logger.warning("Issue creation failed (%s): %s", resp.status_code, resp.text[:200])
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Gitea API error {resp.status_code}: {resp.text[:200]}",
|
||||
}
|
||||
|
||||
except Exception as exc:
|
||||
logger.warning("create_kimi_research_issue failed: %s", exc)
|
||||
return {"success": False, "error": str(exc)}
|
||||
|
||||
|
||||
async def poll_kimi_issue(
|
||||
issue_number: int,
|
||||
poll_interval: int = 60,
|
||||
max_wait: int = 3600,
|
||||
) -> dict[str, Any]:
|
||||
"""Poll a Gitea issue until it is closed (Kimi completed) or timeout.
|
||||
|
||||
Args:
|
||||
issue_number: The Gitea issue number to watch.
|
||||
poll_interval: Seconds between polls. Default 60.
|
||||
max_wait: Maximum total seconds to wait. Default 3600 (1 hour).
|
||||
|
||||
Returns:
|
||||
Dict with `completed` bool, `state`, `body`, and `error` keys.
|
||||
"""
|
||||
try:
|
||||
import httpx
|
||||
|
||||
from config import settings
|
||||
except ImportError as exc:
|
||||
return {"completed": False, "error": f"Missing dependency: {exc}"}
|
||||
|
||||
if not settings.gitea_enabled or not settings.gitea_token:
|
||||
return {"completed": False, "error": "Gitea not configured."}
|
||||
|
||||
base_url = f"{settings.gitea_url}/api/v1"
|
||||
repo = settings.gitea_repo
|
||||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||
issue_url = f"{base_url}/repos/{repo}/issues/{issue_number}"
|
||||
|
||||
elapsed = 0
|
||||
while elapsed < max_wait:
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
resp = await client.get(issue_url, headers=headers)
|
||||
|
||||
if resp.status_code == 200:
|
||||
data = resp.json()
|
||||
state = data.get("state", "open")
|
||||
if state == "closed":
|
||||
logger.info("Kimi completed issue #%s", issue_number)
|
||||
return {
|
||||
"completed": True,
|
||||
"state": state,
|
||||
"body": data.get("body", ""),
|
||||
"error": None,
|
||||
}
|
||||
else:
|
||||
logger.warning(
|
||||
"Poll issue #%s returned %s", issue_number, resp.status_code
|
||||
)
|
||||
|
||||
except Exception as exc:
|
||||
logger.warning("Poll error for issue #%s: %s", issue_number, exc)
|
||||
|
||||
await asyncio.sleep(poll_interval)
|
||||
elapsed += poll_interval
|
||||
|
||||
return {
|
||||
"completed": False,
|
||||
"state": "timeout",
|
||||
"body": "",
|
||||
"error": f"Timed out after {max_wait}s waiting for issue #{issue_number}",
|
||||
}
|
||||
|
||||
|
||||
def _extract_action_items(text: str) -> list[str]:
|
||||
"""Extract action items from markdown text.
|
||||
|
||||
Looks for lines that start with checklist markers, numbered items,
|
||||
or explicit "Action:" / "TODO:" prefixes.
|
||||
|
||||
Args:
|
||||
text: Markdown text from Kimi's artifact.
|
||||
|
||||
Returns:
|
||||
List of action item strings (deduplicated, whitespace-stripped).
|
||||
"""
|
||||
items: list[str] = []
|
||||
patterns = [
|
||||
re.compile(r"^[-*]\s+\[ \]\s+(.+)", re.MULTILINE), # - [ ] checkbox
|
||||
re.compile(r"^\d+\.\s+(.+)", re.MULTILINE), # 1. numbered list
|
||||
re.compile(r"^(?:Action|TODO|Next step):\s*(.+)", re.MULTILINE | re.IGNORECASE),
|
||||
]
|
||||
seen: set[str] = set()
|
||||
for pat in patterns:
|
||||
for m in pat.finditer(text):
|
||||
item = m.group(1).strip()
|
||||
if item and item not in seen:
|
||||
items.append(item)
|
||||
seen.add(item)
|
||||
return items
|
||||
|
||||
|
||||
async def index_kimi_artifact(
|
||||
issue_number: int,
|
||||
title: str,
|
||||
artifact_content: str,
|
||||
) -> dict[str, Any]:
|
||||
"""Index Kimi's research artifact into Timmy's semantic memory.
|
||||
|
||||
Args:
|
||||
issue_number: Source Gitea issue number (used as task_id).
|
||||
title: Human-readable title for the memory entry.
|
||||
artifact_content: The research artifact text to index.
|
||||
|
||||
Returns:
|
||||
Dict with `success` bool and `memory_id` or `error`.
|
||||
"""
|
||||
if not artifact_content.strip():
|
||||
return {"success": False, "error": "Empty artifact — nothing to index."}
|
||||
|
||||
try:
|
||||
import asyncio
|
||||
|
||||
from timmy.memory_system import store_memory
|
||||
|
||||
# store_memory is synchronous — wrap in thread to avoid blocking event loop
|
||||
entry = await asyncio.to_thread(
|
||||
store_memory,
|
||||
content=artifact_content,
|
||||
source="kimi",
|
||||
context_type="document",
|
||||
task_id=str(issue_number),
|
||||
metadata={"issue_number": issue_number, "title": title},
|
||||
)
|
||||
logger.info("Indexed Kimi artifact for issue #%s (id=%s)", issue_number, entry.id)
|
||||
return {"success": True, "memory_id": entry.id}
|
||||
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to index Kimi artifact for issue #%s: %s", issue_number, exc)
|
||||
return {"success": False, "error": str(exc)}
|
||||
|
||||
|
||||
async def extract_and_create_followups(
|
||||
artifact_content: str,
|
||||
source_issue_number: int,
|
||||
) -> dict[str, Any]:
|
||||
"""Extract action items from artifact and create follow-up Gitea issues.
|
||||
|
||||
Args:
|
||||
artifact_content: Text of Kimi's research artifact.
|
||||
source_issue_number: Issue number that produced the artifact (for cross-links).
|
||||
|
||||
Returns:
|
||||
Dict with `success`, `created` (list of issue numbers), and `error`.
|
||||
"""
|
||||
items = _extract_action_items(artifact_content)
|
||||
if not items:
|
||||
logger.info("No action items found in artifact for issue #%s", source_issue_number)
|
||||
return {"success": True, "created": [], "error": None}
|
||||
|
||||
try:
|
||||
import httpx
|
||||
|
||||
from config import settings
|
||||
except ImportError as exc:
|
||||
return {"success": False, "created": [], "error": str(exc)}
|
||||
|
||||
if not settings.gitea_enabled or not settings.gitea_token:
|
||||
return {
|
||||
"success": False,
|
||||
"created": [],
|
||||
"error": "Gitea not configured.",
|
||||
}
|
||||
|
||||
base_url = f"{settings.gitea_url}/api/v1"
|
||||
repo = settings.gitea_repo
|
||||
headers = {
|
||||
"Authorization": f"token {settings.gitea_token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
created: list[int] = []
|
||||
|
||||
for item in items:
|
||||
body = (
|
||||
f"Follow-up from Kimi research artifact in #{source_issue_number}.\n\n"
|
||||
f"**Action item:** {item}"
|
||||
)
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
resp = await client.post(
|
||||
f"{base_url}/repos/{repo}/issues",
|
||||
headers=headers,
|
||||
json={"title": item[:120], "body": body},
|
||||
)
|
||||
if resp.status_code in (200, 201):
|
||||
num = resp.json().get("number")
|
||||
if num:
|
||||
created.append(num)
|
||||
logger.info(
|
||||
"Created follow-up issue #%s from kimi artifact #%s",
|
||||
num,
|
||||
source_issue_number,
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
"Follow-up issue creation returned %s for item: %s",
|
||||
resp.status_code,
|
||||
item[:60],
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to create follow-up for item '%s': %s", item[:60], exc)
|
||||
|
||||
return {"success": True, "created": created, "error": None}
|
||||
|
||||
|
||||
async def delegate_research_to_kimi(
|
||||
task: str,
|
||||
context: str,
|
||||
question: str,
|
||||
priority: str = "normal",
|
||||
) -> dict[str, Any]:
|
||||
"""Top-level entry point: delegate a heavy research task to Kimi.
|
||||
|
||||
Creates the `kimi-ready` Gitea issue and returns immediately.
|
||||
Monitoring, artifact indexing, and follow-up creation happen
|
||||
separately via `poll_kimi_issue`, `index_kimi_artifact`, and
|
||||
`extract_and_create_followups`.
|
||||
|
||||
Args:
|
||||
task: Short title (becomes the issue title).
|
||||
context: Background / project context.
|
||||
question: The specific research question Kimi should answer.
|
||||
priority: "low", "normal", or "high".
|
||||
|
||||
Returns:
|
||||
Dict with `success`, `issue_number`, `issue_url`, and `error`.
|
||||
"""
|
||||
if not task.strip() or not question.strip():
|
||||
return {
|
||||
"success": False,
|
||||
"error": "Both `task` and `question` are required.",
|
||||
}
|
||||
|
||||
logger.info("Delegating research to Kimi: %s", task[:80])
|
||||
return await create_kimi_research_issue(task, context, question, priority)
|
||||
Reference in New Issue
Block a user