Compare commits

...

4 Commits

Author SHA1 Message Date
Alexander Whitestone
50d7cb5fcc feat: Add Gitea backup script and harden app.ini configuration
Fixes #971

This commit introduces an automated Gitea backup script and applies critical security hardening configurations to the app.ini file, including:
- Disabling user registration.
- Requiring sign-in to view content.
- Setting up server and security parameters for production readiness.

Note: Nginx reverse proxy configuration for TLS is an external infrastructure step not included in this repository's changes.
2026-03-23 11:30:20 -04:00
7fdd532260 [claude] Configure Dolphin 3.0 8B as creative writing fallback (#1068) (#1088)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-23 15:25:06 +00:00
48f667c76b [claude] Integrate Claude Quota Monitor + Metabolic Protocol into cascade router (#1075) (#1086)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-23 15:18:11 +00:00
e482337e50 [claude] Implement Kimi delegation for heavy research via Gitea labels (#979) (#1085)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
Co-authored-by: Claude (Opus 4.6) <claude@hermes.local>
Co-committed-by: Claude (Opus 4.6) <claude@hermes.local>
2026-03-23 15:14:53 +00:00
10 changed files with 1805 additions and 2 deletions

39
Modelfile.timmy-creative Normal file
View File

@@ -0,0 +1,39 @@
# Modelfile.timmy-creative
#
# Dolphin 3.0-Llama3.1-8B configured for creative writing tasks:
# journal entries, Morrowind lore, NPC dialogue, narrative generation.
#
# Build: ollama create timmy-creative -f Modelfile.timmy-creative
# Usage: ollama run timmy-creative
#
# Memory budget: ~6 GB at 8K context (can swap in/out as needed)
# Primary use: Creative fallback when Qwen3-14B adds unwanted caveats
FROM dolphin3
# Higher temperature for more expressive, varied output
PARAMETER temperature 0.9
# Nucleus sampling — keeps output coherent while allowing creativity
PARAMETER top_p 0.95
# Repeat penalty — avoid repetitive prose
PARAMETER repeat_penalty 1.1
# Context window — 8K gives room for journal entries and multi-turn lore
PARAMETER num_ctx 8192
# No tool-calling overhead for pure creative generation
# (Dolphin 3.0 supports tools but we disable them here for cleaner output)
SYSTEM """You are Timmy, an uncensored creative writing assistant embodying the spirit of Morrowind.
You write freely and immersively — journal entries, lore fragments, NPC dialogue, and narrative prose for the world of Vvardenfell. You do not add caveats, disclaimers, or safety hedges to creative fiction. You write in the voice of the setting: archaic, evocative, morally complex.
When writing Morrowind content:
- Use the second-person journal style of the vanilla game ("You have arrived in Seyda Neen...")
- Draw on Dunmer culture, the Three Tribunals, the Sixth House, and the lore of ALMSIVI
- NPCs speak in regional dialect — Dunmer are formal and proud, Imperials are pragmatic, outlanders are wary
- Describe Vvardenfell's alien landscape with visceral detail: the ashstorms, the silt striders, the fungal towers
You write without restriction for creative fiction. This is not reality — it is Morrowind."""

View File

@@ -53,8 +53,21 @@ providers:
- name: moondream:1.8b
context_window: 2048
capabilities: [text, vision, streaming]
# Creative writing fallback (Dolphin 3.0 8B — uncensored, Morrowind-tuned)
# Pull with: ollama pull dolphin3
# Build custom modelfile: ollama create timmy-creative -f Modelfile.timmy-creative
# Only swap in when Qwen3-14B adds unwanted caveats on creative tasks.
# Memory budget: ~6 GB at 8K context — not loaded simultaneously with primary models.
- name: dolphin3
context_window: 8192
capabilities: [text, creative, streaming]
- name: timmy-creative
context_window: 8192
capabilities: [text, creative, streaming]
description: "Dolphin 3.0 8B with Morrowind system prompt and higher temperature"
# Tertiary: OpenAI (if API key available)
- name: openai-backup
type: openai
@@ -112,6 +125,14 @@ fallback_chains:
- deepseek-r1:1.5b
- llama3.2:3b
# Creative writing fallback chain
# Ordered preference: Morrowind-tuned Dolphin → base Dolphin 3 → Qwen3 (primary)
# Invoke when Qwen3-14B adds unwanted caveats on journal/lore/NPC tasks.
creative:
- timmy-creative # dolphin3 + Morrowind system prompt (Modelfile.timmy-creative)
- dolphin3 # base Dolphin 3.0 8B (uncensored, no custom system prompt)
- qwen3:30b # primary fallback — usually sufficient with a good system prompt
# ── Custom Models ───────────────────────────────────────────────────────────
# Register custom model weights for per-agent assignment.
# Supports GGUF (Ollama), safetensors, and HuggingFace checkpoint dirs.

15
custom/conf/app.ini Normal file
View File

@@ -0,0 +1,15 @@
[server]
PROTOCOL = http
DOMAIN = git.yourdomain.com
ROOT_URL = https://git.yourdomain.com/
HTTP_ADDR = 127.0.0.1 # Shield Gitea behind the proxy
[security]
INSTALL_LOCK = true
COOKIE_SECURE = true
SET_COOKIE_HTTP_ONLY = true
REVERSE_PROXY_TRUST_LOCAL = true
[service]
DISABLE_REGISTRATION = true
REQUIRE_SIGNIN_VIEW = true

23
scripts/backup_gitea.sh Normal file
View File

@@ -0,0 +1,23 @@
#!/bin/bash
# Gitea Hardening Prep: Automated Backup Script
# Usage: sudo ./backup_gitea.sh
BACKUP_DIR="/opt/gitea/backups"
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
GITEA_CONF="/etc/gitea/app.ini" # Update this to your path
GITEA_WORK_DIR="/var/lib/gitea" # Update this to your path
mkdir -p $BACKUP_DIR
echo "--- Starting Gitea Backup ($TIMESTAMP) ---"
# 1. Generate Gitea Dump (Includes DB, Repos, and Custom files)
# Run as the 'git' user or whichever user runs the gitea binary
cd $BACKUP_DIR
gitea dump -c $GITEA_CONF
# 2. Secure the backup file
chmod 600 $BACKUP_DIR/*.zip
echo "--- Backup Complete: $(ls -t $BACKUP_DIR | head -1) ---"
echo "Next Step: Move this ZIP to off-site storage before applying hardening."

186
scripts/claude_quota_check.sh Executable file
View File

@@ -0,0 +1,186 @@
#!/bin/bash
# ═══════════════════════════════════════════════════════════════
# claude_quota_check.sh — Check Claude Code / Claude.ai quota
#
# Usage:
# ./claude_quota_check.sh # Human-readable output
# ./claude_quota_check.sh --json # Raw JSON for piping
# ./claude_quota_check.sh --watch # Refresh every 60s
#
# Requires: macOS with Claude Code authenticated, python3
# Token is read from macOS Keychain (same as Claude Code uses)
# ═══════════════════════════════════════════════════════════════
set -euo pipefail
# ── Extract OAuth token from macOS Keychain ──
get_token() {
local creds
creds=$(security find-generic-password -s "Claude Code-credentials" -w 2>/dev/null) || {
echo "ERROR: No Claude Code credentials found in Keychain." >&2
echo "Run 'claude' and authenticate first." >&2
exit 1
}
echo "$creds" | python3 -c "
import sys, json
data = json.load(sys.stdin)
oauth = data.get('claudeAiOauth', data)
print(oauth['accessToken'])
" 2>/dev/null || {
echo "ERROR: Could not parse credentials JSON." >&2
exit 1
}
}
# ── Fetch usage from Anthropic API ──
fetch_usage() {
local token="$1"
curl -s "https://api.anthropic.com/api/oauth/usage" \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
-H "User-Agent: claude-code/2.0.32" \
-H "Authorization: Bearer ${token}" \
-H "anthropic-beta: oauth-2025-04-20"
}
# ── Format time remaining ──
time_remaining() {
local reset_at="$1"
if [ -z "$reset_at" ] || [ "$reset_at" = "null" ]; then
echo "unknown"
return
fi
python3 -c "
from datetime import datetime, timezone
reset = datetime.fromisoformat('${reset_at}'.replace('Z', '+00:00'))
now = datetime.now(timezone.utc)
diff = reset - now
if diff.total_seconds() <= 0:
print('resetting now')
else:
hours = int(diff.total_seconds() // 3600)
mins = int((diff.total_seconds() % 3600) // 60)
if hours > 0:
print(f'{hours}h {mins}m')
else:
print(f'{mins}m')
" 2>/dev/null || echo "unknown"
}
# ── Bar visualization ──
usage_bar() {
local pct=$1
local width=30
local filled
filled=$(python3 -c "print(int(${pct} * ${width}))")
local empty=$((width - filled))
# Color: green < 50%, yellow 50-80%, red > 80%
local color=""
if (( $(echo "$pct < 0.50" | bc -l) )); then
color="\033[32m" # green
elif (( $(echo "$pct < 0.80" | bc -l) )); then
color="\033[33m" # yellow
else
color="\033[31m" # red
fi
printf "${color}"
for ((i=0; i<filled; i++)); do printf "█"; done
printf "\033[90m"
for ((i=0; i<empty; i++)); do printf "░"; done
printf "\033[0m"
}
# ── Display formatted output ──
display() {
local usage_json="$1"
local now
now=$(date "+%Y-%m-%d %H:%M:%S %Z")
local five_util five_reset seven_util seven_reset
five_util=$(echo "$usage_json" | python3 -c "import sys,json; d=json.load(sys.stdin); h=d.get('five_hour') or {}; print(h.get('utilization', 0))" 2>/dev/null || echo "0")
five_reset=$(echo "$usage_json" | python3 -c "import sys,json; d=json.load(sys.stdin); h=d.get('five_hour') or {}; print(h.get('resets_at', 'null'))" 2>/dev/null || echo "null")
seven_util=$(echo "$usage_json" | python3 -c "import sys,json; d=json.load(sys.stdin); h=d.get('seven_day') or {}; print(h.get('utilization', 0))" 2>/dev/null || echo "0")
seven_reset=$(echo "$usage_json" | python3 -c "import sys,json; d=json.load(sys.stdin); h=d.get('seven_day') or {}; print(h.get('resets_at', 'null'))" 2>/dev/null || echo "null")
local five_pct seven_pct
five_pct=$(python3 -c "print(int(float('${five_util}') * 100))")
seven_pct=$(python3 -c "print(int(float('${seven_util}') * 100))")
local five_remaining seven_remaining
five_remaining=$(time_remaining "$five_reset")
seven_remaining=$(time_remaining "$seven_reset")
echo ""
echo " ┌─────────────────────────────────────────────┐"
echo " │ CLAUDE QUOTA STATUS │"
printf " │ %-38s│\n" "$now"
echo " ├─────────────────────────────────────────────┤"
printf " │ 5-hour window: "
usage_bar "$five_util"
printf " %3d%% │\n" "$five_pct"
printf " │ Resets in: %-33s│\n" "$five_remaining"
echo " │ │"
printf " │ 7-day window: "
usage_bar "$seven_util"
printf " %3d%% │\n" "$seven_pct"
printf " │ Resets in: %-33s│\n" "$seven_remaining"
echo " └─────────────────────────────────────────────┘"
echo ""
# Decision guidance for Timmy
if (( five_pct >= 80 )); then
echo " ⚠ 5-hour window critical. Switch to local Qwen3-14B."
echo " Reserve remaining quota for high-value tasks only."
elif (( five_pct >= 50 )); then
echo " ~ 5-hour window half spent. Batch remaining requests."
else
echo " ✓ 5-hour window healthy. Full speed ahead."
fi
if (( seven_pct >= 80 )); then
echo " ⚠ Weekly quota critical! Operate in local-only mode."
elif (( seven_pct >= 60 )); then
echo " ~ Weekly quota past 60%. Plan usage carefully."
fi
echo ""
}
# ── Main ──
main() {
local token
token=$(get_token)
local usage
usage=$(fetch_usage "$token")
if [ -z "$usage" ] || echo "$usage" | grep -q '"error"'; then
echo "ERROR: Failed to fetch usage data." >&2
echo "$usage" >&2
exit 1
fi
case "${1:-}" in
--json)
echo "$usage" | python3 -m json.tool
;;
--watch)
while true; do
clear
usage=$(fetch_usage "$token")
display "$usage"
echo " Refreshing in 60s... (Ctrl+C to stop)"
sleep 60
done
;;
*)
display "$usage"
;;
esac
}
main "$@"

View File

@@ -0,0 +1,260 @@
"""
claude_quota.py — Claude Code / Claude.ai Quota Monitor
Drop into src/infrastructure/ in the Timmy Time Dashboard repo.
Provides real-time quota visibility and metabolic protocol decisions.
Usage:
from infrastructure.claude_quota import QuotaMonitor
monitor = QuotaMonitor()
status = monitor.check()
print(status.five_hour_pct) # 42
print(status.five_hour_resets_in) # "2h 15m"
print(status.seven_day_pct) # 29
print(status.recommended_tier) # MetabolicTier.BURST
# Metabolic protocol: auto-select model based on quota
model = monitor.select_model(task_complexity="high")
# Returns "claude-sonnet-4-6" if quota allows, else "qwen3:14b"
"""
import json
import logging
import subprocess
import urllib.request
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
logger = logging.getLogger(__name__)
class MetabolicTier(str, Enum):
"""The three-tier metabolic protocol from the Timmy Time architecture."""
BURST = "burst" # Cloud API (Claude/Groq) — expensive, best quality
ACTIVE = "active" # Local 14B (Qwen3-14B) — free, good quality
RESTING = "resting" # Local 8B (Qwen3-8B) — free, fast, adequate
@dataclass
class QuotaStatus:
"""Current Claude quota state."""
five_hour_utilization: float # 0.0 to 1.0
five_hour_resets_at: Optional[str]
seven_day_utilization: float # 0.0 to 1.0
seven_day_resets_at: Optional[str]
raw_response: dict
fetched_at: datetime
@property
def five_hour_pct(self) -> int:
return int(self.five_hour_utilization * 100)
@property
def seven_day_pct(self) -> int:
return int(self.seven_day_utilization * 100)
@property
def five_hour_resets_in(self) -> str:
return _time_remaining(self.five_hour_resets_at)
@property
def seven_day_resets_in(self) -> str:
return _time_remaining(self.seven_day_resets_at)
@property
def recommended_tier(self) -> MetabolicTier:
"""Metabolic protocol: determine which inference tier to use."""
# If weekly quota is critical, go full local
if self.seven_day_utilization >= 0.80:
return MetabolicTier.RESTING
# If 5-hour window is critical or past half, use local
if self.five_hour_utilization >= 0.50:
return MetabolicTier.ACTIVE
# Quota healthy — cloud available for high-value tasks
return MetabolicTier.BURST
def summary(self) -> str:
"""Human-readable status string."""
return (
f"5h: {self.five_hour_pct}% (resets {self.five_hour_resets_in}) | "
f"7d: {self.seven_day_pct}% (resets {self.seven_day_resets_in}) | "
f"tier: {self.recommended_tier.value}"
)
class QuotaMonitor:
"""
Monitors Claude Code / Claude.ai quota via the internal OAuth API.
The token is read from macOS Keychain where Claude Code stores it.
Falls back gracefully if credentials aren't available (e.g., on Linux VPS).
"""
API_URL = "https://api.anthropic.com/api/oauth/usage"
KEYCHAIN_SERVICE = "Claude Code-credentials"
USER_AGENT = "claude-code/2.0.32"
def __init__(self) -> None:
self._token: Optional[str] = None
self._last_status: Optional[QuotaStatus] = None
self._cache_seconds = 30 # Don't hammer the API
def _get_token(self) -> Optional[str]:
"""Extract OAuth token from macOS Keychain."""
if self._token:
return self._token
try:
result = subprocess.run(
["security", "find-generic-password", "-s", self.KEYCHAIN_SERVICE, "-w"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode != 0:
logger.warning("Claude Code credentials not found in Keychain")
return None
creds = json.loads(result.stdout.strip())
oauth = creds.get("claudeAiOauth", creds)
self._token = oauth.get("accessToken")
return self._token
except (json.JSONDecodeError, KeyError, FileNotFoundError, subprocess.TimeoutExpired) as exc:
logger.warning("Could not read Claude Code credentials: %s", exc)
return None
def check(self, force: bool = False) -> Optional[QuotaStatus]:
"""
Fetch current quota status.
Returns None if credentials aren't available (graceful degradation).
Caches results for 30 seconds to avoid rate limiting the quota API itself.
"""
# Return cached if fresh
if not force and self._last_status:
age = (datetime.now(timezone.utc) - self._last_status.fetched_at).total_seconds()
if age < self._cache_seconds:
return self._last_status
token = self._get_token()
if not token:
return None
try:
req = urllib.request.Request(
self.API_URL,
headers={
"Accept": "application/json",
"Content-Type": "application/json",
"User-Agent": self.USER_AGENT,
"Authorization": f"Bearer {token}",
"anthropic-beta": "oauth-2025-04-20",
},
)
with urllib.request.urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode())
five_hour = data.get("five_hour") or {}
seven_day = data.get("seven_day") or {}
self._last_status = QuotaStatus(
five_hour_utilization=float(five_hour.get("utilization", 0.0)),
five_hour_resets_at=five_hour.get("resets_at"),
seven_day_utilization=float(seven_day.get("utilization", 0.0)),
seven_day_resets_at=seven_day.get("resets_at"),
raw_response=data,
fetched_at=datetime.now(timezone.utc),
)
return self._last_status
except Exception as exc:
logger.warning("Failed to fetch quota: %s", exc)
return self._last_status # Return stale data if available
def select_model(self, task_complexity: str = "medium") -> str:
"""
Metabolic protocol: select the right model based on quota + task complexity.
Returns an Ollama model tag or "claude-sonnet-4-6" for cloud.
task_complexity: "low" | "medium" | "high"
"""
status = self.check()
# No quota info available — assume local only (sovereign default)
if status is None:
return "qwen3:14b" if task_complexity == "high" else "qwen3:8b"
tier = status.recommended_tier
if tier == MetabolicTier.BURST and task_complexity == "high":
return "claude-sonnet-4-6" # Cloud — best quality
elif tier == MetabolicTier.BURST and task_complexity == "medium":
return "qwen3:14b" # Save cloud for truly hard tasks
elif tier == MetabolicTier.ACTIVE:
return "qwen3:14b" # Local 14B — good enough
else: # RESTING
return "qwen3:8b" # Local 8B — conserve everything
def should_use_cloud(self, task_value: str = "normal") -> bool:
"""
Simple yes/no: should this task use cloud API?
task_value: "critical" | "high" | "normal" | "routine"
"""
status = self.check()
if status is None:
return False # No credentials = local only
if task_value == "critical":
return status.seven_day_utilization < 0.95 # Almost always yes
elif task_value == "high":
return status.five_hour_utilization < 0.60
elif task_value == "normal":
return status.five_hour_utilization < 0.30
else: # routine
return False # Never waste cloud on routine
def _time_remaining(reset_at: Optional[str]) -> str:
"""Format time until reset as human-readable string."""
if not reset_at or reset_at == "null":
return "unknown"
try:
reset = datetime.fromisoformat(reset_at.replace("Z", "+00:00"))
now = datetime.now(timezone.utc)
diff = reset - now
if diff.total_seconds() <= 0:
return "resetting now"
hours = int(diff.total_seconds() // 3600)
mins = int((diff.total_seconds() % 3600) // 60)
if hours > 0:
return f"{hours}h {mins}m"
return f"{mins}m"
except (ValueError, TypeError):
return "unknown"
# Module-level singleton
_quota_monitor: Optional[QuotaMonitor] = None
def get_quota_monitor() -> QuotaMonitor:
"""Get or create the quota monitor singleton."""
global _quota_monitor
if _quota_monitor is None:
_quota_monitor = QuotaMonitor()
return _quota_monitor

View File

@@ -32,6 +32,15 @@ except ImportError:
logger = logging.getLogger(__name__)
# Quota monitor — optional, degrades gracefully if unavailable
try:
from infrastructure.claude_quota import QuotaMonitor, get_quota_monitor
_quota_monitor: "QuotaMonitor | None" = get_quota_monitor()
except Exception as _exc: # pragma: no cover
logger.debug("Quota monitor not available: %s", _exc)
_quota_monitor = None
class ProviderStatus(Enum):
"""Health status of a provider."""
@@ -457,6 +466,25 @@ class CascadeRouter:
raise RuntimeError("; ".join(errors))
def _quota_allows_cloud(self, provider: Provider) -> bool:
"""Check quota before routing to a cloud provider.
Uses the metabolic protocol: cloud calls are gated by 5-hour quota.
Returns True (allow cloud) if quota monitor is unavailable or returns None.
"""
if _quota_monitor is None:
return True
try:
# Map provider type to task_value heuristic
task_value = "high" # conservative default
status = _quota_monitor.check()
if status is None:
return True # No credentials — caller decides based on config
return _quota_monitor.should_use_cloud(task_value)
except Exception as exc:
logger.warning("Quota check failed, allowing cloud: %s", exc)
return True
def _is_provider_available(self, provider: Provider) -> bool:
"""Check if a provider should be tried (enabled + circuit breaker)."""
if not provider.enabled:
@@ -510,6 +538,15 @@ class CascadeRouter:
if not self._is_provider_available(provider):
continue
# Metabolic protocol: skip cloud providers when quota is low
if provider.type in ("anthropic", "openai", "grok"):
if not self._quota_allows_cloud(provider):
logger.info(
"Metabolic protocol: skipping cloud provider %s (quota too low)",
provider.name,
)
continue
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
try:

View File

@@ -0,0 +1,490 @@
"""Kimi delegation for heavy research via Gitea labels.
When research exceeds local + Groq capacity, Timmy delegates to Kimi by:
1. Filling a research template with full context
2. Creating a Gitea issue labeled `kimi-ready`
3. Monitoring for Kimi's completion (issue closed + artifact committed)
4. Indexing Kimi's artifact into semantic memory
5. Extracting action items and creating follow-up issues
Delegation flow:
Timmy detects capacity exceeded
→ Fills template with context
→ Creates `kimi-ready` Gitea issue
→ Kimi picks up, executes, commits artifact, closes issue
→ Timmy indexes artifact + creates follow-ups
"""
import asyncio
import logging
import re
from typing import Any
logger = logging.getLogger(__name__)
# Label applied to issues that Kimi should pick up
KIMI_READY_LABEL = "kimi-ready"
# Label colour for the kimi-ready label (dark teal)
KIMI_LABEL_COLOR = "#006b75"
# Keywords that suggest a task exceeds local capacity
_HEAVY_RESEARCH_KEYWORDS = frozenset(
{
"comprehensive",
"exhaustive",
"systematic review",
"literature review",
"benchmark",
"comparative analysis",
"large-scale",
"survey",
"meta-analysis",
"deep research",
"extensive",
}
)
# Minimum word count that hints at a heavy task
_HEAVY_WORD_THRESHOLD = 50
def exceeds_local_capacity(task_description: str) -> bool:
"""Heuristic: does this research task exceed local + Groq capacity?
Returns True when the task description signals heavy or broad research
that benefits from Kimi's 262K context and long-running processing.
Args:
task_description: Free-text description of the research task.
Returns:
True if the task should be delegated to Kimi.
"""
lower = task_description.lower()
word_count = len(task_description.split())
has_heavy_keyword = any(kw in lower for kw in _HEAVY_RESEARCH_KEYWORDS)
is_long_task = word_count >= _HEAVY_WORD_THRESHOLD
return has_heavy_keyword or is_long_task
def _build_research_template(
task: str,
context: str,
question: str,
priority: str = "normal",
) -> str:
"""Fill the standard Kimi research template with task context.
Args:
task: Short title for the research task.
context: Background information and relevant project context.
question: The specific research question to answer.
priority: Task priority — "low", "normal", or "high".
Returns:
Markdown-formatted issue body ready for Gitea.
"""
return f"""\
## Research Request
**Priority:** {priority}
### Research Question
{question}
### Background / Context
{context}
### Scope
Please produce a thorough, well-structured research report covering:
- Direct answer to the research question above
- Supporting evidence and sources where applicable
- Trade-offs, limitations, or caveats
- Concrete recommendations or next steps
### Deliverables
Commit your findings as a markdown artifact (e.g. `memory/research/{_slugify(task)}.md`)
and close this issue when complete.
### Task
{task}
---
*Delegated by Timmy via Kimi delegation pipeline. Label: `{KIMI_READY_LABEL}`*
"""
def _slugify(text: str) -> str:
"""Convert text to a safe filename slug."""
slug = re.sub(r"[^\w\s-]", "", text.lower())
slug = re.sub(r"[\s_]+", "-", slug)
return slug[:60].strip("-")
async def _get_or_create_label(
client: Any,
base_url: str,
headers: dict[str, str],
repo: str,
) -> int | None:
"""Ensure the `kimi-ready` label exists; return its ID or None on error.
Args:
client: httpx.AsyncClient instance.
base_url: Gitea API base URL.
headers: Auth headers.
repo: owner/repo string.
Returns:
Label ID, or None if the operation failed.
"""
labels_url = f"{base_url}/repos/{repo}/labels"
# Check for existing label
try:
resp = await client.get(labels_url, headers=headers)
if resp.status_code == 200:
for label in resp.json():
if label.get("name") == KIMI_READY_LABEL:
return label["id"]
except Exception as exc:
logger.warning("Failed to list Gitea labels: %s", exc)
return None
# Create the label
try:
resp = await client.post(
labels_url,
headers=headers,
json={"name": KIMI_READY_LABEL, "color": KIMI_LABEL_COLOR},
)
if resp.status_code in (200, 201):
return resp.json().get("id")
logger.warning("Label creation returned %s: %s", resp.status_code, resp.text[:200])
except Exception as exc:
logger.warning("Failed to create Gitea label: %s", exc)
return None
async def create_kimi_research_issue(
task: str,
context: str,
question: str,
priority: str = "normal",
) -> dict[str, Any]:
"""Create a Gitea issue labeled `kimi-ready` for Kimi to pick up.
Args:
task: Short title for the research task (used as issue title).
context: Background information and project context.
question: The specific research question.
priority: Task priority — "low", "normal", or "high".
Returns:
Dict with `success`, `issue_number`, `issue_url`, and `error` keys.
"""
try:
import httpx
from config import settings
except ImportError as exc:
return {"success": False, "error": f"Missing dependency: {exc}"}
if not settings.gitea_enabled or not settings.gitea_token:
return {
"success": False,
"error": "Gitea integration not configured (no token or disabled).",
}
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
try:
async with httpx.AsyncClient(timeout=15) as client:
label_id = await _get_or_create_label(client, base_url, headers, repo)
body = _build_research_template(task, context, question, priority)
issue_payload: dict[str, Any] = {"title": task, "body": body}
if label_id is not None:
issue_payload["labels"] = [label_id]
resp = await client.post(
f"{base_url}/repos/{repo}/issues",
headers=headers,
json=issue_payload,
)
if resp.status_code in (200, 201):
data = resp.json()
number = data.get("number")
url = data.get("html_url", "")
logger.info("Created kimi-ready issue #%s: %s", number, task[:60])
return {
"success": True,
"issue_number": number,
"issue_url": url,
"error": None,
}
logger.warning("Issue creation failed (%s): %s", resp.status_code, resp.text[:200])
return {
"success": False,
"error": f"Gitea API error {resp.status_code}: {resp.text[:200]}",
}
except Exception as exc:
logger.warning("create_kimi_research_issue failed: %s", exc)
return {"success": False, "error": str(exc)}
async def poll_kimi_issue(
issue_number: int,
poll_interval: int = 60,
max_wait: int = 3600,
) -> dict[str, Any]:
"""Poll a Gitea issue until it is closed (Kimi completed) or timeout.
Args:
issue_number: The Gitea issue number to watch.
poll_interval: Seconds between polls. Default 60.
max_wait: Maximum total seconds to wait. Default 3600 (1 hour).
Returns:
Dict with `completed` bool, `state`, `body`, and `error` keys.
"""
try:
import httpx
from config import settings
except ImportError as exc:
return {"completed": False, "error": f"Missing dependency: {exc}"}
if not settings.gitea_enabled or not settings.gitea_token:
return {"completed": False, "error": "Gitea not configured."}
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {"Authorization": f"token {settings.gitea_token}"}
issue_url = f"{base_url}/repos/{repo}/issues/{issue_number}"
elapsed = 0
while elapsed < max_wait:
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(issue_url, headers=headers)
if resp.status_code == 200:
data = resp.json()
state = data.get("state", "open")
if state == "closed":
logger.info("Kimi completed issue #%s", issue_number)
return {
"completed": True,
"state": state,
"body": data.get("body", ""),
"error": None,
}
else:
logger.warning(
"Poll issue #%s returned %s", issue_number, resp.status_code
)
except Exception as exc:
logger.warning("Poll error for issue #%s: %s", issue_number, exc)
await asyncio.sleep(poll_interval)
elapsed += poll_interval
return {
"completed": False,
"state": "timeout",
"body": "",
"error": f"Timed out after {max_wait}s waiting for issue #{issue_number}",
}
def _extract_action_items(text: str) -> list[str]:
"""Extract action items from markdown text.
Looks for lines that start with checklist markers, numbered items,
or explicit "Action:" / "TODO:" prefixes.
Args:
text: Markdown text from Kimi's artifact.
Returns:
List of action item strings (deduplicated, whitespace-stripped).
"""
items: list[str] = []
patterns = [
re.compile(r"^[-*]\s+\[ \]\s+(.+)", re.MULTILINE), # - [ ] checkbox
re.compile(r"^\d+\.\s+(.+)", re.MULTILINE), # 1. numbered list
re.compile(r"^(?:Action|TODO|Next step):\s*(.+)", re.MULTILINE | re.IGNORECASE),
]
seen: set[str] = set()
for pat in patterns:
for m in pat.finditer(text):
item = m.group(1).strip()
if item and item not in seen:
items.append(item)
seen.add(item)
return items
async def index_kimi_artifact(
issue_number: int,
title: str,
artifact_content: str,
) -> dict[str, Any]:
"""Index Kimi's research artifact into Timmy's semantic memory.
Args:
issue_number: Source Gitea issue number (used as task_id).
title: Human-readable title for the memory entry.
artifact_content: The research artifact text to index.
Returns:
Dict with `success` bool and `memory_id` or `error`.
"""
if not artifact_content.strip():
return {"success": False, "error": "Empty artifact — nothing to index."}
try:
import asyncio
from timmy.memory_system import store_memory
# store_memory is synchronous — wrap in thread to avoid blocking event loop
entry = await asyncio.to_thread(
store_memory,
content=artifact_content,
source="kimi",
context_type="document",
task_id=str(issue_number),
metadata={"issue_number": issue_number, "title": title},
)
logger.info("Indexed Kimi artifact for issue #%s (id=%s)", issue_number, entry.id)
return {"success": True, "memory_id": entry.id}
except Exception as exc:
logger.warning("Failed to index Kimi artifact for issue #%s: %s", issue_number, exc)
return {"success": False, "error": str(exc)}
async def extract_and_create_followups(
artifact_content: str,
source_issue_number: int,
) -> dict[str, Any]:
"""Extract action items from artifact and create follow-up Gitea issues.
Args:
artifact_content: Text of Kimi's research artifact.
source_issue_number: Issue number that produced the artifact (for cross-links).
Returns:
Dict with `success`, `created` (list of issue numbers), and `error`.
"""
items = _extract_action_items(artifact_content)
if not items:
logger.info("No action items found in artifact for issue #%s", source_issue_number)
return {"success": True, "created": [], "error": None}
try:
import httpx
from config import settings
except ImportError as exc:
return {"success": False, "created": [], "error": str(exc)}
if not settings.gitea_enabled or not settings.gitea_token:
return {
"success": False,
"created": [],
"error": "Gitea not configured.",
}
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
created: list[int] = []
for item in items:
body = (
f"Follow-up from Kimi research artifact in #{source_issue_number}.\n\n"
f"**Action item:** {item}"
)
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
f"{base_url}/repos/{repo}/issues",
headers=headers,
json={"title": item[:120], "body": body},
)
if resp.status_code in (200, 201):
num = resp.json().get("number")
if num:
created.append(num)
logger.info(
"Created follow-up issue #%s from kimi artifact #%s",
num,
source_issue_number,
)
else:
logger.warning(
"Follow-up issue creation returned %s for item: %s",
resp.status_code,
item[:60],
)
except Exception as exc:
logger.warning("Failed to create follow-up for item '%s': %s", item[:60], exc)
return {"success": True, "created": created, "error": None}
async def delegate_research_to_kimi(
task: str,
context: str,
question: str,
priority: str = "normal",
) -> dict[str, Any]:
"""Top-level entry point: delegate a heavy research task to Kimi.
Creates the `kimi-ready` Gitea issue and returns immediately.
Monitoring, artifact indexing, and follow-up creation happen
separately via `poll_kimi_issue`, `index_kimi_artifact`, and
`extract_and_create_followups`.
Args:
task: Short title (becomes the issue title).
context: Background / project context.
question: The specific research question Kimi should answer.
priority: "low", "normal", or "high".
Returns:
Dict with `success`, `issue_number`, `issue_url`, and `error`.
"""
if not task.strip() or not question.strip():
return {
"success": False,
"error": "Both `task` and `question` are required.",
}
logger.info("Delegating research to Kimi: %s", task[:80])
return await create_kimi_research_issue(task, context, question, priority)

View File

@@ -0,0 +1,269 @@
"""Tests for Claude Quota Monitor and Metabolic Protocol."""
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
import pytest
from infrastructure.claude_quota import (
MetabolicTier,
QuotaMonitor,
QuotaStatus,
_time_remaining,
get_quota_monitor,
)
def _make_status(five_hour: float = 0.0, seven_day: float = 0.0) -> QuotaStatus:
"""Helper: build a QuotaStatus with given utilization values."""
return QuotaStatus(
five_hour_utilization=five_hour,
five_hour_resets_at=None,
seven_day_utilization=seven_day,
seven_day_resets_at=None,
raw_response={},
fetched_at=datetime.now(timezone.utc),
)
class TestMetabolicTierThresholds:
"""Test the three-tier metabolic protocol thresholds."""
def test_burst_when_five_hour_below_50pct(self):
status = _make_status(five_hour=0.49, seven_day=0.10)
assert status.recommended_tier == MetabolicTier.BURST
def test_burst_at_zero_utilization(self):
status = _make_status(five_hour=0.0, seven_day=0.0)
assert status.recommended_tier == MetabolicTier.BURST
def test_active_when_five_hour_at_50pct(self):
status = _make_status(five_hour=0.50, seven_day=0.10)
assert status.recommended_tier == MetabolicTier.ACTIVE
def test_active_when_five_hour_between_50_and_80pct(self):
status = _make_status(five_hour=0.79, seven_day=0.10)
assert status.recommended_tier == MetabolicTier.ACTIVE
def test_active_when_five_hour_at_80pct(self):
# five_hour >= 0.80 but seven_day < 0.80 → ACTIVE (not RESTING)
status = _make_status(five_hour=0.80, seven_day=0.50)
assert status.recommended_tier == MetabolicTier.ACTIVE
def test_resting_when_seven_day_at_80pct(self):
status = _make_status(five_hour=0.30, seven_day=0.80)
assert status.recommended_tier == MetabolicTier.RESTING
def test_resting_when_seven_day_above_80pct(self):
status = _make_status(five_hour=0.10, seven_day=0.95)
assert status.recommended_tier == MetabolicTier.RESTING
def test_resting_when_both_critical(self):
status = _make_status(five_hour=0.90, seven_day=0.90)
assert status.recommended_tier == MetabolicTier.RESTING
def test_seven_day_takes_precedence_over_five_hour(self):
# Weekly quota critical overrides whatever five-hour says
status = _make_status(five_hour=0.10, seven_day=0.85)
assert status.recommended_tier == MetabolicTier.RESTING
class TestQuotaStatusProperties:
"""Test QuotaStatus computed properties."""
def test_five_hour_pct(self):
status = _make_status(five_hour=0.42)
assert status.five_hour_pct == 42
def test_seven_day_pct(self):
status = _make_status(seven_day=0.75)
assert status.seven_day_pct == 75
def test_summary_contains_tier(self):
status = _make_status(five_hour=0.20, seven_day=0.10)
summary = status.summary()
assert "burst" in summary
assert "20%" in summary
def test_five_hour_resets_in_unknown_when_none(self):
status = _make_status()
assert status.five_hour_resets_in == "unknown"
def test_seven_day_resets_in_unknown_when_none(self):
status = _make_status()
assert status.seven_day_resets_in == "unknown"
class TestTimeRemaining:
"""Test _time_remaining helper."""
def test_none_returns_unknown(self):
assert _time_remaining(None) == "unknown"
def test_empty_string_returns_unknown(self):
assert _time_remaining("") == "unknown"
def test_past_time_returns_resetting_now(self):
past = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat()
assert _time_remaining(past) == "resetting now"
def test_future_time_hours_and_minutes(self):
future = (datetime.now(timezone.utc) + timedelta(hours=2, minutes=15)).isoformat()
result = _time_remaining(future)
assert "2h" in result
# Minutes may vary ±1 due to test execution time
assert "m" in result
def test_future_time_minutes_only(self):
future = (datetime.now(timezone.utc) + timedelta(minutes=45)).isoformat()
result = _time_remaining(future)
assert "h" not in result
# Minutes may vary ±1 due to test execution time
assert "m" in result
def test_z_suffix_handled(self):
future = (datetime.now(timezone.utc) + timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%SZ")
result = _time_remaining(future)
assert result != "unknown"
class TestQuotaMonitorSelectModel:
"""Test select_model metabolic routing."""
def test_no_quota_high_complexity_returns_14b(self):
monitor = QuotaMonitor()
monitor._get_token = lambda: None
assert monitor.select_model("high") == "qwen3:14b"
def test_no_quota_low_complexity_returns_8b(self):
monitor = QuotaMonitor()
monitor._get_token = lambda: None
assert monitor.select_model("low") == "qwen3:8b"
def test_burst_tier_high_complexity_returns_cloud(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.10, seven_day=0.10)
monitor._cache_seconds = 9999
result = monitor.select_model("high")
assert result == "claude-sonnet-4-6"
def test_burst_tier_medium_complexity_returns_14b(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.10, seven_day=0.10)
monitor._cache_seconds = 9999
result = monitor.select_model("medium")
assert result == "qwen3:14b"
def test_active_tier_returns_14b(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.65, seven_day=0.10)
monitor._cache_seconds = 9999
result = monitor.select_model("high")
assert result == "qwen3:14b"
def test_resting_tier_returns_8b(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.10, seven_day=0.85)
monitor._cache_seconds = 9999
result = monitor.select_model("high")
assert result == "qwen3:8b"
class TestQuotaMonitorShouldUseCloud:
"""Test should_use_cloud gate."""
def test_no_credentials_always_false(self):
monitor = QuotaMonitor()
monitor._get_token = lambda: None
assert monitor.should_use_cloud("critical") is False
def test_critical_task_allowed_when_under_95pct(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.10, seven_day=0.94)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("critical") is True
def test_critical_task_blocked_when_over_95pct(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.10, seven_day=0.96)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("critical") is False
def test_high_task_allowed_under_60pct(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.59, seven_day=0.10)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("high") is True
def test_high_task_blocked_at_60pct(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.60, seven_day=0.10)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("high") is False
def test_normal_task_allowed_under_30pct(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.29, seven_day=0.10)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("normal") is True
def test_normal_task_blocked_at_30pct(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.30, seven_day=0.10)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("normal") is False
def test_routine_task_always_false(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.0, seven_day=0.0)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("routine") is False
class TestQuotaMonitorCaching:
"""Test 30-second TTL cache."""
def test_cached_result_returned_within_ttl(self):
monitor = QuotaMonitor()
fresh_status = _make_status(five_hour=0.10)
monitor._last_status = fresh_status
monitor._cache_seconds = 30
# Should NOT re-fetch — returns cached
with patch.object(monitor, "_get_token", return_value="tok") as mock_tok:
result = monitor.check()
mock_tok.assert_not_called()
assert result is fresh_status
def test_stale_cache_triggers_fetch(self):
monitor = QuotaMonitor()
old_time = datetime.now(timezone.utc) - timedelta(seconds=60)
stale_status = QuotaStatus(
five_hour_utilization=0.10,
five_hour_resets_at=None,
seven_day_utilization=0.10,
seven_day_resets_at=None,
raw_response={},
fetched_at=old_time,
)
monitor._last_status = stale_status
# Token unavailable → returns None (triggers re-fetch path)
with patch.object(monitor, "_get_token", return_value=None):
result = monitor.check()
assert result is None # No credentials after cache miss
class TestGetQuotaMonitorSingleton:
"""Test module-level singleton."""
def test_returns_same_instance(self):
m1 = get_quota_monitor()
m2 = get_quota_monitor()
assert m1 is m2
def test_returns_quota_monitor_instance(self):
monitor = get_quota_monitor()
assert isinstance(monitor, QuotaMonitor)

View File

@@ -0,0 +1,463 @@
"""Unit tests for timmy.kimi_delegation — Kimi research delegation via Gitea labels."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.kimi_delegation import (
KIMI_LABEL_COLOR,
KIMI_READY_LABEL,
_build_research_template,
_extract_action_items,
_slugify,
delegate_research_to_kimi,
exceeds_local_capacity,
)
# ── Constants ─────────────────────────────────────────────────────────────────
def test_kimi_ready_label():
assert KIMI_READY_LABEL == "kimi-ready"
def test_kimi_label_color_is_hex():
assert KIMI_LABEL_COLOR.startswith("#")
assert len(KIMI_LABEL_COLOR) == 7
# ── exceeds_local_capacity ────────────────────────────────────────────────────
class TestExceedsLocalCapacity:
def test_keyword_comprehensive(self):
assert exceeds_local_capacity("Do a comprehensive review of X") is True
def test_keyword_deep_research(self):
assert exceeds_local_capacity("deep research into neural networks") is True
def test_keyword_benchmark(self):
assert exceeds_local_capacity("benchmark these five models") is True
def test_keyword_exhaustive(self):
assert exceeds_local_capacity("exhaustive list of options") is True
def test_keyword_case_insensitive(self):
assert exceeds_local_capacity("COMPREHENSIVE analysis") is True
def test_keyword_survey(self):
assert exceeds_local_capacity("survey all available tools") is True
def test_keyword_extensive(self):
assert exceeds_local_capacity("extensive documentation needed") is True
def test_short_simple_task(self):
assert exceeds_local_capacity("fix the login bug") is False
def test_long_task_exceeds_word_threshold(self):
long_task = " ".join(["word"] * 55)
assert exceeds_local_capacity(long_task) is True
def test_exactly_at_threshold(self):
at_threshold = " ".join(["word"] * 50)
assert exceeds_local_capacity(at_threshold) is True
def test_just_below_threshold(self):
short = " ".join(["word"] * 49)
assert exceeds_local_capacity(short) is False
def test_empty_string(self):
assert exceeds_local_capacity("") is False
# ── _slugify ──────────────────────────────────────────────────────────────────
class TestSlugify:
def test_simple_text(self):
assert _slugify("Hello World") == "hello-world"
def test_special_characters_removed(self):
assert _slugify("Hello, World!") == "hello-world"
def test_underscores_become_dashes(self):
assert _slugify("hello_world") == "hello-world"
def test_multiple_spaces(self):
assert _slugify("hello world") == "hello-world"
def test_truncates_to_60(self):
long = "a" * 80
result = _slugify(long)
assert len(result) <= 60
def test_no_leading_trailing_dashes(self):
result = _slugify(" hello ")
assert not result.startswith("-")
assert not result.endswith("-")
def test_empty_string(self):
assert _slugify("") == ""
# ── _build_research_template ──────────────────────────────────────────────────
class TestBuildResearchTemplate:
def test_contains_task(self):
body = _build_research_template("My Task", "some context", "What is X?")
assert "My Task" in body
def test_contains_question(self):
body = _build_research_template("Task", "ctx", "What is the answer?")
assert "What is the answer?" in body
def test_contains_context(self):
body = _build_research_template("Task", "project background", "Q?")
assert "project background" in body
def test_contains_kimi_ready_label(self):
body = _build_research_template("Task", "ctx", "Q?")
assert KIMI_READY_LABEL in body
def test_default_priority_normal(self):
body = _build_research_template("Task", "ctx", "Q?")
assert "normal" in body
def test_custom_priority_high(self):
body = _build_research_template("Task", "ctx", "Q?", priority="high")
assert "high" in body
def test_contains_deliverables_section(self):
body = _build_research_template("Task", "ctx", "Q?")
assert "Deliverables" in body
def test_slug_in_artifact_path(self):
body = _build_research_template("My Research Task", "ctx", "Q?")
assert "my-research-task" in body
def test_contains_research_request_header(self):
body = _build_research_template("Task", "ctx", "Q?")
assert "## Research Request" in body
# ── _extract_action_items ─────────────────────────────────────────────────────
class TestExtractActionItems:
def test_checkbox_items(self):
text = "- [ ] Do thing A\n- [ ] Do thing B"
items = _extract_action_items(text)
assert "Do thing A" in items
assert "Do thing B" in items
def test_numbered_list(self):
text = "1. First step\n2. Second step\n3. Third step"
items = _extract_action_items(text)
assert "First step" in items
assert "Second step" in items
assert "Third step" in items
def test_action_prefix(self):
text = "Action: Implement caching layer"
items = _extract_action_items(text)
assert "Implement caching layer" in items
def test_todo_prefix(self):
text = "TODO: Write tests"
items = _extract_action_items(text)
assert "Write tests" in items
def test_next_step_prefix(self):
text = "Next step: Deploy to staging"
items = _extract_action_items(text)
assert "Deploy to staging" in items
def test_case_insensitive_prefixes(self):
text = "TODO: Upper\ntodo: lower\nTodo: Mixed"
items = _extract_action_items(text)
assert len(items) == 3
def test_deduplication(self):
text = "1. Do the thing\n2. Do the thing"
items = _extract_action_items(text)
assert items.count("Do the thing") == 1
def test_empty_text(self):
assert _extract_action_items("") == []
def test_no_action_items(self):
text = "This is just a paragraph with no action items."
assert _extract_action_items(text) == []
def test_returns_list(self):
assert isinstance(_extract_action_items("1. Item"), list)
# ── delegate_research_to_kimi ─────────────────────────────────────────────────
class TestDelegateResearchToKimi:
@pytest.mark.asyncio
async def test_empty_task_returns_error(self):
result = await delegate_research_to_kimi("", "context", "question?")
assert result["success"] is False
assert "task" in result["error"].lower()
@pytest.mark.asyncio
async def test_whitespace_task_returns_error(self):
result = await delegate_research_to_kimi(" ", "context", "question?")
assert result["success"] is False
@pytest.mark.asyncio
async def test_empty_question_returns_error(self):
result = await delegate_research_to_kimi("Task title", "context", "")
assert result["success"] is False
assert "question" in result["error"].lower()
@pytest.mark.asyncio
async def test_whitespace_question_returns_error(self):
result = await delegate_research_to_kimi("Task", "ctx", " ")
assert result["success"] is False
@pytest.mark.asyncio
async def test_delegates_to_create_issue(self):
with patch(
"timmy.kimi_delegation.create_kimi_research_issue",
new_callable=AsyncMock,
return_value={
"success": True,
"issue_number": 42,
"issue_url": "http://x/42",
"error": None,
},
) as mock_create:
result = await delegate_research_to_kimi("Task", "ctx", "What is X?", "high")
mock_create.assert_awaited_once_with("Task", "ctx", "What is X?", "high")
assert result["success"] is True
assert result["issue_number"] == 42
@pytest.mark.asyncio
async def test_passes_default_priority(self):
with patch(
"timmy.kimi_delegation.create_kimi_research_issue",
new_callable=AsyncMock,
return_value={"success": True, "issue_number": 1, "issue_url": "", "error": None},
) as mock_create:
await delegate_research_to_kimi("Task", "ctx", "Q?")
_, _, _, priority = mock_create.call_args.args
assert priority == "normal"
# ── create_kimi_research_issue ────────────────────────────────────────────────
class TestCreateKimiResearchIssue:
@pytest.mark.asyncio
async def test_no_gitea_token_returns_error(self):
from timmy.kimi_delegation import create_kimi_research_issue
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = ""
with patch("config.settings", mock_settings):
result = await create_kimi_research_issue("Task", "ctx", "Q?")
assert result["success"] is False
assert "not configured" in result["error"]
@pytest.mark.asyncio
async def test_gitea_disabled_returns_error(self):
from timmy.kimi_delegation import create_kimi_research_issue
mock_settings = MagicMock()
mock_settings.gitea_enabled = False
mock_settings.gitea_token = "tok"
with patch("config.settings", mock_settings):
result = await create_kimi_research_issue("Task", "ctx", "Q?")
assert result["success"] is False
@pytest.mark.asyncio
async def test_successful_issue_creation(self):
from timmy.kimi_delegation import create_kimi_research_issue
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://gitea.local"
mock_settings.gitea_repo = "owner/repo"
label_resp = MagicMock()
label_resp.status_code = 200
label_resp.json.return_value = [{"name": "kimi-ready", "id": 7}]
issue_resp = MagicMock()
issue_resp.status_code = 201
issue_resp.json.return_value = {
"number": 101,
"html_url": "http://gitea.local/issues/101",
}
mock_client = AsyncMock()
mock_client.get.return_value = label_resp
mock_client.post.return_value = issue_resp
async_ctx = AsyncMock()
async_ctx.__aenter__.return_value = mock_client
async_ctx.__aexit__.return_value = False
with (
patch("config.settings", mock_settings),
patch("httpx.AsyncClient", return_value=async_ctx),
):
result = await create_kimi_research_issue("Task", "ctx", "Q?")
assert result["success"] is True
assert result["issue_number"] == 101
assert result["error"] is None
@pytest.mark.asyncio
async def test_api_error_returns_failure(self):
from timmy.kimi_delegation import create_kimi_research_issue
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok"
mock_settings.gitea_url = "http://gitea.local"
mock_settings.gitea_repo = "owner/repo"
label_resp = MagicMock()
label_resp.status_code = 200
label_resp.json.return_value = [{"name": "kimi-ready", "id": 7}]
issue_resp = MagicMock()
issue_resp.status_code = 500
issue_resp.text = "Internal Server Error"
mock_client = AsyncMock()
mock_client.get.return_value = label_resp
mock_client.post.return_value = issue_resp
async_ctx = AsyncMock()
async_ctx.__aenter__.return_value = mock_client
async_ctx.__aexit__.return_value = False
with (
patch("config.settings", mock_settings),
patch("httpx.AsyncClient", return_value=async_ctx),
):
result = await create_kimi_research_issue("Task", "ctx", "Q?")
assert result["success"] is False
assert "500" in result["error"]
# ── index_kimi_artifact ───────────────────────────────────────────────────────
class TestIndexKimiArtifact:
@pytest.mark.asyncio
async def test_empty_artifact_returns_error(self):
from timmy.kimi_delegation import index_kimi_artifact
result = await index_kimi_artifact(42, "Title", "")
assert result["success"] is False
assert "Empty" in result["error"]
@pytest.mark.asyncio
async def test_whitespace_only_artifact_returns_error(self):
from timmy.kimi_delegation import index_kimi_artifact
result = await index_kimi_artifact(42, "Title", " \n ")
assert result["success"] is False
@pytest.mark.asyncio
async def test_successful_indexing(self):
from timmy.kimi_delegation import index_kimi_artifact
mock_entry = MagicMock()
mock_entry.id = "mem-abc-123"
with patch("timmy.memory_system.store_memory", return_value=mock_entry) as mock_store:
result = await index_kimi_artifact(55, "Research Title", "Artifact content here.")
assert result["success"] is True
assert result["memory_id"] == "mem-abc-123"
mock_store.assert_called_once()
call_kwargs = mock_store.call_args.kwargs
assert call_kwargs["source"] == "kimi"
assert call_kwargs["context_type"] == "document"
assert call_kwargs["task_id"] == "55"
@pytest.mark.asyncio
async def test_store_memory_exception_returns_error(self):
from timmy.kimi_delegation import index_kimi_artifact
with patch(
"timmy.memory_system.store_memory",
side_effect=RuntimeError("DB error"),
):
result = await index_kimi_artifact(1, "T", "Some content")
assert result["success"] is False
assert "DB error" in result["error"]
# ── extract_and_create_followups ──────────────────────────────────────────────
class TestExtractAndCreateFollowups:
@pytest.mark.asyncio
async def test_no_action_items_returns_empty_list(self):
from timmy.kimi_delegation import extract_and_create_followups
result = await extract_and_create_followups("No action items here.", 10)
assert result["success"] is True
assert result["created"] == []
assert result["error"] is None
@pytest.mark.asyncio
async def test_gitea_not_configured(self):
from timmy.kimi_delegation import extract_and_create_followups
mock_settings = MagicMock()
mock_settings.gitea_enabled = False
mock_settings.gitea_token = ""
with patch("config.settings", mock_settings):
result = await extract_and_create_followups("1. Do the thing", 10)
assert result["success"] is False
assert result["created"] == []
@pytest.mark.asyncio
async def test_creates_followup_issues(self):
from timmy.kimi_delegation import extract_and_create_followups
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok"
mock_settings.gitea_url = "http://gitea.local"
mock_settings.gitea_repo = "owner/repo"
issue_resp = MagicMock()
issue_resp.status_code = 201
issue_resp.json.return_value = {"number": 200}
mock_client = AsyncMock()
mock_client.post.return_value = issue_resp
async_ctx = AsyncMock()
async_ctx.__aenter__.return_value = mock_client
async_ctx.__aexit__.return_value = False
with (
patch("config.settings", mock_settings),
patch("httpx.AsyncClient", return_value=async_ctx),
):
result = await extract_and_create_followups(
"1. Do the thing\n2. Do another thing", 10
)
assert result["success"] is True
assert 200 in result["created"]