diff --git a/src/timmy/kimi_delegation.py b/src/timmy/kimi_delegation.py new file mode 100644 index 00000000..7b910302 --- /dev/null +++ b/src/timmy/kimi_delegation.py @@ -0,0 +1,490 @@ +"""Kimi delegation for heavy research via Gitea labels. + +When research exceeds local + Groq capacity, Timmy delegates to Kimi by: +1. Filling a research template with full context +2. Creating a Gitea issue labeled `kimi-ready` +3. Monitoring for Kimi's completion (issue closed + artifact committed) +4. Indexing Kimi's artifact into semantic memory +5. Extracting action items and creating follow-up issues + +Delegation flow: + Timmy detects capacity exceeded + → Fills template with context + → Creates `kimi-ready` Gitea issue + → Kimi picks up, executes, commits artifact, closes issue + → Timmy indexes artifact + creates follow-ups +""" + +import asyncio +import logging +import re +from typing import Any + +logger = logging.getLogger(__name__) + +# Label applied to issues that Kimi should pick up +KIMI_READY_LABEL = "kimi-ready" + +# Label colour for the kimi-ready label (dark teal) +KIMI_LABEL_COLOR = "#006b75" + +# Keywords that suggest a task exceeds local capacity +_HEAVY_RESEARCH_KEYWORDS = frozenset( + { + "comprehensive", + "exhaustive", + "systematic review", + "literature review", + "benchmark", + "comparative analysis", + "large-scale", + "survey", + "meta-analysis", + "deep research", + "extensive", + } +) + +# Minimum word count that hints at a heavy task +_HEAVY_WORD_THRESHOLD = 50 + + +def exceeds_local_capacity(task_description: str) -> bool: + """Heuristic: does this research task exceed local + Groq capacity? + + Returns True when the task description signals heavy or broad research + that benefits from Kimi's 262K context and long-running processing. + + Args: + task_description: Free-text description of the research task. + + Returns: + True if the task should be delegated to Kimi. + """ + lower = task_description.lower() + word_count = len(task_description.split()) + + has_heavy_keyword = any(kw in lower for kw in _HEAVY_RESEARCH_KEYWORDS) + is_long_task = word_count >= _HEAVY_WORD_THRESHOLD + + return has_heavy_keyword or is_long_task + + +def _build_research_template( + task: str, + context: str, + question: str, + priority: str = "normal", +) -> str: + """Fill the standard Kimi research template with task context. + + Args: + task: Short title for the research task. + context: Background information and relevant project context. + question: The specific research question to answer. + priority: Task priority — "low", "normal", or "high". + + Returns: + Markdown-formatted issue body ready for Gitea. + """ + return f"""\ +## Research Request + +**Priority:** {priority} + +### Research Question + +{question} + +### Background / Context + +{context} + +### Scope + +Please produce a thorough, well-structured research report covering: + +- Direct answer to the research question above +- Supporting evidence and sources where applicable +- Trade-offs, limitations, or caveats +- Concrete recommendations or next steps + +### Deliverables + +Commit your findings as a markdown artifact (e.g. `memory/research/{_slugify(task)}.md`) +and close this issue when complete. + +### Task + +{task} + +--- +*Delegated by Timmy via Kimi delegation pipeline. Label: `{KIMI_READY_LABEL}`* +""" + + +def _slugify(text: str) -> str: + """Convert text to a safe filename slug.""" + slug = re.sub(r"[^\w\s-]", "", text.lower()) + slug = re.sub(r"[\s_]+", "-", slug) + return slug[:60].strip("-") + + +async def _get_or_create_label( + client: Any, + base_url: str, + headers: dict[str, str], + repo: str, +) -> int | None: + """Ensure the `kimi-ready` label exists; return its ID or None on error. + + Args: + client: httpx.AsyncClient instance. + base_url: Gitea API base URL. + headers: Auth headers. + repo: owner/repo string. + + Returns: + Label ID, or None if the operation failed. + """ + labels_url = f"{base_url}/repos/{repo}/labels" + + # Check for existing label + try: + resp = await client.get(labels_url, headers=headers) + if resp.status_code == 200: + for label in resp.json(): + if label.get("name") == KIMI_READY_LABEL: + return label["id"] + except Exception as exc: + logger.warning("Failed to list Gitea labels: %s", exc) + return None + + # Create the label + try: + resp = await client.post( + labels_url, + headers=headers, + json={"name": KIMI_READY_LABEL, "color": KIMI_LABEL_COLOR}, + ) + if resp.status_code in (200, 201): + return resp.json().get("id") + logger.warning("Label creation returned %s: %s", resp.status_code, resp.text[:200]) + except Exception as exc: + logger.warning("Failed to create Gitea label: %s", exc) + + return None + + +async def create_kimi_research_issue( + task: str, + context: str, + question: str, + priority: str = "normal", +) -> dict[str, Any]: + """Create a Gitea issue labeled `kimi-ready` for Kimi to pick up. + + Args: + task: Short title for the research task (used as issue title). + context: Background information and project context. + question: The specific research question. + priority: Task priority — "low", "normal", or "high". + + Returns: + Dict with `success`, `issue_number`, `issue_url`, and `error` keys. + """ + try: + import httpx + + from config import settings + except ImportError as exc: + return {"success": False, "error": f"Missing dependency: {exc}"} + + if not settings.gitea_enabled or not settings.gitea_token: + return { + "success": False, + "error": "Gitea integration not configured (no token or disabled).", + } + + base_url = f"{settings.gitea_url}/api/v1" + repo = settings.gitea_repo + headers = { + "Authorization": f"token {settings.gitea_token}", + "Content-Type": "application/json", + } + + try: + async with httpx.AsyncClient(timeout=15) as client: + label_id = await _get_or_create_label(client, base_url, headers, repo) + + body = _build_research_template(task, context, question, priority) + issue_payload: dict[str, Any] = {"title": task, "body": body} + if label_id is not None: + issue_payload["labels"] = [label_id] + + resp = await client.post( + f"{base_url}/repos/{repo}/issues", + headers=headers, + json=issue_payload, + ) + + if resp.status_code in (200, 201): + data = resp.json() + number = data.get("number") + url = data.get("html_url", "") + logger.info("Created kimi-ready issue #%s: %s", number, task[:60]) + return { + "success": True, + "issue_number": number, + "issue_url": url, + "error": None, + } + + logger.warning("Issue creation failed (%s): %s", resp.status_code, resp.text[:200]) + return { + "success": False, + "error": f"Gitea API error {resp.status_code}: {resp.text[:200]}", + } + + except Exception as exc: + logger.warning("create_kimi_research_issue failed: %s", exc) + return {"success": False, "error": str(exc)} + + +async def poll_kimi_issue( + issue_number: int, + poll_interval: int = 60, + max_wait: int = 3600, +) -> dict[str, Any]: + """Poll a Gitea issue until it is closed (Kimi completed) or timeout. + + Args: + issue_number: The Gitea issue number to watch. + poll_interval: Seconds between polls. Default 60. + max_wait: Maximum total seconds to wait. Default 3600 (1 hour). + + Returns: + Dict with `completed` bool, `state`, `body`, and `error` keys. + """ + try: + import httpx + + from config import settings + except ImportError as exc: + return {"completed": False, "error": f"Missing dependency: {exc}"} + + if not settings.gitea_enabled or not settings.gitea_token: + return {"completed": False, "error": "Gitea not configured."} + + base_url = f"{settings.gitea_url}/api/v1" + repo = settings.gitea_repo + headers = {"Authorization": f"token {settings.gitea_token}"} + issue_url = f"{base_url}/repos/{repo}/issues/{issue_number}" + + elapsed = 0 + while elapsed < max_wait: + try: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.get(issue_url, headers=headers) + + if resp.status_code == 200: + data = resp.json() + state = data.get("state", "open") + if state == "closed": + logger.info("Kimi completed issue #%s", issue_number) + return { + "completed": True, + "state": state, + "body": data.get("body", ""), + "error": None, + } + else: + logger.warning( + "Poll issue #%s returned %s", issue_number, resp.status_code + ) + + except Exception as exc: + logger.warning("Poll error for issue #%s: %s", issue_number, exc) + + await asyncio.sleep(poll_interval) + elapsed += poll_interval + + return { + "completed": False, + "state": "timeout", + "body": "", + "error": f"Timed out after {max_wait}s waiting for issue #{issue_number}", + } + + +def _extract_action_items(text: str) -> list[str]: + """Extract action items from markdown text. + + Looks for lines that start with checklist markers, numbered items, + or explicit "Action:" / "TODO:" prefixes. + + Args: + text: Markdown text from Kimi's artifact. + + Returns: + List of action item strings (deduplicated, whitespace-stripped). + """ + items: list[str] = [] + patterns = [ + re.compile(r"^[-*]\s+\[ \]\s+(.+)", re.MULTILINE), # - [ ] checkbox + re.compile(r"^\d+\.\s+(.+)", re.MULTILINE), # 1. numbered list + re.compile(r"^(?:Action|TODO|Next step):\s*(.+)", re.MULTILINE | re.IGNORECASE), + ] + seen: set[str] = set() + for pat in patterns: + for m in pat.finditer(text): + item = m.group(1).strip() + if item and item not in seen: + items.append(item) + seen.add(item) + return items + + +async def index_kimi_artifact( + issue_number: int, + title: str, + artifact_content: str, +) -> dict[str, Any]: + """Index Kimi's research artifact into Timmy's semantic memory. + + Args: + issue_number: Source Gitea issue number (used as task_id). + title: Human-readable title for the memory entry. + artifact_content: The research artifact text to index. + + Returns: + Dict with `success` bool and `memory_id` or `error`. + """ + if not artifact_content.strip(): + return {"success": False, "error": "Empty artifact — nothing to index."} + + try: + import asyncio + + from timmy.memory_system import store_memory + + # store_memory is synchronous — wrap in thread to avoid blocking event loop + entry = await asyncio.to_thread( + store_memory, + content=artifact_content, + source="kimi", + context_type="document", + task_id=str(issue_number), + metadata={"issue_number": issue_number, "title": title}, + ) + logger.info("Indexed Kimi artifact for issue #%s (id=%s)", issue_number, entry.id) + return {"success": True, "memory_id": entry.id} + + except Exception as exc: + logger.warning("Failed to index Kimi artifact for issue #%s: %s", issue_number, exc) + return {"success": False, "error": str(exc)} + + +async def extract_and_create_followups( + artifact_content: str, + source_issue_number: int, +) -> dict[str, Any]: + """Extract action items from artifact and create follow-up Gitea issues. + + Args: + artifact_content: Text of Kimi's research artifact. + source_issue_number: Issue number that produced the artifact (for cross-links). + + Returns: + Dict with `success`, `created` (list of issue numbers), and `error`. + """ + items = _extract_action_items(artifact_content) + if not items: + logger.info("No action items found in artifact for issue #%s", source_issue_number) + return {"success": True, "created": [], "error": None} + + try: + import httpx + + from config import settings + except ImportError as exc: + return {"success": False, "created": [], "error": str(exc)} + + if not settings.gitea_enabled or not settings.gitea_token: + return { + "success": False, + "created": [], + "error": "Gitea not configured.", + } + + base_url = f"{settings.gitea_url}/api/v1" + repo = settings.gitea_repo + headers = { + "Authorization": f"token {settings.gitea_token}", + "Content-Type": "application/json", + } + created: list[int] = [] + + for item in items: + body = ( + f"Follow-up from Kimi research artifact in #{source_issue_number}.\n\n" + f"**Action item:** {item}" + ) + try: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post( + f"{base_url}/repos/{repo}/issues", + headers=headers, + json={"title": item[:120], "body": body}, + ) + if resp.status_code in (200, 201): + num = resp.json().get("number") + if num: + created.append(num) + logger.info( + "Created follow-up issue #%s from kimi artifact #%s", + num, + source_issue_number, + ) + else: + logger.warning( + "Follow-up issue creation returned %s for item: %s", + resp.status_code, + item[:60], + ) + except Exception as exc: + logger.warning("Failed to create follow-up for item '%s': %s", item[:60], exc) + + return {"success": True, "created": created, "error": None} + + +async def delegate_research_to_kimi( + task: str, + context: str, + question: str, + priority: str = "normal", +) -> dict[str, Any]: + """Top-level entry point: delegate a heavy research task to Kimi. + + Creates the `kimi-ready` Gitea issue and returns immediately. + Monitoring, artifact indexing, and follow-up creation happen + separately via `poll_kimi_issue`, `index_kimi_artifact`, and + `extract_and_create_followups`. + + Args: + task: Short title (becomes the issue title). + context: Background / project context. + question: The specific research question Kimi should answer. + priority: "low", "normal", or "high". + + Returns: + Dict with `success`, `issue_number`, `issue_url`, and `error`. + """ + if not task.strip() or not question.strip(): + return { + "success": False, + "error": "Both `task` and `question` are required.", + } + + logger.info("Delegating research to Kimi: %s", task[:80]) + return await create_kimi_research_issue(task, context, question, priority)