forked from Rockachopa/Timmy-time-dashboard
Compare commits
15 Commits
claude/iss… ... kimi/issue…
| Author | SHA1 | Date |
|---|---|---|
| | 51b1338453 | |
| | bde7232ece | |
| | fc4426954e | |
| | 5be4ecb9ef | |
| | 4f80cfcd58 | |
| | a7ccfbddc9 | |
| | f1f67e62a7 | |
| | 00ef4fbd22 | |
| | fc0a94202f | |
| | bd3e207c0d | |
| | cc8ed5b57d | |
| | 823216db60 | |
| | 75ecfaba64 | |
| | 55beaf241f | |
| | 69498c9add | |
@@ -27,8 +27,12 @@
 # ── AirLLM / big-brain backend ───────────────────────────────────────────────
 # Inference backend: "ollama" (default) | "airllm" | "auto"
-# "auto" → uses AirLLM on Apple Silicon if installed, otherwise Ollama.
-# Requires: pip install ".[bigbrain]"
+# "ollama" → always use Ollama (safe everywhere, any OS)
+# "airllm" → AirLLM layer-by-layer loading (Apple Silicon M1/M2/M3/M4 only)
+#            Requires 16 GB RAM minimum (32 GB recommended).
+#            Automatically falls back to Ollama on Intel Mac or Linux.
+#            Install extra: pip install "airllm[mlx]"
+# "auto" → use AirLLM on Apple Silicon if installed, otherwise Ollama
 # TIMMY_MODEL_BACKEND=ollama

 # AirLLM model size (default: 70b).
@@ -62,6 +62,9 @@ Per AGENTS.md roster:

- Run `tox -e pre-push` (lint + full CI suite)
- Ensure tests stay green
- Update TODO.md
- **CRITICAL: Stage files before committing** — always run `git add .` or `git add <files>` first
- Verify staged changes are non-empty: `git diff --cached --stat` must show files
- **NEVER run `git commit` without staging files first** — empty commits waste review cycles

---

42 AGENTS.md
@@ -247,6 +247,48 @@ make docker-agent # add a worker

---

## Search Capability (SearXNG + Crawl4AI)

Timmy has a self-hosted search backend requiring **no paid API key**.

### Tools

| Tool | Module | Description |
|------|--------|-------------|
| `web_search(query)` | `timmy/tools/search.py` | Meta-search via SearXNG — returns ranked results |
| `scrape_url(url)` | `timmy/tools/search.py` | Full-page scrape via Crawl4AI → clean markdown |

Both tools are registered in the **orchestrator** (full) and **echo** (research) toolkits.

### Configuration

| Env Var | Default | Description |
|---------|---------|-------------|
| `TIMMY_SEARCH_BACKEND` | `searxng` | `searxng` or `none` (disable) |
| `TIMMY_SEARCH_URL` | `http://localhost:8888` | SearXNG base URL |
| `TIMMY_CRAWL_URL` | `http://localhost:11235` | Crawl4AI base URL |

Inside Docker Compose (when `--profile search` is active), the dashboard
uses `http://searxng:8080` and `http://crawl4ai:11235` by default.

### Starting the services

```bash
# Start SearXNG + Crawl4AI alongside the dashboard:
docker compose --profile search up

# Or start only the search services:
docker compose --profile search up searxng crawl4ai
```

### Graceful degradation

- If `TIMMY_SEARCH_BACKEND=none`: tools return a "disabled" message.
- If SearXNG or Crawl4AI is unreachable: tools log a WARNING and return an
  error string — the app never crashes.
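
As a sketch of that degradation contract, here is a minimal `web_search` shape. It targets SearXNG's JSON API (`/search?format=json`), reads env vars directly for brevity (the real `timmy/tools/search.py` goes through `config.settings`), and is illustrative rather than the shipped implementation:

```python
import logging
import os

import httpx

logger = logging.getLogger(__name__)


def web_search(query: str) -> str:
    """Illustrative sketch: SearXNG JSON API with graceful degradation."""
    if os.environ.get("TIMMY_SEARCH_BACKEND", "searxng") == "none":
        return "Web search is disabled (TIMMY_SEARCH_BACKEND=none)."
    base = os.environ.get("TIMMY_SEARCH_URL", "http://localhost:8888")
    try:
        resp = httpx.get(
            f"{base}/search",
            params={"q": query, "format": "json"},
            timeout=10,
        )
        resp.raise_for_status()
        results = resp.json().get("results", [])
        return "\n".join(f"- {r['title']}: {r['url']}" for r in results[:5])
    except Exception as exc:
        logger.warning("SearXNG unreachable: %s", exc)
        return f"Search failed: {exc}"  # error string, never a crash
```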

---

## Roadmap

**v2.0 Exodus (in progress):** Voice + Marketplace + Integrations

@@ -150,7 +150,6 @@ async def transcribe_audio(audio: bytes) -> str:

| Service | When Unavailable | Fallback Behavior |
|---------|------------------|-------------------|
| Ollama | No local LLM | Claude backend (if ANTHROPIC_API_KEY set) |
| vLLM | Server not running | Ollama backend (cascade router fallback) |
| Redis | Cache/storage down | In-memory dict (ephemeral) |
| AirLLM | Import error or no Apple Silicon | Ollama backend |
| Voice (Piper) | Service down | Browser Web Speech API |
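
Every row above follows the same try/degrade shape. A minimal sketch of that pattern (the `ollama_chat` / `claude_chat` helpers are placeholders, not the real backend API):

```python
import logging

logger = logging.getLogger(__name__)


async def ollama_chat(prompt: str) -> str:
    """Placeholder for the local Ollama call."""
    raise ConnectionError("no local LLM available")


async def claude_chat(prompt: str) -> str:
    """Placeholder for the Claude fallback (needs ANTHROPIC_API_KEY)."""
    return f"[claude] {prompt}"


async def chat_with_fallback(prompt: str) -> str:
    """Try the primary backend; degrade instead of crashing."""
    try:
        return await ollama_chat(prompt)
    except Exception as exc:
        logger.warning("Ollama unavailable, falling back to Claude: %s", exc)
        return await claude_chat(prompt)
```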

15 README.md
@@ -9,6 +9,21 @@ API access with Bitcoin Lightning — all from a browser, no cloud AI required.

---

## System Requirements

| Path | Hardware | RAM | Disk |
|------|----------|-----|------|
| **Ollama** (default) | Any OS — x86-64 or ARM | 8 GB min | 5–10 GB (model files) |
| **AirLLM** (Apple Silicon) | M1, M2, M3, or M4 Mac | 16 GB min (32 GB recommended) | ~15 GB free |

**Ollama path** runs on any modern machine — macOS, Linux, or Windows. No GPU required.

**AirLLM path** uses layer-by-layer loading for 70B+ models without a GPU. Requires Apple
Silicon and the `bigbrain` extras (`pip install ".[bigbrain]"`). On Intel Mac or Linux the
app automatically falls back to Ollama — no crash, no config change needed.

---

## Quick Start

```bash
@@ -131,34 +131,11 @@ providers:
       context_window: 32000
       capabilities: [text, tools, json, streaming]

-  # Tertiary: vLLM (OpenAI-compatible, continuous batching, 3-10x agentic throughput)
-  # Runs on CUDA GPU or CPU. On Apple Silicon, prefer vllm-mlx-local (above).
-  # To enable: start vLLM server:
-  #   python -m vllm.entrypoints.openai.api_server \
-  #     --model Qwen/Qwen2.5-14B-Instruct --port 8001
-  # Then set enabled: true (or TIMMY_LLM_BACKEND=vllm + VLLM_URL=http://localhost:8001)
-  - name: vllm-local
-    type: vllm
-    enabled: false  # Enable when vLLM server is running
-    priority: 3
-    tier: local
-    base_url: "http://localhost:8001/v1"
-    models:
-      - name: Qwen/Qwen2.5-14B-Instruct
-        default: true
-        context_window: 32000
-        capabilities: [text, tools, json, streaming, complex]
-        description: "Qwen2.5-14B on vLLM — continuous batching for agentic workloads"
-      - name: Qwen/Qwen2.5-7B-Instruct
-        context_window: 32000
-        capabilities: [text, tools, json, streaming, routine]
-        description: "Qwen2.5-7B on vLLM — fast model for routine tasks"
-
-  # Quinary: OpenAI (if API key available)
+  # Tertiary: OpenAI (if API key available)
   - name: openai-backup
     type: openai
     enabled: false  # Enable by setting OPENAI_API_KEY
-    priority: 4
+    priority: 3
     tier: standard_cloud
     api_key: "${OPENAI_API_KEY}"  # Loaded from environment
     base_url: null  # Use default OpenAI endpoint
@@ -170,12 +147,12 @@ providers:
       - name: gpt-4o
         context_window: 128000
         capabilities: [text, vision, tools, json, streaming]

-  # Senary: Anthropic (if API key available)
+  # Quaternary: Anthropic (if API key available)
   - name: anthropic-backup
     type: anthropic
     enabled: false  # Enable by setting ANTHROPIC_API_KEY
-    priority: 5
+    priority: 4
     tier: frontier
     api_key: "${ANTHROPIC_API_KEY}"
     models:

@@ -42,10 +42,10 @@ services:
       GROK_ENABLED: "${GROK_ENABLED:-false}"
       XAI_API_KEY: "${XAI_API_KEY:-}"
       GROK_DEFAULT_MODEL: "${GROK_DEFAULT_MODEL:-grok-3-fast}"
-      # vLLM backend — set TIMMY_LLM_BACKEND=vllm to activate
-      TIMMY_LLM_BACKEND: "${TIMMY_LLM_BACKEND:-ollama}"
-      VLLM_URL: "${VLLM_URL:-http://localhost:8001}"
-      VLLM_MODEL: "${VLLM_MODEL:-Qwen/Qwen2.5-14B-Instruct}"
+      # Search backend (SearXNG + Crawl4AI) — set TIMMY_SEARCH_BACKEND=none to disable
+      TIMMY_SEARCH_BACKEND: "${TIMMY_SEARCH_BACKEND:-searxng}"
+      TIMMY_SEARCH_URL: "${TIMMY_SEARCH_URL:-http://searxng:8080}"
+      TIMMY_CRAWL_URL: "${TIMMY_CRAWL_URL:-http://crawl4ai:11235}"
     extra_hosts:
       - "host.docker.internal:host-gateway"  # Linux: maps to host IP
     networks:
@@ -78,48 +78,49 @@ services:
     profiles:
       - celery

-  # ── vLLM — high-throughput inference server (GPU optional) ──────────────
-  # Requires the 'vllm' profile: docker compose --profile vllm up
-  #
-  # GPU (NVIDIA): set VLLM_MODEL and ensure nvidia-container-toolkit is installed.
-  # CPU-only: add --device cpu to VLLM_EXTRA_ARGS (slower, but works anywhere).
-  #
-  # The dashboard reaches vLLM at http://vllm:8001 (inside timmy-net).
-  # Set VLLM_URL=http://vllm:8001 in the dashboard environment when using this service.
-  vllm:
-    image: vllm/vllm-openai:latest
-    container_name: timmy-vllm
+  # ── SearXNG — self-hosted meta-search engine ─────────────────────────
+  searxng:
+    image: searxng/searxng:latest
+    container_name: timmy-searxng
     profiles:
-      - vllm
+      - search
     ports:
-      - "8001:8001"
+      - "${SEARXNG_PORT:-8888}:8080"
     environment:
-      # Model to load — override with VLLM_MODEL env var
-      VLLM_MODEL: "${VLLM_MODEL:-Qwen/Qwen2.5-7B-Instruct}"
-    command: >
-      --model ${VLLM_MODEL:-Qwen/Qwen2.5-7B-Instruct}
-      --port 8001
-      --host 0.0.0.0
-      ${VLLM_EXTRA_ARGS:-}
+      SEARXNG_BASE_URL: "${SEARXNG_BASE_URL:-http://localhost:8888}"
     volumes:
-      - vllm-cache:/root/.cache/huggingface
+      - ./docker/searxng:/etc/searxng:rw
     networks:
       - timmy-net
     restart: unless-stopped
     healthcheck:
-      test: ["CMD", "curl", "-f", "http://localhost:8001/health"]
+      test: ["CMD", "wget", "-qO-", "http://localhost:8080/healthz"]
       interval: 30s
       timeout: 5s
+      retries: 3
+      start_period: 20s
+
+  # ── Crawl4AI — self-hosted web scraper ────────────────────────────────
+  crawl4ai:
+    image: unclecode/crawl4ai:latest
+    container_name: timmy-crawl4ai
+    profiles:
+      - search
+    ports:
+      - "${CRAWL4AI_PORT:-11235}:11235"
+    environment:
+      CRAWL4AI_API_TOKEN: "${CRAWL4AI_API_TOKEN:-}"
+    volumes:
+      - timmy-data:/app/data
+    networks:
+      - timmy-net
+    restart: unless-stopped
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:11235/health"]
+      interval: 30s
+      timeout: 10s
+      retries: 5
+      start_period: 120s
-    # GPU support — uncomment to enable NVIDIA GPU passthrough
-    # deploy:
-    #   resources:
-    #     reservations:
-    #       devices:
-    #         - driver: nvidia
-    #           count: all
-    #           capabilities: [gpu]
-      retries: 3
-      start_period: 30s

   # ── OpenFang — vendored agent runtime sidecar ────────────────────────────
   openfang:
@@ -157,8 +158,6 @@ volumes:
       device: "${PWD}/data"
   openfang-data:
     driver: local
-  vllm-cache:
-    driver: local

 # ── Internal network ────────────────────────────────────────────────────────
 networks:

67 docker/searxng/settings.yml Normal file
@@ -0,0 +1,67 @@
# SearXNG configuration for Timmy Time self-hosted search
# https://docs.searxng.org/admin/settings/settings.html

general:
  debug: false
  instance_name: "Timmy Search"
  privacypolicy_url: false
  donation_url: false
  contact_url: false
  enable_metrics: false

server:
  port: 8080
  bind_address: "0.0.0.0"
  secret_key: "timmy-searxng-key-change-in-production"
  base_url: false
  image_proxy: false

ui:
  static_use_hash: false
  default_locale: ""
  query_in_title: false
  infinite_scroll: false
  default_theme: simple
  center_alignment: false

search:
  safe_search: 0
  autocomplete: ""
  default_lang: "en"
  formats:
    - html
    - json

outgoing:
  request_timeout: 6.0
  max_request_timeout: 10.0
  useragent_suffix: "TimmyResearchBot"
  pool_connections: 100
  pool_maxsize: 20

enabled_plugins:
  - Hash_plugin
  - Search_on_category_select
  - Tracker_url_remover

engines:
  - name: google
    engine: google
    shortcut: g
    categories: general

  - name: bing
    engine: bing
    shortcut: b
    categories: general

  - name: duckduckgo
    engine: duckduckgo
    shortcut: d
    categories: general

  - name: wikipedia
    engine: wikipedia
    shortcut: wp
    categories: general
    timeout: 3.0
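
Since `formats` enables `json`, the instance can be queried programmatically. A minimal sketch (assumes the default `SEARXNG_PORT=8888` host mapping from docker-compose):

```python
import httpx

# Query the self-hosted SearXNG instance via its JSON API.
resp = httpx.get(
    "http://localhost:8888/search",
    params={"q": "crawl4ai markdown scraping", "format": "json"},
    timeout=10,
)
resp.raise_for_status()
for result in resp.json()["results"][:5]:
    print(result["title"], "->", result["url"])
```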

89 docs/SCREENSHOT_TRIAGE_2026-03-24.md Normal file
@@ -0,0 +1,89 @@
# Screenshot Dump Triage — Visual Inspiration & Research Leads

**Date:** March 24, 2026
**Source:** Issue #1275 — "Screenshot dump for triage #1"
**Analyst:** Claude (Sonnet 4.6)

---

## Screenshots Ingested

| File | Subject | Action |
|------|---------|--------|
| IMG_6187.jpeg | AirLLM / Apple Silicon local LLM requirements | → Issue #1284 |
| IMG_6125.jpeg | vLLM backend for agentic workloads | → Issue #1281 |
| IMG_6124.jpeg | DeerFlow autonomous research pipeline | → Issue #1283 |
| IMG_6123.jpeg | "Vibe Coder vs Normal Developer" meme | → Issue #1285 |
| IMG_6410.jpeg | SearXNG + Crawl4AI self-hosted search MCP | → Issue #1282 |

---

## Tickets Created

### #1281 — feat: add vLLM as alternative inference backend
**Source:** IMG_6125 (vLLM for agentic workloads)

vLLM's continuous batching makes it 3–10x more throughput-efficient than Ollama for multi-agent
request patterns. Implement `VllmBackend` in `infrastructure/llm_router/` as a selectable
backend (`TIMMY_LLM_BACKEND=vllm`) with graceful fallback to Ollama.

**Priority:** Medium — impactful for research pipeline performance once #972 is in use

---

### #1282 — feat: integrate SearXNG + Crawl4AI as self-hosted search backend
**Source:** IMG_6410 (luxiaolei/searxng-crawl4ai-mcp)

Self-hosted search via SearXNG + Crawl4AI removes the hard dependency on paid search APIs
(Brave, Tavily). Add both as Docker Compose services, implement `web_search()` and
`scrape_url()` tools in `timmy/tools/`, and register them with the research agent.

**Priority:** High — unblocks fully local/private operation of research agents

---

### #1283 — research: evaluate DeerFlow as autonomous research orchestration layer
**Source:** IMG_6124 (deer-flow Docker setup)

DeerFlow is ByteDance's open-source autonomous research pipeline framework. Before investing
further in Timmy's custom orchestrator (#972), evaluate whether DeerFlow's architecture offers
integration value or design patterns worth borrowing.

**Priority:** Medium — research first, implementation follows if go/no-go is positive

---

### #1284 — chore: document and validate AirLLM Apple Silicon requirements
**Source:** IMG_6187 (Mac-compatible LLM setup)

AirLLM graceful degradation is already implemented but undocumented. Add System Requirements
to README (M1/M2/M3/M4, 16 GB RAM min, 15 GB disk) and document `TIMMY_LLM_BACKEND` in
`.env.example`.

**Priority:** Low — documentation only, no code risk

---

### #1285 — chore: enforce "Normal Developer" discipline — tighten quality gates
**Source:** IMG_6123 (Vibe Coder vs Normal Developer meme)

Tighten the existing mypy/bandit/coverage gates: fix all mypy errors, raise coverage from 73%
to 80%, add a documented pre-push hook, and run `vulture` for dead code. The infrastructure
exists — it just needs enforcing.

**Priority:** Medium — technical debt prevention, pairs well with any green-field feature work

---

## Patterns Observed Across Screenshots

1. **Local-first is the north star.** All five images reinforce the same theme: private,
   self-hosted, runs on your hardware. vLLM, SearXNG, AirLLM, DeerFlow — none require cloud.
   Timmy is already aligned with this direction; these are tactical additions.

2. **Agentic performance bottlenecks are real.** Two of five images (vLLM, DeerFlow) focus
   specifically on throughput and reliability for multi-agent loops. As the research pipeline
   matures, inference speed and search reliability will become the main constraints.

3. **Discipline compounds.** The meme is a reminder that the quality gates we have (tox,
   mypy, bandit, coverage) only pay off if they are enforced without exceptions.
290 docs/research/kimi-creative-blueprint-891.md Normal file
@@ -0,0 +1,290 @@
# Building Timmy: Technical Blueprint for Sovereign Creative AI

> **Source:** PDF attached to issue #891, "Building Timmy: a technical blueprint for sovereign
> creative AI" — generated by Kimi.ai, 16 pages, filed by Perplexity for Timmy's review.
> **Filed:** 2026-03-22 · **Reviewed:** 2026-03-23

---

## Executive Summary

The blueprint establishes that a sovereign creative AI capable of coding, composing music,
generating art, building worlds, publishing narratives, and managing its own economy is
**technically feasible today** — but only through orchestration of dozens of tools operating
at different maturity levels. The core insight: *the integration is the invention*. No single
component is new; the missing piece is a coherent identity operating across all domains
simultaneously with persistent memory, autonomous economics, and cross-domain creative
reactions.

Three non-negotiable architectural decisions:
1. **Human oversight for all public-facing content** — every successful creative AI has this;
   every one that removed it failed.
2. **Legal entity before economic activity** — AI agents are not legal persons; establish
   structure before wealth accumulates (Truth Terminal cautionary tale: $20M acquired before
   a foundation was retroactively created).
3. **Hybrid memory: vector search + knowledge graph** — neither alone is sufficient for
   multi-domain context breadth.

---

## Domain-by-Domain Assessment

### Software Development (immediately deployable)

| Component | Recommendation | Notes |
|-----------|----------------|-------|
| Primary agent | Claude Code (Opus 4.6, 77.2% SWE-bench) | Already in use |
| Self-hosted forge | Forgejo (MIT, 170–200MB RAM) | Project uses Gitea/Forgejo now |
| CI/CD | GitHub Actions-compatible via `act_runner` | — |
| Tool-making | LATM pattern: frontier model creates tools, cheaper model applies them | New — see ADR opportunity |
| Open-source fallback | OpenHands (~65% SWE-bench, Docker sandboxed) | Backup to Claude Code |
| Self-improvement | Darwin Gödel Machine / SICA patterns | 3–6 month investment |

**Development estimate:** 2–3 weeks for Forgejo + Claude Code integration with automated
PR workflows; 1–2 months for self-improving tool-making pipeline.

**Cross-reference:** This project already runs Claude Code agents on Forgejo. The LATM
pattern (tool registry) and self-improvement loop are the actionable gaps.
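
As a minimal sketch of the LATM idea: a frontier model writes a tool once, a registry caches it, and a cheaper model (or plain code) only invokes it afterwards. The `frontier_model` call below is a hypothetical placeholder, not any real API:

```python
from collections.abc import Callable

TOOL_REGISTRY: dict[str, Callable[..., object]] = {}


def frontier_model(prompt: str) -> str:
    """Hypothetical stand-in for an expensive code-writing model call."""
    return "def tool(x: float, y: float) -> float:\n    return x * y"


def get_tool(name: str, spec: str) -> Callable[..., object]:
    """Return a cached tool, asking the frontier model to write it only once."""
    if name not in TOOL_REGISTRY:
        source = frontier_model(f"Write a Python function for: {spec}")
        namespace: dict[str, object] = {}
        exec(source, namespace)  # in practice: sandbox and test before caching
        TOOL_REGISTRY[name] = namespace["tool"]  # type: ignore[assignment]
    return TOOL_REGISTRY[name]


# The cheap path: repeated applications hit the cache, not the frontier model.
area = get_tool("rect_area", "multiply width by height")(3.0, 4.0)
```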

---

### Music (1–4 weeks)

| Component | Recommendation | Notes |
|-----------|----------------|-------|
| Commercial vocals | Suno v5 API (~$0.03/song, $30/month Premier) | No official API; third-party: sunoapi.org, AIMLAPI, EvoLink |
| Local instrumental | MusicGen 1.5B (CC-BY-NC — monetization blocker) | On M2 Max: ~60s for 5s clip |
| Voice cloning | GPT-SoVITS v4 (MIT) | Works on Apple Silicon CPU, RTF 0.526 on M4 |
| Voice conversion | RVC (MIT, 5–10 min training audio) | — |
| Apple Silicon TTS | MLX-Audio: Kokoro 82M + Qwen3-TTS 0.6B | 4–5x faster via Metal |
| Publishing | Wavlake (90/10 split, Lightning micropayments) | Auto-syndicates to Fountain.fm |
| Nostr | NIP-94 (kind:1063) audio events → NIP-96 servers | — |

**Copyright reality:** US Copyright Office (Jan 2025) and US Court of Appeals (Mar 2025):
purely AI-generated music cannot be copyrighted and enters public domain. Wavlake's
Value4Value model works around this — fans pay for relationship, not exclusive rights.

**Avoid:** Udio (download disabled since Oct 2025, 2.4/5 Trustpilot).

---

### Visual Art (1–3 weeks)

| Component | Recommendation | Notes |
|-----------|----------------|-------|
| Local generation | ComfyUI API at `127.0.0.1:8188` (programmatic control via WebSocket) | MLX extension: 50–70% faster |
| Speed | Draw Things (free, Mac App Store) | 3× faster than ComfyUI via Metal shaders |
| Quality frontier | Flux 2 (Nov 2025, 4MP, multi-reference) | SDXL needs 16GB+, Flux Dev 32GB+ |
| Character consistency | LoRA training (30 min, 15–30 references) + Flux.1 Kontext | Solved problem |
| Face consistency | IP-Adapter + FaceID (ComfyUI-IP-Adapter-Plus) | Training-free |
| Comics | Jenova AI ($20/month, 200+ page consistency) or LlamaGen AI (free) | — |
| Publishing | Blossom protocol (SHA-256 addressed, kind:10063) + Nostr NIP-94 | — |
| Physical | Printful REST API (200+ products, automated fulfillment) | — |

---

### Writing / Narrative (1–4 weeks for pipeline; ongoing for quality)

| Component | Recommendation | Notes |
|-----------|----------------|-------|
| LLM | Claude Opus 4.5/4.6 (leads Mazur Writing Benchmark at 8.561) | Already in use |
| Context | 500K tokens (1M in beta) — entire novels fit | — |
| Architecture | Outline-first → RAG lore bible → chapter-by-chapter generation | Without outline: novels meander |
| Lore management | WorldAnvil Pro or custom LoreScribe (local RAG) | No tool achieves 100% consistency |
| Publishing (ebooks) | Pandoc → EPUB / KDP PDF | pandoc-novel template on GitHub |
| Publishing (print) | Lulu Press REST API (80% profit, global print network) | KDP: no official API, 3-book/day limit |
| Publishing (Nostr) | NIP-23 kind:30023 long-form events | Habla.news, YakiHonne, Stacker News |
| Podcasts | LLM script → TTS (ElevenLabs or local Kokoro/MLX-Audio) → feedgen RSS → Fountain.fm | Value4Value sats-per-minute |

**Key constraint:** AI-assisted (human directs, AI drafts) = 40% faster. Fully autonomous
without editing = "generic, soulless prose" and character drift by chapter 3 without explicit
memory.

---

### World Building / Games (2 weeks–3 months depending on target)

| Component | Recommendation | Notes |
|-----------|----------------|-------|
| Algorithms | Wave Function Collapse, Perlin noise (FastNoiseLite in Godot 4), L-systems | All mature |
| Platform | Godot Engine + gd-agentic-skills (82+ skills, 26 genre blueprints) | Strong LLM/GDScript knowledge |
| Narrative design | Knowledge graph (world state) + LLM + quest template grammar | CHI 2023 validated |
| Quick win | Luanti/Minetest (Lua API, 2,800+ open mods for reference) | Immediately feasible |
| Medium effort | OpenMW content creation (omwaddon format engineering required) | 2–3 months |
| Future | Unity MCP (AI direct Unity Editor interaction) | Early-stage |

---

### Identity Architecture (2 months)

The blueprint formalizes the **SOUL.md standard** (GitHub: aaronjmars/soul.md):

| File | Purpose |
|------|---------|
| `SOUL.md` | Who you are — identity, worldview, opinions |
| `STYLE.md` | How you write — voice, syntax, patterns |
| `SKILL.md` | Operating modes |
| `MEMORY.md` | Session continuity |

**Critical decision — static vs self-modifying identity:**
- Static Core Truths (version-controlled, human-approved changes only) ✓
- Self-modifying Learned Preferences (logged with rollback, monitored by guardian) ✓
- **Warning:** OpenClaw's "Soul Evolution" creates a security attack surface — Zenity Labs
  demonstrated a complete zero-click attack chain targeting SOUL.md files.

**Relevance to this repo:** Claude Code agents already use a `MEMORY.md` pattern in
this project. The SOUL.md stack is a natural extension.

---

### Memory Architecture (2 months)

Hybrid vector + knowledge graph is the recommendation:

| Component | Tool | Notes |
|-----------|------|-------|
| Vector + KG combined | Mem0 (mem0.ai) | 26% accuracy improvement over OpenAI memory, 91% lower p95 latency, 90% token savings |
| Vector store | Qdrant (Rust, open-source) | High-throughput with metadata filtering |
| Temporal KG | Neo4j + Graphiti (Zep AI) | P95 retrieval: 300ms, hybrid semantic + BM25 + graph |
| Backup/migration | AgentKeeper (95% critical fact recovery across model migrations) | — |

**Journal pattern (Stanford Generative Agents):** Agent writes about experiences, generates
high-level reflections 2–3x/day when importance scores exceed threshold. Ablation studies:
removing any component (observation, planning, reflection) significantly reduces behavioral
believability.
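
A minimal sketch of that reflection trigger, for orientation only; the importance scores and threshold are illustrative values, not the exact Stanford formulation:

```python
from dataclasses import dataclass, field


@dataclass
class Journal:
    """Accumulate scored observations; reflect when importance builds up."""

    threshold: float = 25.0
    _observations: list[tuple[str, float]] = field(default_factory=list)
    _accumulated: float = 0.0

    def observe(self, text: str, importance: float) -> str | None:
        """Record an observation; return a reflection when the threshold trips."""
        self._observations.append((text, importance))
        self._accumulated += importance
        if self._accumulated >= self.threshold:
            self._accumulated = 0.0
            return self.reflect()
        return None

    def reflect(self) -> str:
        """Stand-in for an LLM call that writes a high-level reflection."""
        recent = [text for text, _ in self._observations[-10:]]
        return "Reflection over: " + "; ".join(recent)
```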

**Cross-reference:** The existing `brain/` package is the memory system. Qdrant and
Mem0 are the recommended upgrade targets.

---

### Multi-Agent Sub-System (3–6 months)

The blueprint describes a named sub-agent hierarchy:

| Agent | Role |
|-------|------|
| Oracle | Top-level planner / supervisor |
| Sentinel | Safety / moderation |
| Scout | Research / information gathering |
| Scribe | Writing / narrative |
| Ledger | Economic management |
| Weaver | Visual art generation |
| Composer | Music generation |
| Social | Platform publishing |

**Orchestration options:**
- **Agno** (already in use) — microsecond instantiation, 50× less memory than LangGraph
- **CrewAI Flows** — event-driven with fine-grained control
- **LangGraph** — DAG-based with stateful workflows and time-travel debugging

**Scheduling pattern (Stanford Generative Agents):** Top-down recursive daily → hourly →
5-minute planning. Event interrupts for reactive tasks. Re-planning triggers when accumulated
importance scores exceed threshold.

**Cross-reference:** The existing `spark/` package (event capture, advisory engine) aligns
with this architecture. `infrastructure/event_bus` is the choreography backbone.

---

### Economic Engine (1–4 weeks)

Lightning Labs released `lightning-agent-tools` (open-source) in February 2026:
- `lnget` — CLI HTTP client for L402 payments
- Remote signer architecture (private keys on separate machine from agent)
- Scoped macaroon credentials (pay-only, invoice-only, read-only roles)
- **Aperture** — converts any API to pay-per-use via L402 (HTTP 402)

| Option | Effort | Notes |
|--------|--------|-------|
| ln.bot | 1 week | "Bitcoin for AI Agents" — 3 commands create a wallet; CLI + MCP + REST |
| LND via gRPC | 2–3 weeks | Full programmatic node management for production |
| Coinbase Agentic Wallets | — | Fiat-adjacent; less aligned with sovereignty ethos |

**Revenue channels:** Wavlake (music, 90/10 Lightning), Nostr zaps (articles), Stacker News
(earn sats from engagement), Printful (physical goods), L402-gated API access (pay-per-use
services), Geyser.fund (Lightning crowdfunding, better initial runway than micropayments).

**Cross-reference:** The existing `lightning/` package in this repo is the foundation.
Adding L402 paywall endpoints for Timmy's own services is the actionable gap.
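
For orientation, the generic L402 handshake behind Aperture-style paywalls: the first request gets HTTP 402 with a macaroon and a Lightning invoice; after paying out-of-band, the client retries with the authorization header. The `pay_invoice` helper is a placeholder, and the header parsing follows the L402/LSAT convention; check the details against the spec before relying on them:

```python
import httpx


def pay_invoice(bolt11: str) -> str:
    """Placeholder: pay via a Lightning wallet and return the preimage."""
    raise NotImplementedError


def fetch_paid(url: str) -> httpx.Response:
    """One round of the L402 flow: 402 challenge, pay, retry with token."""
    resp = httpx.get(url)
    if resp.status_code != 402:
        return resp
    # Example challenge: WWW-Authenticate: L402 macaroon="...", invoice="lnbc..."
    challenge = resp.headers.get("www-authenticate", "")
    fields = dict(
        part.strip().split("=", 1) for part in challenge.split(" ", 1)[1].split(",")
    )
    macaroon = fields["macaroon"].strip('"')
    preimage = pay_invoice(fields["invoice"].strip('"'))
    return httpx.get(url, headers={"Authorization": f"L402 {macaroon}:{preimage}"})
```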

---

## Pioneer Case Studies

| Agent | Active | Revenue | Key Lesson |
|-------|--------|---------|-----------|
| Botto | Since Oct 2021 | $5M+ (art auctions) | Community governance via DAO sustains engagement; "taste model" (humans guide, not direct) preserves autonomous authorship |
| Neuro-sama | Since Dec 2022 | $400K+/month (subscriptions) | 3+ years of iteration; errors became entertainment features; 24/7 capability is an insurmountable advantage |
| Truth Terminal | Since Jun 2024 | $20M accumulated | Memetic fitness > planned monetization; human gatekeeper approved tweets while selecting AI-intent responses; **establish legal entity first** |
| Holly+ | Since 2021 | Conceptual | DAO of stewards for voice governance; "identity play" as alternative to defensive IP |
| AI Sponge | 2023 | Banned | Unmoderated content → TOS violations + copyright |
| Nothing Forever | 2022–present | 8 viewers | Unmoderated content → ban → audience collapse; novelty-only propositions fail |

**Universal pattern:** Human oversight + economic incentive alignment + multi-year personality
development + platform-native economics = success.

---

## Recommended Implementation Sequence

From the blueprint, mapped against Timmy's existing architecture:

### Phase 1: Immediate (weeks)
1. **Code sovereignty** — Forgejo + Claude Code automated PR workflows (already substantially done)
2. **Music pipeline** — Suno API → Wavlake/Nostr NIP-94 publishing
3. **Visual art pipeline** — ComfyUI API → Blossom/Nostr with LoRA character consistency
4. **Basic Lightning wallet** — ln.bot integration for receiving micropayments
5. **Long-form publishing** — Nostr NIP-23 + RSS feed generation

### Phase 2: Moderate effort (1–3 months)
6. **LATM tool registry** — frontier model creates Python utilities, caches them, lighter model applies
7. **Event-driven cross-domain reactions** — game event → blog + artwork + music (CrewAI/LangGraph)
8. **Podcast generation** — TTS + feedgen → Fountain.fm
9. **Self-improving pipeline** — agent creates, tests, caches own Python utilities
10. **Comic generation** — character-consistent panels with Jenova AI or local LoRA

### Phase 3: Significant investment (3–6 months)
11. **Full sub-agent hierarchy** — Oracle/Sentinel/Scout/Scribe/Ledger/Weaver with Agno
12. **SOUL.md identity system** — bounded evolution + guardian monitoring
13. **Hybrid memory upgrade** — Qdrant + Mem0/Graphiti replacing or extending `brain/`
14. **Procedural world generation** — Godot + AI-driven narrative (quests, NPCs, lore)
15. **Self-sustaining economic loop** — earned revenue covers compute costs

### Remains aspirational (12+ months)
- Fully autonomous novel-length fiction without editorial intervention
- YouTube monetization for AI-generated content (tightening platform policies)
- Copyright protection for AI-generated works (current US law denies this)
- True artistic identity evolution (genuine creative voice vs pattern remixing)
- Self-modifying architecture without regression or identity drift

---

## Gap Analysis: Blueprint vs Current Codebase

| Blueprint Capability | Current Status | Gap |
|---------------------|----------------|-----|
| Code sovereignty | Done (Claude Code + Forgejo) | LATM tool registry |
| Music generation | Not started | Suno API integration + Wavlake publishing |
| Visual art | Not started | ComfyUI API client + Blossom publishing |
| Writing/publishing | Not started | Nostr NIP-23 + Pandoc pipeline |
| World building | Bannerlord work (different scope) | Luanti mods as quick win |
| Identity (SOUL.md) | Partial (CLAUDE.md + MEMORY.md) | Full SOUL.md stack |
| Memory (hybrid) | `brain/` package (SQLite-based) | Qdrant + knowledge graph |
| Multi-agent | Agno in use | Named hierarchy + event choreography |
| Lightning payments | `lightning/` package | ln.bot wallet + L402 endpoints |
| Nostr identity | Referenced in roadmap, not built | NIP-05, NIP-89 capability cards |
| Legal entity | Unknown | **Must be resolved before economic activity** |

---

## ADR Candidates

Issues that warrant Architecture Decision Records based on this review:

1. **LATM tool registry pattern** — How Timmy creates, tests, and caches self-made tools
2. **Music generation strategy** — Suno (cloud, commercial quality) vs MusicGen (local, CC-BY-NC)
3. **Memory upgrade path** — When/how to migrate `brain/` from SQLite to Qdrant + KG
4. **SOUL.md adoption** — Extending existing CLAUDE.md/MEMORY.md to full SOUL.md stack
5. **Lightning L402 strategy** — Which services Timmy gates behind micropayments
6. **Sub-agent naming and contracts** — Formalizing Oracle/Sentinel/Scout/Scribe/Ledger/Weaver
@@ -15,6 +15,7 @@ packages = [
    { include = "config.py", from = "src" },
    { include = "bannerlord", from = "src" },
    { include = "brain", from = "src" },
    { include = "dashboard", from = "src" },
    { include = "infrastructure", from = "src" },
    { include = "integrations", from = "src" },
@@ -0,0 +1 @@
"""Timmy Time Dashboard — source root package."""
1 src/brain/__init__.py Normal file
@@ -0,0 +1 @@
"""Brain — identity system and task coordination."""
314 src/brain/worker.py Normal file
@@ -0,0 +1,314 @@
"""DistributedWorker — task lifecycle management and backend routing.

Routes delegated tasks to appropriate execution backends:

- agentic_loop: local multi-step execution via Timmy's agentic loop
- kimi: heavy research tasks dispatched via Gitea kimi-ready issues
- paperclip: task submission to the Paperclip API

Task lifecycle: queued → running → completed | failed

Failure handling: auto-retry up to MAX_RETRIES, then mark failed.
"""

from __future__ import annotations

import asyncio
import logging
import threading
import uuid
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any, ClassVar

logger = logging.getLogger(__name__)

MAX_RETRIES = 2


# ---------------------------------------------------------------------------
# Task record
# ---------------------------------------------------------------------------


@dataclass
class DelegatedTask:
    """Record of one delegated task and its execution state."""

    task_id: str
    agent_name: str
    agent_role: str
    task_description: str
    priority: str
    backend: str  # "agentic_loop" | "kimi" | "paperclip"
    status: str = "queued"  # queued | running | completed | failed
    created_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
    result: dict[str, Any] | None = None
    error: str | None = None
    retries: int = 0


# ---------------------------------------------------------------------------
# Worker
# ---------------------------------------------------------------------------


class DistributedWorker:
    """Routes and tracks delegated task execution across multiple backends.

    All methods are class-methods; DistributedWorker is a singleton-style
    service — no instantiation needed.

    Usage::

        from brain.worker import DistributedWorker

        task_id = DistributedWorker.submit("researcher", "research", "summarise X")
        status = DistributedWorker.get_status(task_id)
    """

    _tasks: ClassVar[dict[str, DelegatedTask]] = {}
    _lock: ClassVar[threading.Lock] = threading.Lock()

    @classmethod
    def submit(
        cls,
        agent_name: str,
        agent_role: str,
        task_description: str,
        priority: str = "normal",
    ) -> str:
        """Submit a task for execution. Returns task_id immediately.

        The task is registered as 'queued' and a daemon thread begins
        execution in the background. Use get_status(task_id) to poll.
        """
        task_id = uuid.uuid4().hex[:8]
        backend = cls._select_backend(agent_role, task_description)

        record = DelegatedTask(
            task_id=task_id,
            agent_name=agent_name,
            agent_role=agent_role,
            task_description=task_description,
            priority=priority,
            backend=backend,
        )

        with cls._lock:
            cls._tasks[task_id] = record

        thread = threading.Thread(
            target=cls._run_task,
            args=(record,),
            daemon=True,
            name=f"worker-{task_id}",
        )
        thread.start()

        logger.info(
            "Task %s queued: %s → %.60s (backend=%s, priority=%s)",
            task_id,
            agent_name,
            task_description,
            backend,
            priority,
        )
        return task_id

    @classmethod
    def get_status(cls, task_id: str) -> dict[str, Any]:
        """Return current status of a task by ID."""
        record = cls._tasks.get(task_id)
        if record is None:
            return {"found": False, "task_id": task_id}
        return {
            "found": True,
            "task_id": record.task_id,
            "agent": record.agent_name,
            "role": record.agent_role,
            "status": record.status,
            "backend": record.backend,
            "priority": record.priority,
            "created_at": record.created_at,
            "retries": record.retries,
            "result": record.result,
            "error": record.error,
        }

    @classmethod
    def list_tasks(cls) -> list[dict[str, Any]]:
        """Return a summary list of all tracked tasks."""
        with cls._lock:
            return [
                {
                    "task_id": t.task_id,
                    "agent": t.agent_name,
                    "status": t.status,
                    "backend": t.backend,
                    "created_at": t.created_at,
                }
                for t in cls._tasks.values()
            ]

    @classmethod
    def clear(cls) -> None:
        """Clear the task registry (for tests)."""
        with cls._lock:
            cls._tasks.clear()

    # ------------------------------------------------------------------
    # Backend selection
    # ------------------------------------------------------------------

    @classmethod
    def _select_backend(cls, agent_role: str, task_description: str) -> str:
        """Choose the execution backend for a given agent role and task.

        Priority:
        1. kimi — research role + Gitea enabled + task exceeds local capacity
        2. paperclip — paperclip API key is configured
        3. agentic_loop — local fallback (always available)
        """
        try:
            from config import settings
            from timmy.kimi_delegation import exceeds_local_capacity

            if (
                agent_role == "research"
                and getattr(settings, "gitea_enabled", False)
                and getattr(settings, "gitea_token", "")
                and exceeds_local_capacity(task_description)
            ):
                return "kimi"

            if getattr(settings, "paperclip_api_key", ""):
                return "paperclip"

        except Exception as exc:
            logger.debug("Backend selection error — defaulting to agentic_loop: %s", exc)

        return "agentic_loop"

    # ------------------------------------------------------------------
    # Task execution
    # ------------------------------------------------------------------

    @classmethod
    def _run_task(cls, record: DelegatedTask) -> None:
        """Execute a task with retry logic. Runs inside a daemon thread."""
        record.status = "running"

        for attempt in range(MAX_RETRIES + 1):
            try:
                if attempt > 0:
                    logger.info(
                        "Retrying task %s (attempt %d/%d)",
                        record.task_id,
                        attempt + 1,
                        MAX_RETRIES + 1,
                    )
                record.retries = attempt

                result = cls._dispatch(record)
                record.status = "completed"
                record.result = result
                logger.info(
                    "Task %s completed via %s",
                    record.task_id,
                    record.backend,
                )
                return

            except Exception as exc:
                logger.warning(
                    "Task %s attempt %d failed: %s",
                    record.task_id,
                    attempt + 1,
                    exc,
                )
                if attempt == MAX_RETRIES:
                    record.status = "failed"
                    record.error = str(exc)
                    logger.error(
                        "Task %s exhausted %d retries. Final error: %s",
                        record.task_id,
                        MAX_RETRIES,
                        exc,
                    )

    @classmethod
    def _dispatch(cls, record: DelegatedTask) -> dict[str, Any]:
        """Route to the selected backend. Raises on failure."""
        if record.backend == "kimi":
            return asyncio.run(cls._execute_kimi(record))
        if record.backend == "paperclip":
            return asyncio.run(cls._execute_paperclip(record))
        return asyncio.run(cls._execute_agentic_loop(record))

    @classmethod
    async def _execute_kimi(cls, record: DelegatedTask) -> dict[str, Any]:
        """Create a kimi-ready Gitea issue for the task.

        Kimi picks up the issue via the kimi-ready label and executes it.
        """
        from timmy.kimi_delegation import create_kimi_research_issue

        result = await create_kimi_research_issue(
            task=record.task_description[:120],
            context=f"Delegated by agent '{record.agent_name}' via delegate_task.",
            question=record.task_description,
            priority=record.priority,
        )
        if not result.get("success"):
            raise RuntimeError(f"Kimi issue creation failed: {result.get('error')}")
        return result

    @classmethod
    async def _execute_paperclip(cls, record: DelegatedTask) -> dict[str, Any]:
        """Submit the task to the Paperclip API."""
        import httpx

        from timmy.paperclip import PaperclipClient

        client = PaperclipClient()
        async with httpx.AsyncClient(timeout=client.timeout) as http:
            resp = await http.post(
                f"{client.base_url}/api/tasks",
                headers={"Authorization": f"Bearer {client.api_key}"},
                json={
                    "kind": record.agent_role,
                    "agent_id": client.agent_id,
                    "company_id": client.company_id,
                    "priority": record.priority,
                    "context": {"task": record.task_description},
                },
            )

        if resp.status_code in (200, 201):
            data = resp.json()
            logger.info(
                "Task %s submitted to Paperclip (paperclip_id=%s)",
                record.task_id,
                data.get("id"),
            )
            return {
                "success": True,
                "paperclip_task_id": data.get("id"),
                "backend": "paperclip",
            }
        raise RuntimeError(f"Paperclip API error {resp.status_code}: {resp.text[:200]}")

    @classmethod
    async def _execute_agentic_loop(cls, record: DelegatedTask) -> dict[str, Any]:
        """Execute the task via Timmy's local agentic loop."""
        from timmy.agentic_loop import run_agentic_loop

        result = await run_agentic_loop(record.task_description)
        return {
            "success": result.status != "failed",
            "agentic_task_id": result.task_id,
            "summary": result.summary,
            "status": result.status,
            "backend": "agentic_loop",
        }
@@ -1,3 +1,8 @@
"""Central pydantic-settings configuration for Timmy Time Dashboard.

All environment variable access goes through the ``settings`` singleton
exported from this module — never use ``os.environ.get()`` in app code.
"""
import logging as _logging
import os
import sys
@@ -94,18 +99,9 @@ class Settings(BaseSettings):

     # ── Backend selection ────────────────────────────────────────────────────
     # "ollama" — always use Ollama (default, safe everywhere)
-    # "vllm" — use vLLM inference server (OpenAI-compatible, faster throughput)
     # "airllm" — AirLLM layer-by-layer loading (Apple Silicon only; degrades to Ollama)
     # "auto" — pick best available local backend, fall back to Ollama
-    timmy_model_backend: Literal["ollama", "vllm", "grok", "claude", "auto"] = "ollama"
-
-    # ── vLLM backend ──────────────────────────────────────────────────────────
-    # vLLM is an OpenAI-compatible inference server optimised for continuous
-    # batching — 3–10x higher throughput than Ollama for agentic workloads.
-    # Start server: python -m vllm.entrypoints.openai.api_server \
-    #   --model Qwen/Qwen2.5-14B-Instruct --port 8001
-    # Then set TIMMY_LLM_BACKEND=vllm (or enable vllm-local in providers.yaml)
-    vllm_url: str = "http://localhost:8001"
-    vllm_model: str = "Qwen/Qwen2.5-14B-Instruct"
+    timmy_model_backend: Literal["ollama", "airllm", "grok", "claude", "auto"] = "ollama"

     # ── Grok (xAI) — opt-in premium cloud backend ────────────────────────
     # Grok is a premium augmentation layer — local-first ethos preserved.
@@ -118,6 +114,16 @@ class Settings(BaseSettings):
    grok_sats_hard_cap: int = 100  # Absolute ceiling on sats per Grok query
    grok_free: bool = False  # Skip Lightning invoice when user has own API key

    # ── Search Backend (SearXNG + Crawl4AI) ──────────────────────────────
    # "searxng" — self-hosted SearXNG meta-search engine (default, no API key)
    # "none" — disable web search (private/offline deployments)
    # Override with TIMMY_SEARCH_BACKEND env var.
    timmy_search_backend: Literal["searxng", "none"] = "searxng"
    # SearXNG base URL — override with TIMMY_SEARCH_URL env var
    search_url: str = "http://localhost:8888"
    # Crawl4AI base URL — override with TIMMY_CRAWL_URL env var
    crawl_url: str = "http://localhost:11235"

    # ── Database ──────────────────────────────────────────────────────────
    db_busy_timeout_ms: int = 5000  # SQLite PRAGMA busy_timeout (ms)
@@ -127,6 +133,23 @@ class Settings(BaseSettings):
    anthropic_api_key: str = ""
    claude_model: str = "haiku"

    # ── Tiered Model Router (issue #882) ─────────────────────────────────
    # Three-tier cascade: Local 8B (free, fast) → Local 70B (free, slower)
    # → Cloud API (paid, best). Override model names per tier via env vars.
    #
    # TIER_LOCAL_FAST_MODEL — Tier-1 model name in Ollama (default: llama3.1:8b)
    # TIER_LOCAL_HEAVY_MODEL — Tier-2 model name in Ollama (default: hermes3:70b)
    # TIER_CLOUD_MODEL — Tier-3 cloud model name (default: claude-haiku-4-5)
    #
    # Budget limits for the cloud tier (0 = unlimited):
    # TIER_CLOUD_DAILY_BUDGET_USD — daily ceiling in USD (default: 5.0)
    # TIER_CLOUD_MONTHLY_BUDGET_USD — monthly ceiling in USD (default: 50.0)
    tier_local_fast_model: str = "llama3.1:8b"
    tier_local_heavy_model: str = "hermes3:70b"
    tier_cloud_model: str = "claude-haiku-4-5"
    tier_cloud_daily_budget_usd: float = 5.0
    tier_cloud_monthly_budget_usd: float = 50.0

    # ── Content Moderation ──────────────────────────────────────────────
    # Three-layer moderation pipeline for AI narrator output.
    # Uses Llama Guard via Ollama with regex fallback.
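
The three-tier settings above imply a simple dispatch rule. A minimal sketch of that decision, illustrative only; the shipped logic lives in `infrastructure/models/router.py` as `TieredModelRouter`:

```python
# Illustrative three-tier dispatch, not the actual TieredModelRouter logic.
# "cloud_allowed" stands in for a budget check such as BudgetTracker.cloud_allowed()
# (see budget.py below).

def pick_model(task_complexity: str, cloud_allowed: bool) -> str:
    """Tier 1 for routine work, Tier 2 for heavy local work, Tier 3 if budget allows."""
    if task_complexity == "routine":
        return "llama3.1:8b"       # Tier 1: local, fast, free
    if cloud_allowed:
        return "claude-haiku-4-5"  # Tier 3: cloud, paid, budget-capped
    return "hermes3:70b"           # Tier 2: local, slower, free fallback
```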

@@ -1,3 +1,4 @@
"""SQLAlchemy ORM models for the CALM task-management and journaling system."""
from datetime import UTC, date, datetime
from enum import StrEnum

@@ -1,3 +1,4 @@
"""SQLAlchemy engine, session factory, and declarative Base for the CALM module."""
import logging
from pathlib import Path

@@ -1,3 +1,4 @@
"""Dashboard routes for agent chat interactions and tool-call display."""
import json
import logging
from datetime import datetime

@@ -1,3 +1,4 @@
"""Dashboard routes for the CALM task management and daily journaling interface."""
import logging
from datetime import UTC, date, datetime

@@ -124,73 +124,6 @@ async def check_ollama() -> bool:
    return dep.status == "healthy"


# vLLM health cache (30-second TTL)
_vllm_cache: DependencyStatus | None = None
_vllm_cache_ts: float = 0.0
_VLLM_CACHE_TTL = 30.0


def _check_vllm_sync() -> DependencyStatus:
    """Synchronous vLLM check — run via asyncio.to_thread()."""
    try:
        import urllib.request

        base_url = settings.vllm_url.rstrip("/")
        # vLLM exposes /health at the server root (strip /v1 if present)
        if base_url.endswith("/v1"):
            base_url = base_url[:-3]
        req = urllib.request.Request(
            f"{base_url}/health",
            method="GET",
            headers={"Accept": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=2) as response:
            if response.status == 200:
                return DependencyStatus(
                    name="vLLM",
                    status="healthy",
                    sovereignty_score=10,
                    details={"url": settings.vllm_url, "model": settings.vllm_model},
                )
    except Exception as exc:
        logger.debug("vLLM health check failed: %s", exc)

    return DependencyStatus(
        name="vLLM",
        status="unavailable",
        sovereignty_score=10,
        details={"url": settings.vllm_url, "error": "Cannot connect to vLLM server"},
    )


async def _check_vllm() -> DependencyStatus:
    """Check vLLM backend status without blocking the event loop.

    Results are cached for 30 seconds. vLLM is an optional backend;
    unavailability triggers graceful fallback to Ollama.
    """
    global _vllm_cache, _vllm_cache_ts  # noqa: PLW0603

    now = time.monotonic()
    if _vllm_cache is not None and (now - _vllm_cache_ts) < _VLLM_CACHE_TTL:
        return _vllm_cache

    try:
        result = await asyncio.to_thread(_check_vllm_sync)
    except Exception as exc:
        logger.debug("vLLM async check failed: %s", exc)
        result = DependencyStatus(
            name="vLLM",
            status="unavailable",
            sovereignty_score=10,
            details={"url": settings.vllm_url, "error": "Cannot connect to vLLM server"},
        )

    _vllm_cache = result
    _vllm_cache_ts = now
    return result


def _check_lightning() -> DependencyStatus:
    """Check Lightning payment backend status."""
    return DependencyStatus(
@@ -262,22 +195,13 @@
     # Legacy format for test compatibility
     ollama_ok = await check_ollama()

-    # Check vLLM only when it is the configured backend (avoid probing unused services)
-    vllm_status: str | None = None
-    if settings.timmy_model_backend == "vllm":
-        vllm_dep = await _check_vllm()
-        vllm_status = "up" if vllm_dep.status == "healthy" else "down"
-
-    inference_ok = vllm_status == "up" if vllm_status is not None else ollama_ok
-    agent_status = "idle" if inference_ok else "offline"
-
-    services: dict = {"ollama": "up" if ollama_ok else "down"}
-    if vllm_status is not None:
-        services["vllm"] = vllm_status
+    agent_status = "idle" if ollama_ok else "offline"

     return {
-        "status": "ok" if inference_ok else "degraded",
-        "services": services,
+        "status": "ok" if ollama_ok else "degraded",
+        "services": {
+            "ollama": "up" if ollama_ok else "down",
+        },
         "agents": {
             "agent": {"status": agent_status},
         },
@@ -286,7 +210,7 @@
         "version": "2.0.0",
         "uptime_seconds": uptime,
         "llm_backend": settings.timmy_model_backend,
-        "llm_model": settings.vllm_model if settings.timmy_model_backend == "vllm" else settings.ollama_model,
+        "llm_model": settings.ollama_model,
     }
@@ -328,9 +252,6 @@
         _check_lightning(),
         _check_sqlite(),
     ]
-    # Include vLLM in the audit when it is the active backend
-    if settings.timmy_model_backend == "vllm":
-        dependencies.append(await _check_vllm())

     overall = _calculate_overall_score(dependencies)
     recommendations = _generate_recommendations(dependencies)
@@ -1,5 +1,11 @@
"""Infrastructure models package."""

from infrastructure.models.budget import (
    BudgetTracker,
    SpendRecord,
    estimate_cost_usd,
    get_budget_tracker,
)
from infrastructure.models.multimodal import (
    ModelCapability,
    ModelInfo,
@@ -17,6 +23,12 @@ from infrastructure.models.registry import (
    ModelRole,
    model_registry,
)
from infrastructure.models.router import (
    TierLabel,
    TieredModelRouter,
    classify_tier,
    get_tiered_router,
)

__all__ = [
    # Registry
@@ -34,4 +46,14 @@ __all__ = [
    "model_supports_tools",
    "model_supports_vision",
    "pull_model_with_fallback",
    # Tiered router
    "TierLabel",
    "TieredModelRouter",
    "classify_tier",
    "get_tiered_router",
    # Budget tracker
    "BudgetTracker",
    "SpendRecord",
    "estimate_cost_usd",
    "get_budget_tracker",
]

302 src/infrastructure/models/budget.py Normal file
@@ -0,0 +1,302 @@
"""Cloud API budget tracker for the three-tier model router.

Tracks cloud API spend (daily / monthly) and enforces configurable limits.
SQLite-backed with in-memory fallback — degrades gracefully if the database
is unavailable.

References:
    - Issue #882 — Model Tiering Router: Local 8B / Hermes 70B / Cloud API Cascade
"""

import logging
import sqlite3
import threading
import time
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path

from config import settings

logger = logging.getLogger(__name__)

# ── Cost estimates (USD per 1 K tokens, input / output) ──────────────────────
# Updated 2026-03. Estimates only — actual costs vary by tier/usage.
# Longer keys come before their prefixes so substring matching stays precise.
_COST_PER_1K: dict[str, dict[str, float]] = {
    # Claude models
    "claude-haiku-4-5": {"input": 0.00025, "output": 0.00125},
    "claude-sonnet-4-5": {"input": 0.003, "output": 0.015},
    "claude-opus-4-5": {"input": 0.015, "output": 0.075},
    "haiku": {"input": 0.00025, "output": 0.00125},
    "sonnet": {"input": 0.003, "output": 0.015},
    "opus": {"input": 0.015, "output": 0.075},
    # GPT-4o
    "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
    "gpt-4o": {"input": 0.0025, "output": 0.01},
    # Grok (xAI)
    "grok-3-fast": {"input": 0.003, "output": 0.015},
    "grok-3": {"input": 0.005, "output": 0.025},
}
_DEFAULT_COST: dict[str, float] = {"input": 0.003, "output": 0.015}  # conservative fallback


def estimate_cost_usd(model: str, tokens_in: int, tokens_out: int) -> float:
    """Estimate the cost of a single request in USD.

    Matches the model name by substring so versioned names like
    ``claude-haiku-4-5-20251001`` still resolve correctly.

    Args:
        model: Model name as passed to the provider.
        tokens_in: Number of input (prompt) tokens consumed.
        tokens_out: Number of output (completion) tokens generated.

    Returns:
        Estimated cost in USD (unknown models are priced at the
        conservative default rates).
    """
    model_lower = model.lower()
    rates = _DEFAULT_COST
    for key, rate in _COST_PER_1K.items():
        if key in model_lower:
            rates = rate
            break
    return (tokens_in * rates["input"] + tokens_out * rates["output"]) / 1000.0


@dataclass
class SpendRecord:
    """A single spend event."""

    ts: float
    provider: str
    model: str
    tokens_in: int
    tokens_out: int
    cost_usd: float
    tier: str


class BudgetTracker:
    """Tracks cloud API spend with configurable daily / monthly limits.

    Persists spend records to SQLite (``data/budget.db`` by default).
    Falls back to in-memory tracking when the database is unavailable —
    budget enforcement still works; records are lost on restart.

    Limits are read from ``settings``:

    * ``tier_cloud_daily_budget_usd`` — daily ceiling (0 = disabled)
    * ``tier_cloud_monthly_budget_usd`` — monthly ceiling (0 = disabled)

    Usage::

        tracker = BudgetTracker()

        if tracker.cloud_allowed():
            # … make cloud API call …
            tracker.record_spend("anthropic", "claude-haiku-4-5", 100, 200)

        summary = tracker.get_summary()
        print(summary["daily_usd"], "/", summary["daily_limit_usd"])
    """

    _DB_PATH = "data/budget.db"

    def __init__(self, db_path: str | None = None) -> None:
        """Initialise the tracker.

        Args:
            db_path: Path to the SQLite database. Defaults to
                ``data/budget.db``. Pass ``":memory:"`` for tests.
        """
        self._db_path = db_path or self._DB_PATH
        self._lock = threading.Lock()
        self._in_memory: list[SpendRecord] = []
        self._db_ok = False
        # ":memory:" databases are private to their connection, so one shared
        # connection is kept alive; reconnecting would see an empty database.
        self._memory_conn: sqlite3.Connection | None = None
        self._init_db()

    # ── Database initialisation ──────────────────────────────────────────────

    def _init_db(self) -> None:
        """Create the spend table (and parent directory) if needed."""
        try:
            if self._db_path != ":memory:":
                Path(self._db_path).parent.mkdir(parents=True, exist_ok=True)
            with self._connect() as conn:
                conn.execute(
                    """
                    CREATE TABLE IF NOT EXISTS cloud_spend (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        ts REAL NOT NULL,
                        provider TEXT NOT NULL,
                        model TEXT NOT NULL,
                        tokens_in INTEGER NOT NULL DEFAULT 0,
                        tokens_out INTEGER NOT NULL DEFAULT 0,
                        cost_usd REAL NOT NULL DEFAULT 0.0,
                        tier TEXT NOT NULL DEFAULT 'cloud'
                    )
                    """
                )
                conn.execute(
                    "CREATE INDEX IF NOT EXISTS idx_spend_ts ON cloud_spend(ts)"
                )
            self._db_ok = True
            logger.debug("BudgetTracker: SQLite initialised at %s", self._db_path)
        except Exception as exc:
            logger.warning(
                "BudgetTracker: SQLite unavailable, using in-memory fallback: %s", exc
            )

    def _connect(self) -> sqlite3.Connection:
        if self._db_path == ":memory:":
            if self._memory_conn is None:
                self._memory_conn = sqlite3.connect(
                    ":memory:", timeout=5, check_same_thread=False
                )
            return self._memory_conn
        return sqlite3.connect(self._db_path, timeout=5)

    # ── Public API ───────────────────────────────────────────────────────────

    def record_spend(
        self,
        provider: str,
        model: str,
        tokens_in: int = 0,
        tokens_out: int = 0,
        cost_usd: float | None = None,
        tier: str = "cloud",
    ) -> float:
        """Record a cloud API spend event and return the cost recorded.

        Args:
            provider: Provider name (e.g. ``"anthropic"``, ``"openai"``).
            model: Model name used for the request.
            tokens_in: Input token count (prompt).
            tokens_out: Output token count (completion).
            cost_usd: Explicit cost override. If ``None``, the cost is
                estimated from the token counts and model rates.
            tier: Tier label for the request (default ``"cloud"``).

        Returns:
            The cost recorded in USD.
        """
        if cost_usd is None:
            cost_usd = estimate_cost_usd(model, tokens_in, tokens_out)

        ts = time.time()
        record = SpendRecord(ts, provider, model, tokens_in, tokens_out, cost_usd, tier)

        with self._lock:
            if self._db_ok:
                try:
                    with self._connect() as conn:
                        conn.execute(
                            """
                            INSERT INTO cloud_spend
                                (ts, provider, model, tokens_in, tokens_out, cost_usd, tier)
                            VALUES (?, ?, ?, ?, ?, ?, ?)
                            """,
                            (ts, provider, model, tokens_in, tokens_out, cost_usd, tier),
                        )
                    logger.debug(
                        "BudgetTracker: recorded %.6f USD (%s/%s, in=%d out=%d tier=%s)",
                        cost_usd,
                        provider,
                        model,
                        tokens_in,
                        tokens_out,
                        tier,
                    )
                    return cost_usd
                except Exception as exc:
                    logger.warning("BudgetTracker: DB write failed, falling back: %s", exc)
            self._in_memory.append(record)

        return cost_usd

    def get_daily_spend(self) -> float:
        """Return total cloud spend for the current UTC day in USD."""
        today = datetime.now(UTC).date()
        since = datetime(today.year, today.month, today.day, tzinfo=UTC).timestamp()
        return self._query_spend(since)

    def get_monthly_spend(self) -> float:
        """Return total cloud spend for the current UTC month in USD."""
        today = datetime.now(UTC).date()
        since = datetime(today.year, today.month, 1, tzinfo=UTC).timestamp()
        return self._query_spend(since)

    def cloud_allowed(self) -> bool:
        """Return ``True`` if cloud API spend is within configured limits.

        Checks both daily and monthly ceilings. A limit of ``0`` disables
        that particular check.
        """
        daily_limit = settings.tier_cloud_daily_budget_usd
        monthly_limit = settings.tier_cloud_monthly_budget_usd

        if daily_limit > 0:
            daily_spend = self.get_daily_spend()
            if daily_spend >= daily_limit:
                logger.warning(
                    "BudgetTracker: daily cloud budget exhausted (%.4f / %.4f USD)",
                    daily_spend,
                    daily_limit,
                )
                return False

        if monthly_limit > 0:
            monthly_spend = self.get_monthly_spend()
            if monthly_spend >= monthly_limit:
                logger.warning(
                    "BudgetTracker: monthly cloud budget exhausted (%.4f / %.4f USD)",
                    monthly_spend,
                    monthly_limit,
                )
                return False

        return True

    def get_summary(self) -> dict:
        """Return a spend summary dict suitable for dashboards / logging.

        Keys: ``daily_usd``, ``monthly_usd``, ``daily_limit_usd``,
        ``monthly_limit_usd``, ``daily_ok``, ``monthly_ok``.
        """
        daily = self.get_daily_spend()
        monthly = self.get_monthly_spend()
        daily_limit = settings.tier_cloud_daily_budget_usd
        monthly_limit = settings.tier_cloud_monthly_budget_usd
        return {
            "daily_usd": round(daily, 6),
            "monthly_usd": round(monthly, 6),
            "daily_limit_usd": daily_limit,
            "monthly_limit_usd": monthly_limit,
            "daily_ok": daily_limit <= 0 or daily < daily_limit,
            "monthly_ok": monthly_limit <= 0 or monthly < monthly_limit,
        }

    # ── Internal helpers ─────────────────────────────────────────────────────

    def _query_spend(self, since_ts: float) -> float:
        """Sum ``cost_usd`` for records with ``ts >= since_ts``."""
        if self._db_ok:
            try:
                with self._connect() as conn:
                    row = conn.execute(
                        "SELECT COALESCE(SUM(cost_usd), 0.0) FROM cloud_spend WHERE ts >= ?",
                        (since_ts,),
                    ).fetchone()
                return float(row[0]) if row else 0.0
            except Exception as exc:
                logger.warning("BudgetTracker: DB read failed: %s", exc)
        # In-memory fallback
        return sum(r.cost_usd for r in self._in_memory if r.ts >= since_ts)


# ── Module-level singleton ────────────────────────────────────────────────────

_budget_tracker: BudgetTracker | None = None


def get_budget_tracker() -> BudgetTracker:
    """Get or create the module-level BudgetTracker singleton."""
    global _budget_tracker
    if _budget_tracker is None:
        _budget_tracker = BudgetTracker()
    return _budget_tracker
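As a sanity check on the rate table above, here is a worked cost estimate plus a round trip through the tracker, a minimal sketch assuming only the imports shown in this diff:

```python
from infrastructure.models.budget import BudgetTracker, estimate_cost_usd

# claude-haiku-4-5: $0.00025 / 1K input, $0.00125 / 1K output
# 100_000 in + 20_000 out -> 100 * 0.00025 + 20 * 0.00125 = $0.05
print(estimate_cost_usd("claude-haiku-4-5-20251001", 100_000, 20_000))  # 0.05

tracker = BudgetTracker(db_path=":memory:")  # throwaway DB for the demo
tracker.record_spend("anthropic", "claude-haiku-4-5", 100_000, 20_000)
print(tracker.get_summary()["daily_usd"])    # 0.05
```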
src/infrastructure/models/router.py (new file, +427 lines)
@@ -0,0 +1,427 @@
"""Three-tier model router — Local 8B / Local 70B / Cloud API Cascade.

Selects the cheapest-sufficient LLM for each request using a heuristic
task-complexity classifier. Tier 3 (Cloud API) is only used when Tier 2
fails or cloud is explicitly required, and only if the budget guard allows it.

Tiers
-----
Tier 1 — LOCAL_FAST (Llama 3.1 8B / Hermes 3 8B via Ollama, free, ~0.3-1 s)
    Navigation, basic interactions, simple decisions.

Tier 2 — LOCAL_HEAVY (Hermes 3/4 70B via Ollama, free, ~5-10 s for 200 tok)
    Quest planning, dialogue strategy, complex reasoning.

Tier 3 — CLOUD_API (Claude / GPT-4o, paid ~$5-15/hr heavy use)
    Recovery from Tier 2 failures, novel situations, multi-step planning.

Routing logic
-------------
1. Classify the task using keyword / length / context heuristics (no LLM call).
2. Route to the appropriate tier.
3. On Tier-1 low-quality response → auto-escalate to Tier 2.
4. On Tier-2 failure or explicit ``require_cloud=True`` → Tier 3 (if budget allows).
5. Log tier used, model, latency, estimated cost for every request.

References:
    - Issue #882 — Model Tiering Router: Local 8B / Hermes 70B / Cloud API Cascade
"""

import logging
import re
import time
from enum import StrEnum
from typing import Any

from config import settings

logger = logging.getLogger(__name__)


# ── Tier definitions ──────────────────────────────────────────────────────────


class TierLabel(StrEnum):
    """Three cost-sorted model tiers."""

    LOCAL_FAST = "local_fast"    # 8B local, always hot, free
    LOCAL_HEAVY = "local_heavy"  # 70B local, free but slower
    CLOUD_API = "cloud_api"      # Paid cloud backend (Claude / GPT-4o)


# ── Default model assignments (overridable via Settings) ──────────────────────

_DEFAULT_TIER_MODELS: dict[TierLabel, str] = {
    TierLabel.LOCAL_FAST: "llama3.1:8b",
    TierLabel.LOCAL_HEAVY: "hermes3:70b",
    TierLabel.CLOUD_API: "claude-haiku-4-5",
}

# ── Classification vocabulary ─────────────────────────────────────────────────

# Patterns that indicate a Tier-1 (simple) task
_T1_WORDS: frozenset[str] = frozenset(
    {
        "go", "move", "walk", "run",
        "north", "south", "east", "west", "up", "down", "left", "right",
        "yes", "no", "ok", "okay",
        "open", "close", "take", "drop", "look",
        "pick", "use", "wait", "rest", "save",
        "attack", "flee", "jump", "crouch",
        "status", "ping", "list", "show", "get", "check",
    }
)

# Patterns that indicate a Tier-2 or Tier-3 task
_T2_PHRASES: tuple[str, ...] = (
    "plan", "strategy", "optimize", "optimise",
    "quest", "stuck", "recover",
    "negotiate", "persuade", "faction", "reputation",
    "analyze", "analyse", "evaluate", "decide",
    "complex", "multi-step", "long-term",
    "how do i", "what should i do", "help me figure",
    "what is the best", "recommend", "best way",
    "explain", "describe in detail", "walk me through",
    "compare", "design", "implement", "refactor",
    "debug", "diagnose", "root cause",
)

# Low-quality response detection patterns
_LOW_QUALITY_PATTERNS: tuple[re.Pattern, ...] = (
    re.compile(r"i\s+don'?t\s+know", re.IGNORECASE),
    re.compile(r"i'm\s+not\s+sure", re.IGNORECASE),
    re.compile(r"i\s+cannot\s+(help|assist|answer)", re.IGNORECASE),
    re.compile(r"i\s+apologize", re.IGNORECASE),
    re.compile(r"as an ai", re.IGNORECASE),
    re.compile(r"i\s+don'?t\s+have\s+(enough|sufficient)\s+information", re.IGNORECASE),
)

# Response is definitely low-quality if shorter than this many characters
_LOW_QUALITY_MIN_CHARS = 20
# Response is suspicious if shorter than this many chars for a complex task
_ESCALATION_MIN_CHARS = 60


def classify_tier(task: str, context: dict | None = None) -> TierLabel:
    """Classify a task to the cheapest-sufficient model tier.

    Classification priority (highest wins):
        1. ``context["require_cloud"] = True`` → CLOUD_API
        2. Any Tier-2 phrase or stuck/recovery signal → LOCAL_HEAVY
        3. Short task with only Tier-1 words, no active context → LOCAL_FAST
        4. Default → LOCAL_HEAVY (safe fallback for unknown tasks)

    Args:
        task: Natural-language task or user input.
        context: Optional context dict. Recognised keys:
            ``require_cloud`` (bool), ``stuck`` (bool),
            ``require_t2`` (bool), ``active_quests`` (list),
            ``dialogue_active`` (bool), ``combat_active`` (bool).

    Returns:
        The cheapest ``TierLabel`` sufficient for the task.
    """
    ctx = context or {}
    task_lower = task.lower()
    words = set(task_lower.split())

    # ── Explicit cloud override ──────────────────────────────────────────────
    if ctx.get("require_cloud"):
        logger.debug("classify_tier → CLOUD_API (explicit require_cloud)")
        return TierLabel.CLOUD_API

    # ── Tier-2 / complexity signals ──────────────────────────────────────────
    t2_phrase_hit = any(phrase in task_lower for phrase in _T2_PHRASES)
    t2_word_hit = bool(words & {"plan", "strategy", "optimize", "optimise", "quest",
                                "stuck", "recover", "analyze", "analyse", "evaluate"})
    is_stuck = bool(ctx.get("stuck"))
    require_t2 = bool(ctx.get("require_t2"))
    long_input = len(task) > 300  # long tasks warrant a more capable model
    deep_context = (
        len(ctx.get("active_quests", [])) >= 3
        or ctx.get("dialogue_active")
    )

    if t2_phrase_hit or t2_word_hit or is_stuck or require_t2 or long_input or deep_context:
        logger.debug(
            "classify_tier → LOCAL_HEAVY (phrase=%s word=%s stuck=%s explicit=%s long=%s ctx=%s)",
            t2_phrase_hit, t2_word_hit, is_stuck, require_t2, long_input, deep_context,
        )
        return TierLabel.LOCAL_HEAVY

    # ── Tier-1 signals ───────────────────────────────────────────────────────
    t1_word_hit = bool(words & _T1_WORDS)
    task_short = len(task.split()) <= 8
    no_active_context = (
        not ctx.get("active_quests")
        and not ctx.get("dialogue_active")
        and not ctx.get("combat_active")
    )

    if t1_word_hit and task_short and no_active_context:
        logger.debug(
            "classify_tier → LOCAL_FAST (words=%s short=%s)", t1_word_hit, task_short
        )
        return TierLabel.LOCAL_FAST

    # ── Default: LOCAL_HEAVY (safe for anything unclassified) ────────────────
    logger.debug("classify_tier → LOCAL_HEAVY (default)")
    return TierLabel.LOCAL_HEAVY


def _is_low_quality(content: str, tier: TierLabel) -> bool:
    """Return True if the response looks like it should be escalated.

    Used for automatic Tier-1 → Tier-2 escalation.

    Args:
        content: LLM response text.
        tier: The tier that produced the response.

    Returns:
        True if the response is likely too low-quality to be useful.
    """
    if not content or not content.strip():
        return True

    stripped = content.strip()

    # Too short to be useful
    if len(stripped) < _LOW_QUALITY_MIN_CHARS:
        return True

    # Suspiciously short for a Tier-1 response; worth escalating
    if tier == TierLabel.LOCAL_FAST and len(stripped) < _ESCALATION_MIN_CHARS:
        return True

    # Matches known "I can't help" patterns
    for pattern in _LOW_QUALITY_PATTERNS:
        if pattern.search(stripped):
            return True

    return False


class TieredModelRouter:
    """Routes LLM requests across the Local 8B / Local 70B / Cloud API tiers.

    Wraps CascadeRouter with:
        - Heuristic tier classification via ``classify_tier()``
        - Automatic Tier-1 → Tier-2 escalation on low-quality responses
        - Cloud-tier budget guard via ``BudgetTracker``
        - Per-request logging: tier, model, latency, estimated cost

    Usage::

        router = TieredModelRouter()

        result = await router.route(
            task="Walk to the next room",
            context={},
        )
        print(result["content"], result["tier"])  # response text, e.g. "local_fast"

        # Force heavy tier
        result = await router.route(
            task="Plan the optimal path to become Hortator",
            context={"require_t2": True},
        )
    """

    def __init__(
        self,
        cascade: Any | None = None,
        budget_tracker: Any | None = None,
        tier_models: dict[TierLabel, str] | None = None,
        auto_escalate: bool = True,
    ) -> None:
        """Initialise the tiered router.

        Args:
            cascade: CascadeRouter instance. If ``None``, the
                singleton from ``get_router()`` is used lazily.
            budget_tracker: BudgetTracker instance. If ``None``, the
                singleton from ``get_budget_tracker()`` is used.
            tier_models: Override default model names per tier.
            auto_escalate: When ``True``, low-quality Tier-1 responses
                automatically retry on Tier-2.
        """
        self._cascade = cascade
        self._budget = budget_tracker
        self._tier_models: dict[TierLabel, str] = dict(_DEFAULT_TIER_MODELS)
        self._auto_escalate = auto_escalate

        # Apply settings-level overrides (can still be overridden per-instance)
        if settings.tier_local_fast_model:
            self._tier_models[TierLabel.LOCAL_FAST] = settings.tier_local_fast_model
        if settings.tier_local_heavy_model:
            self._tier_models[TierLabel.LOCAL_HEAVY] = settings.tier_local_heavy_model
        if settings.tier_cloud_model:
            self._tier_models[TierLabel.CLOUD_API] = settings.tier_cloud_model

        if tier_models:
            self._tier_models.update(tier_models)

    # ── Lazy singletons ──────────────────────────────────────────────────────

    def _get_cascade(self) -> Any:
        if self._cascade is None:
            from infrastructure.router.cascade import get_router

            self._cascade = get_router()
        return self._cascade

    def _get_budget(self) -> Any:
        if self._budget is None:
            from infrastructure.models.budget import get_budget_tracker

            self._budget = get_budget_tracker()
        return self._budget

    # ── Public interface ─────────────────────────────────────────────────────

    def classify(self, task: str, context: dict | None = None) -> TierLabel:
        """Classify a task without routing. Useful for telemetry."""
        return classify_tier(task, context)

    async def route(
        self,
        task: str,
        context: dict | None = None,
        messages: list[dict] | None = None,
        temperature: float = 0.3,
        max_tokens: int | None = None,
    ) -> dict:
        """Route a task to the appropriate model tier.

        Builds a minimal messages list if ``messages`` is not provided.
        The result always includes a ``tier`` key indicating which tier
        ultimately handled the request.

        Args:
            task: Natural-language task description.
            context: Task context dict (see ``classify_tier()``).
            messages: Pre-built OpenAI-compatible messages list. If
                provided, ``task`` is only used for classification.
            temperature: Sampling temperature (default 0.3).
            max_tokens: Maximum tokens to generate.

        Returns:
            Dict with at minimum: ``content``, ``provider``, ``model``,
            ``tier``, ``latency_ms``. May include ``cost_usd`` when a
            cloud request is recorded.

        Raises:
            RuntimeError: If all available tiers are exhausted.
        """
        ctx = context or {}
        tier = self.classify(task, ctx)
        msgs = messages or [{"role": "user", "content": task}]

        # ── Tier 1 attempt ───────────────────────────────────────────────────
        if tier == TierLabel.LOCAL_FAST:
            result = await self._complete_tier(
                TierLabel.LOCAL_FAST, msgs, temperature, max_tokens
            )
            if self._auto_escalate and _is_low_quality(result.get("content", ""), TierLabel.LOCAL_FAST):
                logger.info(
                    "TieredModelRouter: Tier-1 response low quality, escalating to Tier-2 "
                    "(task=%r content_len=%d)",
                    task[:80],
                    len(result.get("content", "")),
                )
                tier = TierLabel.LOCAL_HEAVY
                result = await self._complete_tier(
                    TierLabel.LOCAL_HEAVY, msgs, temperature, max_tokens
                )
            return result

        # ── Tier 2 attempt ───────────────────────────────────────────────────
        if tier == TierLabel.LOCAL_HEAVY:
            try:
                return await self._complete_tier(
                    TierLabel.LOCAL_HEAVY, msgs, temperature, max_tokens
                )
            except Exception as exc:
                logger.warning(
                    "TieredModelRouter: Tier-2 failed (%s) — escalating to cloud", exc
                )
                tier = TierLabel.CLOUD_API

        # ── Tier 3 (Cloud) ───────────────────────────────────────────────────
        budget = self._get_budget()
        if not budget.cloud_allowed():
            raise RuntimeError(
                "Cloud API tier requested but budget limit reached — "
                "increase tier_cloud_daily_budget_usd or tier_cloud_monthly_budget_usd"
            )

        result = await self._complete_tier(
            TierLabel.CLOUD_API, msgs, temperature, max_tokens
        )

        # Record cloud spend if token info is available
        usage = result.get("usage", {})
        if usage:
            cost = budget.record_spend(
                provider=result.get("provider", "unknown"),
                model=result.get("model", self._tier_models[TierLabel.CLOUD_API]),
                tokens_in=usage.get("prompt_tokens", 0),
                tokens_out=usage.get("completion_tokens", 0),
                tier=TierLabel.CLOUD_API,
            )
            result["cost_usd"] = cost

        return result

    # ── Internal helpers ─────────────────────────────────────────────────────

    async def _complete_tier(
        self,
        tier: TierLabel,
        messages: list[dict],
        temperature: float,
        max_tokens: int | None,
    ) -> dict:
        """Dispatch a single inference request for the given tier."""
        model = self._tier_models[tier]
        cascade = self._get_cascade()
        start = time.monotonic()

        logger.info(
            "TieredModelRouter: tier=%s model=%s messages=%d",
            tier,
            model,
            len(messages),
        )

        result = await cascade.complete(
            messages=messages,
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
        )

        elapsed_ms = (time.monotonic() - start) * 1000
        result["tier"] = tier
        result.setdefault("latency_ms", elapsed_ms)

        logger.info(
            "TieredModelRouter: done tier=%s model=%s latency_ms=%.0f",
            tier,
            result.get("model", model),
            elapsed_ms,
        )
        return result


# ── Module-level singleton ────────────────────────────────────────────────────

_tiered_router: TieredModelRouter | None = None


def get_tiered_router() -> TieredModelRouter:
    """Get or create the module-level TieredModelRouter singleton."""
    global _tiered_router
    if _tiered_router is None:
        _tiered_router = TieredModelRouter()
    return _tiered_router
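Because `classify_tier()` is a pure heuristic (no LLM call, no I/O), its behaviour is easy to spot-check. A minimal sketch, assuming the module import path from the diff:

```python
from infrastructure.models.router import classify_tier

# Short imperative with a Tier-1 word and no active context -> local_fast
print(classify_tier("go north"))                           # local_fast

# A Tier-2 phrase ("plan") forces the heavy local model
print(classify_tier("plan the optimal quest route"))       # local_heavy

# Active combat context blocks the Tier-1 shortcut -> default local_heavy
print(classify_tier("attack", {"combat_active": True}))    # local_heavy

# Explicit override wins over everything else
print(classify_tier("go north", {"require_cloud": True}))  # cloud_api
```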
@@ -331,22 +331,6 @@ class CascadeRouter:
                logger.debug("vllm-mlx provider check error: %s", exc)
                return False

        elif provider.type == "vllm":
            # Check if standard vLLM server is running (OpenAI-compatible API)
            if requests is None:
                return True
            try:
                base_url = provider.base_url or provider.url or settings.vllm_url
                # Strip /v1 suffix — health endpoint is at the server root
                server_root = base_url.rstrip("/")
                if server_root.endswith("/v1"):
                    server_root = server_root[:-3]
                response = requests.get(f"{server_root}/health", timeout=5)
                return response.status_code == 200
            except Exception as exc:
                logger.debug("vllm provider check error: %s", exc)
                return False

        elif provider.type in ("openai", "anthropic", "grok"):
            # Check if API key is set
            return provider.api_key is not None and provider.api_key != ""
@@ -809,14 +793,6 @@ class CascadeRouter:
                temperature=temperature,
                max_tokens=max_tokens,
            )
        elif provider.type == "vllm":
            result = await self._call_vllm(
                provider=provider,
                messages=messages,
                model=model or provider.get_default_model(),
                temperature=temperature,
                max_tokens=max_tokens,
            )
        else:
            raise ValueError(f"Unknown provider type: {provider.type}")
@@ -1055,49 +1031,6 @@ class CascadeRouter:
            "model": response.model,
        }

    async def _call_vllm(
        self,
        provider: Provider,
        messages: list[dict],
        model: str,
        temperature: float,
        max_tokens: int | None,
    ) -> dict:
        """Call a standard vLLM server via its OpenAI-compatible API.

        vLLM exposes the same /v1/chat/completions endpoint as OpenAI.
        No API key is required for local deployments.

        Default URL comes from settings.vllm_url (VLLM_URL env var).
        """
        import openai

        base_url = provider.base_url or provider.url or settings.vllm_url
        # Ensure the base_url ends with /v1 as expected by the OpenAI client
        if not base_url.rstrip("/").endswith("/v1"):
            base_url = base_url.rstrip("/") + "/v1"

        client = openai.AsyncOpenAI(
            api_key=provider.api_key or "no-key-required",
            base_url=base_url,
            timeout=self.config.timeout_seconds,
        )

        kwargs: dict = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
        }
        if max_tokens:
            kwargs["max_tokens"] = max_tokens

        response = await client.chat.completions.create(**kwargs)

        return {
            "content": response.choices[0].message.content,
            "model": response.model,
        }

    def _record_success(self, provider: Provider, latency_ms: float) -> None:
        """Record a successful request."""
        provider.metrics.total_requests += 1
@@ -0,0 +1 @@
"""Vendor-specific chat platform adapters (e.g. Discord) for the chat bridge."""
@@ -301,6 +301,26 @@ def create_timmy(

        return GrokBackend()

    if resolved == "airllm":
        # AirLLM requires Apple Silicon. On any other platform (Intel Mac, Linux,
        # Windows) or when the package is not installed, degrade silently to Ollama.
        from timmy.backends import is_apple_silicon

        if not is_apple_silicon():
            logger.warning(
                "TIMMY_MODEL_BACKEND=airllm requested but not running on Apple Silicon "
                "— falling back to Ollama"
            )
        else:
            try:
                import airllm  # noqa: F401
            except ImportError:
                logger.warning(
                    "AirLLM not installed — falling back to Ollama. "
                    "Install with: pip install 'airllm[mlx]'"
                )
        # Fall through to Ollama in all cases (AirLLM integration is scaffolded)

    # Default: Ollama via Agno.
    model_name, is_fallback = _resolve_model_with_fallback(
        requested_model=None,
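The hunk imports `is_apple_silicon` from `timmy.backends` without showing it. A plausible sketch of such a check, under the assumption that it is a simple platform probe (the repository's actual helper may differ):

```python
import platform


def is_apple_silicon() -> bool:
    """Best-effort check for an Apple Silicon (arm64 macOS) host.

    Hypothetical sketch; the real timmy.backends helper may differ.
    """
    return platform.system() == "Darwin" and platform.machine() == "arm64"
```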
@@ -1,3 +1,4 @@
"""Typer CLI entry point for the ``timmy`` command (chat, think, status)."""
import asyncio
import logging
import subprocess
@@ -1,3 +1,4 @@
"""OpenCV template-matching cache for sovereignty perception (screen-state recognition)."""
from __future__ import annotations

import json
@@ -46,6 +46,7 @@ from timmy.tools.file_tools import (
    create_research_tools,
    create_writing_tools,
)
from timmy.tools.search import scrape_url, web_search
from timmy.tools.system_tools import (
    _safe_eval,
    calculator,
@@ -72,6 +73,9 @@ __all__ = [
    "create_data_tools",
    "create_research_tools",
    "create_writing_tools",
    # search
    "scrape_url",
    "web_search",
    # system_tools
    "_safe_eval",
    "calculator",
@@ -28,6 +28,7 @@ from timmy.tools.file_tools import (
    create_research_tools,
    create_writing_tools,
)
from timmy.tools.search import scrape_url, web_search
from timmy.tools.system_tools import (
    calculator,
    consult_grok,
@@ -54,6 +55,16 @@ def _register_web_fetch_tool(toolkit: Toolkit) -> None:
        raise


def _register_search_tools(toolkit: Toolkit) -> None:
    """Register SearXNG web_search and Crawl4AI scrape_url tools."""
    try:
        toolkit.register(web_search, name="web_search")
        toolkit.register(scrape_url, name="scrape_url")
    except Exception as exc:
        logger.error("Failed to register search tools: %s", exc)
        raise


def _register_core_tools(toolkit: Toolkit, base_path: Path) -> None:
    """Register core execution and file tools."""
    # Python execution
@@ -261,6 +272,7 @@ def create_full_toolkit(base_dir: str | Path | None = None):

    _register_core_tools(toolkit, base_path)
    _register_web_fetch_tool(toolkit)
    _register_search_tools(toolkit)
    _register_grok_tool(toolkit)
    _register_memory_tools(toolkit)
    _register_agentic_loop_tool(toolkit)
@@ -433,6 +445,16 @@ def _analysis_tool_catalog() -> dict:
            "description": "Fetch a web page and extract clean readable text (trafilatura)",
            "available_in": ["orchestrator"],
        },
        "web_search": {
            "name": "Web Search",
            "description": "Search the web via self-hosted SearXNG (no API key required)",
            "available_in": ["echo", "orchestrator"],
        },
        "scrape_url": {
            "name": "Scrape URL",
            "description": "Scrape a URL with Crawl4AI and return clean markdown content",
            "available_in": ["echo", "orchestrator"],
        },
    }
@@ -59,7 +59,7 @@ def _make_smart_read_file(file_tools: FileTools) -> Callable:
def create_research_tools(base_dir: str | Path | None = None):
    """Create tools for the research agent (Echo).

    Includes: file reading
    Includes: file reading, web search (SearXNG), URL scraping (Crawl4AI)
    """
    if not _AGNO_TOOLS_AVAILABLE:
        raise ImportError(f"Agno tools not available: {_ImportError}")
@@ -73,6 +73,12 @@ def create_research_tools(base_dir: str | Path | None = None):
    toolkit.register(_make_smart_read_file(file_tools), name="read_file")
    toolkit.register(file_tools.list_files, name="list_files")

    # Web search + scraping (gracefully no-ops when backend=none or service down)
    from timmy.tools.search import scrape_url, web_search

    toolkit.register(web_search, name="web_search")
    toolkit.register(scrape_url, name="scrape_url")

    return toolkit
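A one-liner to build Echo's toolkit with the new search tools wired in, with `base_dir` semantics as in the signature above:

```python
from timmy.tools.file_tools import create_research_tools

# read_file / list_files plus web_search / scrape_url, rooted at a workspace.
toolkit = create_research_tools("workspace/")
```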
src/timmy/tools/search.py (new file, +186 lines)
@@ -0,0 +1,186 @@
"""Self-hosted web search and scraping tools using SearXNG + Crawl4AI.

Provides:
    - web_search(query) — SearXNG meta-search (no API key required)
    - scrape_url(url) — Crawl4AI full-page scrape to clean markdown

Both tools degrade gracefully when the backing service is unavailable
(logs WARNING, returns descriptive error string — never crashes).

Services are started via `docker compose --profile search up` or configured
with TIMMY_SEARCH_URL / TIMMY_CRAWL_URL environment variables.
"""

from __future__ import annotations

import logging
import time

from config import settings

logger = logging.getLogger(__name__)

# Crawl4AI polling: up to _CRAWL_MAX_POLLS × _CRAWL_POLL_INTERVAL seconds
_CRAWL_MAX_POLLS = 6
_CRAWL_POLL_INTERVAL = 5  # seconds
_CRAWL_CHAR_BUDGET = 4000 * 4  # ~4000 tokens


def web_search(query: str, num_results: int = 5) -> str:
    """Search the web using the self-hosted SearXNG meta-search engine.

    Returns ranked results (title + URL + snippet) without requiring any
    paid API key. Requires SearXNG running locally (docker compose
    --profile search up) or TIMMY_SEARCH_URL pointing to a reachable instance.

    Args:
        query: The search query.
        num_results: Maximum number of results to return (default 5).

    Returns:
        Formatted search results string, or an error/status message on failure.
    """
    if settings.timmy_search_backend == "none":
        return "Web search is disabled (TIMMY_SEARCH_BACKEND=none)."

    try:
        import requests as _requests
    except ImportError:
        return "Error: 'requests' package is not installed."

    base_url = settings.search_url.rstrip("/")
    params: dict = {
        "q": query,
        "format": "json",
        "categories": "general",
    }

    try:
        resp = _requests.get(
            f"{base_url}/search",
            params=params,
            timeout=10,
            headers={"User-Agent": "TimmyResearchBot/1.0"},
        )
        resp.raise_for_status()
    except Exception as exc:
        logger.warning("SearXNG unavailable at %s: %s", base_url, exc)
        return f"Search unavailable — SearXNG not reachable ({base_url}): {exc}"

    try:
        data = resp.json()
    except Exception as exc:
        logger.warning("SearXNG response parse error: %s", exc)
        return "Search error: could not parse SearXNG response."

    results = data.get("results", [])[:num_results]
    if not results:
        return f"No results found for: {query!r}"

    lines = [f"Web search results for: {query!r}\n"]
    for i, r in enumerate(results, 1):
        title = r.get("title", "Untitled")
        url = r.get("url", "")
        snippet = r.get("content", "").strip()
        lines.append(f"{i}. {title}\n   URL: {url}\n   {snippet}\n")

    return "\n".join(lines)


def scrape_url(url: str) -> str:
    """Scrape a URL with Crawl4AI and return the main content as clean markdown.

    Crawl4AI extracts well-structured markdown from any public page —
    articles, docs, product pages — suitable for LLM consumption.
    Requires Crawl4AI running locally (docker compose --profile search up)
    or TIMMY_CRAWL_URL pointing to a reachable instance.

    Args:
        url: The URL to scrape (must start with http:// or https://).

    Returns:
        Extracted markdown text (up to ~4000 tokens), or an error message.
    """
    if not url or not url.startswith(("http://", "https://")):
        return f"Error: invalid URL — must start with http:// or https://: {url!r}"

    if settings.timmy_search_backend == "none":
        return "Web scraping is disabled (TIMMY_SEARCH_BACKEND=none)."

    try:
        import requests as _requests
    except ImportError:
        return "Error: 'requests' package is not installed."

    base = settings.crawl_url.rstrip("/")

    # Submit crawl task
    try:
        resp = _requests.post(
            f"{base}/crawl",
            json={"urls": [url], "priority": 10},
            timeout=15,
            headers={"Content-Type": "application/json"},
        )
        resp.raise_for_status()
    except Exception as exc:
        logger.warning("Crawl4AI unavailable at %s: %s", base, exc)
        return f"Scrape unavailable — Crawl4AI not reachable ({base}): {exc}"

    try:
        submit_data = resp.json()
    except Exception as exc:
        logger.warning("Crawl4AI submit parse error: %s", exc)
        return "Scrape error: could not parse Crawl4AI response."

    # Check if result came back synchronously
    if "results" in submit_data:
        return _extract_crawl_content(submit_data["results"], url)

    task_id = submit_data.get("task_id")
    if not task_id:
        return f"Scrape error: Crawl4AI returned no task_id for {url}"

    # Poll for async result
    for _ in range(_CRAWL_MAX_POLLS):
        time.sleep(_CRAWL_POLL_INTERVAL)
        try:
            poll = _requests.get(f"{base}/task/{task_id}", timeout=10)
            poll.raise_for_status()
            task_data = poll.json()
        except Exception as exc:
            logger.warning("Crawl4AI poll error (task=%s): %s", task_id, exc)
            continue

        status = task_data.get("status", "")
        if status == "completed":
            results = task_data.get("results") or task_data.get("result")
            if isinstance(results, dict):
                results = [results]
            return _extract_crawl_content(results or [], url)
        if status == "failed":
            return f"Scrape failed for {url}: {task_data.get('error', 'unknown error')}"

    return f"Scrape timed out after {_CRAWL_MAX_POLLS * _CRAWL_POLL_INTERVAL}s for {url}"


def _extract_crawl_content(results: list, url: str) -> str:
    """Extract and truncate markdown content from Crawl4AI results list."""
    if not results:
        return f"No content returned by Crawl4AI for: {url}"

    result = results[0]
    content = (
        result.get("markdown")
        # Guard against a markdown_v2 key that is present but None
        or (result.get("markdown_v2") or {}).get("raw_markdown")
        or result.get("extracted_content")
        or result.get("content")
        or ""
    )
    if not content:
        return f"No readable content extracted from: {url}"

    if len(content) > _CRAWL_CHAR_BUDGET:
        content = content[:_CRAWL_CHAR_BUDGET] + "\n\n[…truncated to ~4000 tokens]"

    return content
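A quick smoke test for the two tools above, a sketch that assumes the search profile is running locally (otherwise the calls return the graceful error strings rather than raising):

```python
from timmy.tools.search import scrape_url, web_search

# Returns a formatted result list, or a "Search unavailable ..." string
# when SearXNG at TIMMY_SEARCH_URL is down.
print(web_search("sqlite WAL mode", num_results=3))

# Returns markdown (truncated to ~4000 tokens), or a graceful error string.
print(scrape_url("https://www.sqlite.org/wal.html")[:500])
```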
@@ -41,17 +41,38 @@ def delegate_task(
    if priority not in valid_priorities:
        priority = "normal"

    agent_role = available[agent_name]

    # Wire to DistributedWorker for actual execution
    task_id: str | None = None
    status = "queued"
    try:
        from brain.worker import DistributedWorker

        task_id = DistributedWorker.submit(agent_name, agent_role, task_description, priority)
    except Exception as exc:
        logger.warning("DistributedWorker unavailable — task noted only: %s", exc)
        status = "noted"

    logger.info(
        "Delegation intent: %s → %s (priority=%s)", agent_name, task_description[:80], priority
        "Delegated task %s: %s → %s (priority=%s, status=%s)",
        task_id or "?",
        agent_name,
        task_description[:80],
        priority,
        status,
    )

    return {
        "success": True,
        "task_id": None,
        "task_id": task_id,
        "agent": agent_name,
        "role": available[agent_name],
        "status": "noted",
        "message": f"Delegation to {agent_name} ({available[agent_name]}): {task_description[:100]}",
        "role": agent_role,
        "status": status,
        "message": (
            f"Task {task_id or 'noted'}: delegated to {agent_name} ({agent_role}): "
            f"{task_description[:100]}"
        ),
    }
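The delegation result now distinguishes a queued task from a merely noted one. A sketch of consuming it; the positional signature `delegate_task(agent_name, task_description, priority)` is inferred from the function body above, not shown in the hunk:

```python
# Hypothetical call; argument order inferred from the function body.
result = delegate_task("echo", "Summarise the latest sovereignty audit", "high")

if result["status"] == "queued":
    print(f"Worker accepted task {result['task_id']}")
else:  # "noted": DistributedWorker was unreachable, intent logged only
    print(result["message"])
```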
@@ -37,6 +37,7 @@ class VoiceTTS:

    @property
    def available(self) -> bool:
        """Whether the TTS engine initialized successfully and can produce audio."""
        return self._available

    def speak(self, text: str) -> None:
@@ -68,11 +69,13 @@ class VoiceTTS:
            logger.error("VoiceTTS: speech failed — %s", exc)

    def set_rate(self, rate: int) -> None:
        """Set speech rate in words per minute (typical range: 100–300, default 175)."""
        self._rate = rate
        if self._engine:
            self._engine.setProperty("rate", rate)

    def set_volume(self, volume: float) -> None:
        """Set speech volume. Value is clamped to the 0.0–1.0 range."""
        self._volume = max(0.0, min(1.0, volume))
        if self._engine:
            self._engine.setProperty("volume", self._volume)
@@ -92,6 +95,7 @@ class VoiceTTS:
        return []

    def set_voice(self, voice_id: str) -> None:
        """Set the active TTS voice by system voice ID (see ``get_voices()``)."""
        if self._engine:
            self._engine.setProperty("voice", voice_id)
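The setters above map directly onto pyttsx3-style engine properties. A usage sketch; the `VoiceTTS()` constructor arguments are an assumption, since they fall outside this hunk:

```python
tts = VoiceTTS()  # constructor args assumed, not shown in this hunk

if tts.available:
    tts.set_rate(160)    # words per minute
    tts.set_volume(1.2)  # clamped to 1.0 internally
    tts.speak("Budget check complete.")
```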
tests/infrastructure/test_budget_tracker.py (new file, +178 lines)
@@ -0,0 +1,178 @@
"""Tests for the cloud API budget tracker (issue #882)."""

import threading
import time
from unittest.mock import patch

import pytest

from infrastructure.models.budget import (
    BudgetTracker,
    SpendRecord,
    estimate_cost_usd,
    get_budget_tracker,
)

pytestmark = pytest.mark.unit


# ── estimate_cost_usd ─────────────────────────────────────────────────────────


class TestEstimateCostUsd:
    def test_haiku_cheaper_than_sonnet(self):
        haiku_cost = estimate_cost_usd("claude-haiku-4-5", 1000, 1000)
        sonnet_cost = estimate_cost_usd("claude-sonnet-4-5", 1000, 1000)
        assert haiku_cost < sonnet_cost

    def test_zero_tokens_is_zero_cost(self):
        assert estimate_cost_usd("gpt-4o", 0, 0) == 0.0

    def test_unknown_model_uses_default(self):
        cost = estimate_cost_usd("some-unknown-model-xyz", 1000, 1000)
        assert cost > 0  # Uses conservative default, not zero

    def test_versioned_model_name_matches(self):
        # "claude-haiku-4-5-20251001" should match the "claude-haiku-4-5" rate
        cost1 = estimate_cost_usd("claude-haiku-4-5-20251001", 1000, 0)
        cost2 = estimate_cost_usd("claude-haiku-4-5", 1000, 0)
        assert cost1 == cost2

    def test_gpt4o_mini_cheaper_than_gpt4o(self):
        mini = estimate_cost_usd("gpt-4o-mini", 1000, 1000)
        full = estimate_cost_usd("gpt-4o", 1000, 1000)
        assert mini < full

    def test_returns_float(self):
        assert isinstance(estimate_cost_usd("haiku", 100, 200), float)


# ── BudgetTracker ─────────────────────────────────────────────────────────────


class TestBudgetTrackerInit:
    def test_creates_with_memory_db(self):
        tracker = BudgetTracker(db_path=":memory:")
        assert tracker._db_ok is True

    def test_in_memory_fallback_empty_on_creation(self):
        tracker = BudgetTracker(db_path=":memory:")
        assert tracker._in_memory == []

    def test_bad_path_uses_memory_fallback(self, tmp_path):
        # _init_db would simply create parent dirs for a missing path, so
        # build a tracker around an uninitialised database instead.
        bad_path = str(tmp_path / "nonexistent" / "x" / "budget.db")
        tracker = BudgetTracker.__new__(BudgetTracker)
        tracker._db_path = bad_path
        tracker._lock = threading.Lock()
        tracker._in_memory = []
        tracker._db_ok = False
        # Record to in-memory fallback
        tracker._in_memory.append(
            SpendRecord(time.time(), "test", "model", 100, 100, 0.001, "cloud")
        )
        assert len(tracker._in_memory) == 1


class TestBudgetTrackerRecordSpend:
    def test_record_spend_returns_cost(self):
        tracker = BudgetTracker(db_path=":memory:")
        cost = tracker.record_spend("anthropic", "claude-haiku-4-5", 100, 200)
        assert cost > 0

    def test_record_spend_explicit_cost(self):
        tracker = BudgetTracker(db_path=":memory:")
        cost = tracker.record_spend("anthropic", "model", cost_usd=1.23)
        assert cost == pytest.approx(1.23)

    def test_record_spend_accumulates(self):
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("openai", "gpt-4o", cost_usd=0.01)
        tracker.record_spend("openai", "gpt-4o", cost_usd=0.02)
        assert tracker.get_daily_spend() == pytest.approx(0.03, abs=1e-9)

    def test_record_spend_with_tier_label(self):
        tracker = BudgetTracker(db_path=":memory:")
        cost = tracker.record_spend("anthropic", "haiku", tier="cloud_api")
        assert cost >= 0

    def test_monthly_spend_includes_daily(self):
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("anthropic", "haiku", cost_usd=5.00)
        assert tracker.get_monthly_spend() >= tracker.get_daily_spend()


class TestBudgetTrackerCloudAllowed:
    def test_allowed_when_no_spend(self):
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 5.0
            mock_settings.tier_cloud_monthly_budget_usd = 50.0
            assert tracker.cloud_allowed() is True

    def test_blocked_when_daily_limit_exceeded(self):
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
        # With default daily limit of 5.0, 999 should block
        assert tracker.cloud_allowed() is False

    def test_allowed_when_daily_limit_zero(self):
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 0  # disabled
            mock_settings.tier_cloud_monthly_budget_usd = 0  # disabled
            assert tracker.cloud_allowed() is True

    def test_blocked_when_monthly_limit_exceeded(self):
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 0  # daily disabled
            mock_settings.tier_cloud_monthly_budget_usd = 10.0
            assert tracker.cloud_allowed() is False


class TestBudgetTrackerSummary:
    def test_summary_keys_present(self):
        tracker = BudgetTracker(db_path=":memory:")
        summary = tracker.get_summary()
        assert "daily_usd" in summary
        assert "monthly_usd" in summary
        assert "daily_limit_usd" in summary
        assert "monthly_limit_usd" in summary
        assert "daily_ok" in summary
        assert "monthly_ok" in summary

    def test_summary_daily_ok_true_on_empty(self):
        tracker = BudgetTracker(db_path=":memory:")
        summary = tracker.get_summary()
        assert summary["daily_ok"] is True
        assert summary["monthly_ok"] is True

    def test_summary_daily_ok_false_when_exceeded(self):
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("openai", "gpt-4o", cost_usd=999.0)
        summary = tracker.get_summary()
        assert summary["daily_ok"] is False


# ── Singleton ─────────────────────────────────────────────────────────────────


class TestGetBudgetTrackerSingleton:
    def test_returns_budget_tracker(self):
        import infrastructure.models.budget as bmod

        bmod._budget_tracker = None
        tracker = get_budget_tracker()
        assert isinstance(tracker, BudgetTracker)

    def test_returns_same_instance(self):
        import infrastructure.models.budget as bmod

        bmod._budget_tracker = None
        t1 = get_budget_tracker()
        t2 = get_budget_tracker()
        assert t1 is t2
@@ -7,6 +7,8 @@ from unittest.mock import patch

import pytest

import infrastructure.events.bus as bus_module

pytestmark = pytest.mark.unit

from infrastructure.events.bus import (
    Event,
    EventBus,
@@ -352,6 +354,14 @@ class TestEventBusPersistence:
        events = bus.replay()
        assert events == []

    def test_init_persistence_db_noop_when_path_is_none(self):
        """_init_persistence_db() is a no-op when _persistence_db_path is None."""
        bus = EventBus()
        # _persistence_db_path is None by default; calling _init_persistence_db
        # should silently return without touching the filesystem.
        bus._init_persistence_db()  # must not raise
        assert bus._persistence_db_path is None

    async def test_wal_mode_on_persistence_db(self, persistent_bus):
        """Persistence database should use WAL mode."""
        conn = sqlite3.connect(str(persistent_bus._persistence_db_path))
tests/infrastructure/test_graceful_degradation.py (new file, +589 lines)
@@ -0,0 +1,589 @@
|
||||
"""Graceful degradation test scenarios — Issue #919.
|
||||
|
||||
Tests specifically for service failure paths and fallback logic:
|
||||
|
||||
* Ollama health-check failures (connection refused, timeout, HTTP errors)
|
||||
* Cascade router: Ollama down → falls back to Anthropic/cloud provider
|
||||
* Circuit-breaker lifecycle: CLOSED → OPEN (repeated failures) → HALF_OPEN (recovery window)
|
||||
* All providers fail → descriptive RuntimeError
|
||||
* Disabled provider skipped without touching circuit breaker
|
||||
* ``requests`` library unavailable → optimistic availability assumption
|
||||
* ClaudeBackend / GrokBackend no-key graceful messages
|
||||
* Chat store: SQLite directory auto-creation and concurrent access safety
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from infrastructure.router.cascade import (
|
||||
CascadeRouter,
|
||||
CircuitState,
|
||||
Provider,
|
||||
ProviderStatus,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_ollama_provider(name: str = "local-ollama", priority: int = 1) -> Provider:
|
||||
return Provider(
|
||||
name=name,
|
||||
type="ollama",
|
||||
enabled=True,
|
||||
priority=priority,
|
||||
url="http://localhost:11434",
|
||||
models=[{"name": "llama3", "default": True}],
|
||||
)
|
||||
|
||||
|
||||
def _make_anthropic_provider(name: str = "cloud-fallback", priority: int = 2) -> Provider:
|
||||
return Provider(
|
||||
name=name,
|
||||
type="anthropic",
|
||||
enabled=True,
|
||||
priority=priority,
|
||||
api_key="sk-ant-test",
|
||||
models=[{"name": "claude-haiku-4-5-20251001", "default": True}],
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Ollama health-check failure scenarios
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
class TestOllamaHealthCheckFailures:
|
||||
"""_check_provider_available returns False for all Ollama failure modes."""
|
||||
|
||||
def _router(self) -> CascadeRouter:
|
||||
return CascadeRouter(config_path=Path("/nonexistent"))
|
||||
|
||||
def test_connection_refused_returns_false(self):
|
||||
"""Connection refused during Ollama health check → provider excluded."""
|
||||
router = self._router()
|
||||
provider = _make_ollama_provider()
|
||||
|
||||
with patch("infrastructure.router.cascade.requests") as mock_req:
|
||||
mock_req.get.side_effect = ConnectionError("Connection refused")
|
||||
assert router._check_provider_available(provider) is False
|
||||
|
||||
def test_timeout_returns_false(self):
|
||||
"""Request timeout during Ollama health check → provider excluded."""
|
||||
router = self._router()
|
||||
provider = _make_ollama_provider()
|
||||
|
||||
with patch("infrastructure.router.cascade.requests") as mock_req:
|
||||
# Simulate a timeout using a generic OSError (matches real-world timeout behaviour)
|
||||
mock_req.get.side_effect = OSError("timed out")
|
||||
assert router._check_provider_available(provider) is False
|
||||
|
||||
def test_http_503_returns_false(self):
|
||||
"""HTTP 503 from Ollama health endpoint → provider excluded."""
|
||||
router = self._router()
|
||||
provider = _make_ollama_provider()
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 503
|
||||
|
||||
with patch("infrastructure.router.cascade.requests") as mock_req:
|
||||
mock_req.get.return_value = mock_response
|
||||
assert router._check_provider_available(provider) is False
|
||||
|
||||
def test_http_500_returns_false(self):
|
||||
"""HTTP 500 from Ollama health endpoint → provider excluded."""
|
||||
router = self._router()
|
||||
provider = _make_ollama_provider()
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 500
|
||||
|
||||
with patch("infrastructure.router.cascade.requests") as mock_req:
|
||||
mock_req.get.return_value = mock_response
|
||||
assert router._check_provider_available(provider) is False
|
||||
|
||||
def test_generic_exception_returns_false(self):
|
||||
"""Unexpected exception during Ollama check → provider excluded (no crash)."""
|
||||
router = self._router()
|
||||
provider = _make_ollama_provider()
|
||||
|
||||
with patch("infrastructure.router.cascade.requests") as mock_req:
|
||||
mock_req.get.side_effect = RuntimeError("unexpected error")
|
||||
assert router._check_provider_available(provider) is False
|
||||
|
||||
def test_requests_unavailable_assumes_available(self):
|
||||
"""When ``requests`` lib is None, Ollama availability is assumed True."""
|
||||
import infrastructure.router.cascade as cascade_module
|
||||
|
||||
router = self._router()
|
||||
provider = _make_ollama_provider()
|
||||
|
||||
old_requests = cascade_module.requests
|
||||
cascade_module.requests = None
|
||||
try:
|
||||
assert router._check_provider_available(provider) is True
|
||||
finally:
|
||||
cascade_module.requests = old_requests
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
# Cascade: Ollama fails → Anthropic fallback
# ---------------------------------------------------------------------------


@pytest.mark.unit
class TestOllamaToAnthropicFallback:
    """Cascade router falls back to Anthropic when Ollama is unavailable or failing."""

    @pytest.mark.asyncio
    async def test_ollama_connection_refused_falls_back_to_anthropic(self):
        """When Ollama raises a connection error, cascade uses Anthropic provider."""
        router = CascadeRouter(config_path=Path("/nonexistent"))
        ollama_provider = _make_ollama_provider(priority=1)
        anthropic_provider = _make_anthropic_provider(priority=2)
        router.providers = [ollama_provider, anthropic_provider]

        with (
            patch.object(router, "_call_ollama", side_effect=ConnectionError("refused")),
            patch.object(
                router,
                "_call_anthropic",
                new_callable=AsyncMock,
                return_value={"content": "fallback response", "model": "claude-haiku-4-5-20251001"},
            ),
            # Allow cloud bypass of the metabolic quota gate in test
            patch.object(router, "_quota_allows_cloud", return_value=True),
        ):
            result = await router.complete(
                messages=[{"role": "user", "content": "hello"}],
                model="llama3",
            )

        assert result["provider"] == "cloud-fallback"
        assert "fallback response" in result["content"]

    @pytest.mark.asyncio
    async def test_ollama_circuit_open_skips_to_anthropic(self):
        """When Ollama circuit is OPEN, cascade skips directly to Anthropic."""
        router = CascadeRouter(config_path=Path("/nonexistent"))
        ollama_provider = _make_ollama_provider(priority=1)
        anthropic_provider = _make_anthropic_provider(priority=2)
        router.providers = [ollama_provider, anthropic_provider]

        # Force the circuit open on Ollama
        ollama_provider.circuit_state = CircuitState.OPEN
        ollama_provider.status = ProviderStatus.UNHEALTHY
        import time

        ollama_provider.circuit_opened_at = time.time()  # just opened — not yet recoverable

        with (
            patch.object(
                router,
                "_call_anthropic",
                new_callable=AsyncMock,
                return_value={"content": "cloud answer", "model": "claude-haiku-4-5-20251001"},
            ) as mock_anthropic,
            # Allow cloud bypass of the metabolic quota gate in test
            patch.object(router, "_quota_allows_cloud", return_value=True),
        ):
            result = await router.complete(
                messages=[{"role": "user", "content": "ping"}],
            )

        mock_anthropic.assert_called_once()
        assert result["provider"] == "cloud-fallback"

    @pytest.mark.asyncio
    async def test_all_providers_fail_raises_runtime_error(self):
        """When every provider fails, RuntimeError is raised with combined error info."""
        router = CascadeRouter(config_path=Path("/nonexistent"))
        ollama_provider = _make_ollama_provider(priority=1)
        anthropic_provider = _make_anthropic_provider(priority=2)
        router.providers = [ollama_provider, anthropic_provider]

        with (
            patch.object(router, "_call_ollama", side_effect=RuntimeError("Ollama down")),
            patch.object(router, "_call_anthropic", side_effect=RuntimeError("API quota exceeded")),
            patch.object(router, "_quota_allows_cloud", return_value=True),
        ):
            with pytest.raises(RuntimeError, match="All providers failed"):
                await router.complete(messages=[{"role": "user", "content": "test"}])

    @pytest.mark.asyncio
    async def test_error_message_includes_individual_provider_errors(self):
        """RuntimeError from all-fail scenario lists each provider's error."""
        router = CascadeRouter(config_path=Path("/nonexistent"))
        ollama_provider = _make_ollama_provider(priority=1)
        anthropic_provider = _make_anthropic_provider(priority=2)
        router.providers = [ollama_provider, anthropic_provider]
        router.config.max_retries_per_provider = 1

        with (
            patch.object(router, "_call_ollama", side_effect=RuntimeError("connection refused")),
            patch.object(router, "_call_anthropic", side_effect=RuntimeError("rate limit")),
            patch.object(router, "_quota_allows_cloud", return_value=True),
        ):
            with pytest.raises(RuntimeError) as exc_info:
                await router.complete(messages=[{"role": "user", "content": "test"}])

        error_msg = str(exc_info.value)
        assert "connection refused" in error_msg
        assert "rate limit" in error_msg

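Read together, these four fallback tests describe a priority-ordered loop with error accumulation. Here is a hedged sketch of that control flow; the method names mirror the attributes the tests patch on `CascadeRouter`, but the body is an approximation, not the real `complete()`.

```python
# Hedged sketch; names mirror what the tests patch, control flow is inferred.
class CascadeSketch:
    def __init__(self, providers):
        self.providers = providers  # Provider-like objects with .priority/.type/.name

    def _is_provider_available(self, provider) -> bool: ...
    def _quota_allows_cloud(self) -> bool: ...
    async def _call_ollama(self, **kwargs) -> dict: ...
    async def _call_anthropic(self, **kwargs) -> dict: ...

    async def complete(self, messages, model=None, **kwargs):
        errors: list[str] = []
        for provider in sorted(self.providers, key=lambda p: p.priority):
            if not self._is_provider_available(provider):
                continue  # disabled or circuit OPEN: skip without calling
            if provider.type == "anthropic" and not self._quota_allows_cloud():
                continue  # metabolic quota gate blocks cloud spend
            try:
                call = self._call_ollama if provider.type == "ollama" else self._call_anthropic
                result = await call(messages=messages, model=model, **kwargs)
                # The tests assert the cloud path is tagged "cloud-fallback".
                result["provider"] = "cloud-fallback" if provider.type == "anthropic" else provider.name
                return result
            except Exception as exc:
                errors.append(f"{provider.name}: {exc}")
        # Every provider failed: surface each individual error, as the tests assert.
        raise RuntimeError(f"All providers failed: {'; '.join(errors)}")
```
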
# ---------------------------------------------------------------------------
# Circuit-breaker lifecycle
# ---------------------------------------------------------------------------


@pytest.mark.unit
class TestCircuitBreakerLifecycle:
    """Full CLOSED → OPEN → HALF_OPEN → CLOSED lifecycle."""

    def test_closed_initially(self):
        """New provider starts with circuit CLOSED and HEALTHY status."""
        provider = _make_ollama_provider()
        assert provider.circuit_state == CircuitState.CLOSED
        assert provider.status == ProviderStatus.HEALTHY

    def test_open_after_threshold_failures(self):
        """Circuit opens once consecutive failures reach the threshold."""
        router = CascadeRouter(config_path=Path("/nonexistent"))
        router.config.circuit_breaker_failure_threshold = 3
        provider = _make_ollama_provider()

        for _ in range(3):
            router._record_failure(provider)

        assert provider.circuit_state == CircuitState.OPEN
        assert provider.status == ProviderStatus.UNHEALTHY
        assert provider.circuit_opened_at is not None

    def test_open_circuit_skips_provider(self):
        """_is_provider_available returns False when circuit is OPEN (and timeout not elapsed)."""
        import time

        router = CascadeRouter(config_path=Path("/nonexistent"))
        router.config.circuit_breaker_recovery_timeout = 9999  # won't elapse during test
        provider = _make_ollama_provider()
        provider.circuit_state = CircuitState.OPEN
        provider.status = ProviderStatus.UNHEALTHY
        provider.circuit_opened_at = time.time()

        assert router._is_provider_available(provider) is False

    def test_half_open_after_recovery_timeout(self):
        """After the recovery timeout elapses, _is_provider_available transitions to HALF_OPEN."""
        import time

        router = CascadeRouter(config_path=Path("/nonexistent"))
        router.config.circuit_breaker_recovery_timeout = 0.01  # 10 ms

        provider = _make_ollama_provider()
        provider.circuit_state = CircuitState.OPEN
        provider.status = ProviderStatus.UNHEALTHY
        provider.circuit_opened_at = time.time() - 1.0  # clearly elapsed

        result = router._is_provider_available(provider)

        assert result is True
        assert provider.circuit_state == CircuitState.HALF_OPEN

    def test_closed_after_half_open_successes(self):
        """Circuit closes after enough successful half-open test calls."""
        router = CascadeRouter(config_path=Path("/nonexistent"))
        router.config.circuit_breaker_half_open_max_calls = 2

        provider = _make_ollama_provider()
        provider.circuit_state = CircuitState.HALF_OPEN
        provider.half_open_calls = 0

        router._record_success(provider, 50.0)
        assert provider.circuit_state == CircuitState.HALF_OPEN  # not yet

        router._record_success(provider, 50.0)
        assert provider.circuit_state == CircuitState.CLOSED
        assert provider.status == ProviderStatus.HEALTHY
        assert provider.metrics.consecutive_failures == 0

    def test_failure_in_half_open_reopens_circuit(self):
        """A failure during HALF_OPEN increments consecutive failures, reopening if threshold met."""
        router = CascadeRouter(config_path=Path("/nonexistent"))
        router.config.circuit_breaker_failure_threshold = 1  # reopen on first failure

        provider = _make_ollama_provider()
        provider.circuit_state = CircuitState.HALF_OPEN

        router._record_failure(provider)

        assert provider.circuit_state == CircuitState.OPEN

    def test_disabled_provider_skipped_without_circuit_change(self):
        """A disabled provider is immediately rejected; its circuit state is not touched."""
        router = CascadeRouter(config_path=Path("/nonexistent"))
        provider = _make_ollama_provider()
        provider.enabled = False

        available = router._is_provider_available(provider)

        assert available is False
        assert provider.circuit_state == CircuitState.CLOSED  # unchanged

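The lifecycle walked through above is the classic circuit-breaker state machine. A compact sketch follows, assuming the config and provider field names the tests use (`circuit_breaker_failure_threshold`, `circuit_opened_at`, `half_open_calls`); `CircuitState` and `ProviderStatus` come from `infrastructure.router.cascade`, and the real `_record_failure`/`_record_success`/`_is_provider_available` may differ in detail.

```python
import time


class BreakerSketch:
    """State machine implied by the tests: CLOSED -> OPEN -> HALF_OPEN -> CLOSED."""

    def __init__(self, config):
        self.config = config  # carries the circuit_breaker_* knobs used above

    def record_failure(self, provider) -> None:
        provider.metrics.consecutive_failures += 1
        if provider.metrics.consecutive_failures >= self.config.circuit_breaker_failure_threshold:
            provider.circuit_state = CircuitState.OPEN  # trip the breaker
            provider.status = ProviderStatus.UNHEALTHY
            provider.circuit_opened_at = time.time()

    def is_provider_available(self, provider) -> bool:
        if not provider.enabled:
            return False  # rejected before any state change
        if provider.circuit_state == CircuitState.OPEN:
            elapsed = time.time() - (provider.circuit_opened_at or 0.0)
            if elapsed < self.config.circuit_breaker_recovery_timeout:
                return False  # still cooling down
            provider.circuit_state = CircuitState.HALF_OPEN  # allow probe traffic
            provider.half_open_calls = 0
        return True

    def record_success(self, provider, latency_ms: float) -> None:
        if provider.circuit_state == CircuitState.HALF_OPEN:
            provider.half_open_calls += 1
            if provider.half_open_calls >= self.config.circuit_breaker_half_open_max_calls:
                provider.circuit_state = CircuitState.CLOSED
                provider.status = ProviderStatus.HEALTHY
                provider.metrics.consecutive_failures = 0
```
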
# ---------------------------------------------------------------------------
# ClaudeBackend graceful degradation
# ---------------------------------------------------------------------------


@pytest.mark.unit
class TestClaudeBackendGracefulDegradation:
    """ClaudeBackend degrades gracefully when the API is unavailable."""

    def test_run_no_key_returns_unconfigured_message(self):
        """run() returns a graceful message when no API key is set."""
        from timmy.backends import ClaudeBackend

        backend = ClaudeBackend(api_key="", model="haiku")
        result = backend.run("hello")

        assert "not configured" in result.content.lower()
        assert "ANTHROPIC_API_KEY" in result.content

    def test_run_api_error_returns_unavailable_message(self):
        """run() returns a graceful error when the Anthropic API raises."""
        from timmy.backends import ClaudeBackend

        backend = ClaudeBackend(api_key="sk-ant-test", model="haiku")

        mock_client = MagicMock()
        mock_client.messages.create.side_effect = ConnectionError("API unreachable")

        with patch.object(backend, "_get_client", return_value=mock_client):
            result = backend.run("ping")

        assert "unavailable" in result.content.lower()

    def test_health_check_no_key_reports_error(self):
        """health_check() reports not-ok when API key is missing."""
        from timmy.backends import ClaudeBackend

        backend = ClaudeBackend(api_key="", model="haiku")
        status = backend.health_check()

        assert status["ok"] is False
        assert "ANTHROPIC_API_KEY" in status["error"]

    def test_health_check_api_error_reports_error(self):
        """health_check() returns ok=False and captures the error on API failure."""
        from timmy.backends import ClaudeBackend

        backend = ClaudeBackend(api_key="sk-ant-test", model="haiku")

        mock_client = MagicMock()
        mock_client.messages.create.side_effect = RuntimeError("connection timed out")

        with patch.object(backend, "_get_client", return_value=mock_client):
            status = backend.health_check()

        assert status["ok"] is False
        assert "connection timed out" in status["error"]

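This suite and the Grok suite below pin the same degradation pattern: `run()` and `health_check()` never raise. A hedged sketch of that pattern follows; `Result`, `_get_client`, and the SDK call shape are stand-ins, not the actual `timmy.backends` API.

```python
# Hedged sketch of the no-crash contract; not the shipped ClaudeBackend.
from dataclasses import dataclass


@dataclass
class Result:
    content: str  # stand-in for the real backend response type


class GracefulBackendSketch:
    def __init__(self, api_key: str, model: str, key_var: str = "ANTHROPIC_API_KEY"):
        self.api_key, self.model, self.key_var = api_key, model, key_var

    def _get_client(self):
        raise NotImplementedError  # the real backend builds an SDK client here

    def run(self, prompt: str) -> Result:
        if not self.api_key:
            return Result(f"Backend not configured: set {self.key_var} to enable it.")
        try:
            client = self._get_client()
            reply = client.messages.create(
                model=self.model,
                max_tokens=512,
                messages=[{"role": "user", "content": prompt}],
            )
            return Result(str(reply))
        except Exception as exc:
            return Result(f"Backend unavailable: {exc}")  # degrade, never raise

    def health_check(self) -> dict:
        if not self.api_key:
            return {"ok": False, "error": f"{self.key_var} is not set"}
        try:
            self._get_client().messages.create(
                model=self.model,
                max_tokens=1,
                messages=[{"role": "user", "content": "ping"}],
            )
            return {"ok": True, "error": None}
        except Exception as exc:
            return {"ok": False, "error": str(exc)}
```
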
# ---------------------------------------------------------------------------
# GrokBackend graceful degradation
# ---------------------------------------------------------------------------


@pytest.mark.unit
class TestGrokBackendGracefulDegradation:
    """GrokBackend degrades gracefully when xAI API is unavailable."""

    def test_run_no_key_returns_unconfigured_message(self):
        """run() returns a graceful message when no XAI_API_KEY is set."""
        from timmy.backends import GrokBackend

        backend = GrokBackend(api_key="", model="grok-3-mini")
        result = backend.run("hello")

        assert "not configured" in result.content.lower()

    def test_run_api_error_returns_unavailable_message(self):
        """run() returns graceful error when xAI API raises."""
        from timmy.backends import GrokBackend

        backend = GrokBackend(api_key="xai-test-key", model="grok-3-mini")

        mock_client = MagicMock()
        mock_client.chat.completions.create.side_effect = RuntimeError("network error")

        with patch.object(backend, "_get_client", return_value=mock_client):
            result = backend.run("ping")

        assert "unavailable" in result.content.lower()

    def test_health_check_no_key_reports_error(self):
        """health_check() reports not-ok when XAI_API_KEY is missing."""
        from timmy.backends import GrokBackend

        backend = GrokBackend(api_key="", model="grok-3-mini")
        status = backend.health_check()

        assert status["ok"] is False
        assert "XAI_API_KEY" in status["error"]

# ---------------------------------------------------------------------------
# Chat store: SQLite resilience
# ---------------------------------------------------------------------------


@pytest.mark.unit
class TestChatStoreSQLiteResilience:
    """MessageLog handles edge cases without crashing."""

    def test_auto_creates_missing_parent_directory(self, tmp_path):
        """MessageLog creates the data directory automatically on first use."""
        from infrastructure.chat_store import MessageLog

        db_path = tmp_path / "deep" / "nested" / "chat.db"
        assert not db_path.parent.exists()

        log = MessageLog(db_path=db_path)
        log.append("user", "hello", "2026-01-01T00:00:00")

        assert db_path.exists()
        assert len(log) == 1
        log.close()

    def test_concurrent_appends_are_safe(self, tmp_path):
        """Multiple threads appending simultaneously do not corrupt the DB."""
        from infrastructure.chat_store import MessageLog

        db_path = tmp_path / "chat.db"
        log = MessageLog(db_path=db_path)

        errors: list[Exception] = []

        def write_messages(thread_id: int) -> None:
            try:
                for i in range(10):
                    log.append("user", f"thread {thread_id} msg {i}", "2026-01-01T00:00:00")
            except Exception as exc:
                errors.append(exc)

        threads = [threading.Thread(target=write_messages, args=(t,)) for t in range(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert errors == [], f"Concurrent writes produced errors: {errors}"
        # 5 threads × 10 messages each
        assert len(log) == 50
        log.close()

    def test_all_returns_messages_in_insertion_order(self, tmp_path):
        """all() returns messages ordered oldest-first."""
        from infrastructure.chat_store import MessageLog

        db_path = tmp_path / "chat.db"
        log = MessageLog(db_path=db_path)
        log.append("user", "first", "2026-01-01T00:00:00")
        log.append("agent", "second", "2026-01-01T00:00:01")
        log.append("user", "third", "2026-01-01T00:00:02")

        messages = log.all()
        assert [m.content for m in messages] == ["first", "second", "third"]
        log.close()

    def test_recent_returns_latest_n_messages(self, tmp_path):
        """recent(n) returns the n most recent messages, oldest-first within the slice."""
        from infrastructure.chat_store import MessageLog

        db_path = tmp_path / "chat.db"
        log = MessageLog(db_path=db_path)
        for i in range(20):
            log.append("user", f"msg {i}", f"2026-01-01T00:{i:02d}:00")

        recent = log.recent(5)
        assert len(recent) == 5
        assert recent[0].content == "msg 15"
        assert recent[-1].content == "msg 19"
        log.close()

    def test_prune_keeps_max_messages(self, tmp_path):
        """append() prunes oldest messages when count exceeds MAX_MESSAGES."""
        import infrastructure.chat_store as store_mod
        from infrastructure.chat_store import MessageLog

        original_max = store_mod.MAX_MESSAGES
        store_mod.MAX_MESSAGES = 5
        try:
            db_path = tmp_path / "chat.db"
            log = MessageLog(db_path=db_path)
            for i in range(8):
                log.append("user", f"msg {i}", "2026-01-01T00:00:00")

            assert len(log) == 5
            messages = log.all()
            # Oldest 3 should be pruned
            assert messages[0].content == "msg 3"
            log.close()
        finally:
            store_mod.MAX_MESSAGES = original_max

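The prune test monkeypatches a module-level `MAX_MESSAGES`, which implies append-then-trim semantics. Below is a standalone sketch of that ring-buffer behaviour, assuming SQLite with an autoincrement id as insertion order; the real `MessageLog` may add locking and a richer schema.

```python
# Hedged sketch of the chat-store semantics the tests above pin down.
import sqlite3
from pathlib import Path

MAX_MESSAGES = 10_000  # module-level so tests can monkeypatch it, as above


class MiniMessageLog:
    def __init__(self, db_path: Path):
        db_path.parent.mkdir(parents=True, exist_ok=True)  # auto-create the data dir
        self._conn = sqlite3.connect(db_path, check_same_thread=False)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS messages ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, role TEXT, content TEXT, ts TEXT)"
        )

    def append(self, role: str, content: str, ts: str) -> None:
        with self._conn:  # one transaction per append; real code may also take a lock
            self._conn.execute(
                "INSERT INTO messages (role, content, ts) VALUES (?, ?, ?)",
                (role, content, ts),
            )
            # Trim the oldest rows beyond MAX_MESSAGES (id order == insertion order).
            self._conn.execute(
                "DELETE FROM messages WHERE id NOT IN "
                "(SELECT id FROM messages ORDER BY id DESC LIMIT ?)",
                (MAX_MESSAGES,),
            )

    def __len__(self) -> int:
        return self._conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
```
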
# ---------------------------------------------------------------------------
# Provider availability: requests lib missing
# ---------------------------------------------------------------------------


@pytest.mark.unit
class TestRequestsLibraryMissing:
    """When ``requests`` is not installed, providers assume they are available."""

    def _swap_requests(self, value):
        import infrastructure.router.cascade as cascade_module

        old = cascade_module.requests
        cascade_module.requests = value
        return old

    def test_ollama_assumes_available_without_requests(self):
        """Ollama provider returns True when requests is None."""
        import infrastructure.router.cascade as cascade_module

        router = CascadeRouter(config_path=Path("/nonexistent"))
        provider = _make_ollama_provider()
        old = self._swap_requests(None)
        try:
            assert router._check_provider_available(provider) is True
        finally:
            cascade_module.requests = old

    def test_vllm_mlx_assumes_available_without_requests(self):
        """vllm-mlx provider returns True when requests is None."""
        import infrastructure.router.cascade as cascade_module

        router = CascadeRouter(config_path=Path("/nonexistent"))
        provider = Provider(
            name="vllm-local",
            type="vllm_mlx",
            enabled=True,
            priority=1,
            base_url="http://localhost:8000/v1",
        )
        old = self._swap_requests(None)
        try:
            assert router._check_provider_available(provider) is True
        finally:
            cascade_module.requests = old

tests/infrastructure/test_tiered_model_router.py (Normal file, 380 lines)
@@ -0,0 +1,380 @@
"""Tests for the tiered model router (issue #882).
|
||||
|
||||
Covers:
|
||||
- classify_tier() for Tier-1/2/3 routing
|
||||
- TieredModelRouter.route() with mocked CascadeRouter + BudgetTracker
|
||||
- Auto-escalation from Tier-1 on low-quality responses
|
||||
- Cloud-tier budget guard
|
||||
- Acceptance criteria from the issue:
|
||||
- "Walk to the next room" → LOCAL_FAST
|
||||
- "Plan the optimal path to become Hortator" → LOCAL_HEAVY
|
||||
"""
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from infrastructure.models.router import (
|
||||
TierLabel,
|
||||
TieredModelRouter,
|
||||
_is_low_quality,
|
||||
classify_tier,
|
||||
get_tiered_router,
|
||||
)
|
||||
|
||||
pytestmark = pytest.mark.unit
|
||||
|
||||
|
||||
# ── classify_tier ─────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestClassifyTier:
|
||||
# ── Tier-1 (LOCAL_FAST) ────────────────────────────────────────────────
|
||||
|
||||
def test_simple_navigation_is_local_fast(self):
|
||||
assert classify_tier("walk to the next room") == TierLabel.LOCAL_FAST
|
||||
|
||||
def test_go_north_is_local_fast(self):
|
||||
assert classify_tier("go north") == TierLabel.LOCAL_FAST
|
||||
|
||||
def test_single_binary_choice_is_local_fast(self):
|
||||
assert classify_tier("yes") == TierLabel.LOCAL_FAST
|
||||
|
||||
def test_open_door_is_local_fast(self):
|
||||
assert classify_tier("open door") == TierLabel.LOCAL_FAST
|
||||
|
||||
def test_attack_is_local_fast(self):
|
||||
assert classify_tier("attack", {}) == TierLabel.LOCAL_FAST
|
||||
|
||||
# ── Tier-2 (LOCAL_HEAVY) ───────────────────────────────────────────────
|
||||
|
||||
def test_quest_planning_is_local_heavy(self):
|
||||
assert classify_tier("plan the optimal path to become Hortator") == TierLabel.LOCAL_HEAVY
|
||||
|
||||
def test_strategy_keyword_is_local_heavy(self):
|
||||
assert classify_tier("what is the best strategy") == TierLabel.LOCAL_HEAVY
|
||||
|
||||
def test_stuck_state_escalates_to_local_heavy(self):
|
||||
assert classify_tier("help me", {"stuck": True}) == TierLabel.LOCAL_HEAVY
|
||||
|
||||
def test_require_t2_flag_is_local_heavy(self):
|
||||
assert classify_tier("go north", {"require_t2": True}) == TierLabel.LOCAL_HEAVY
|
||||
|
||||
def test_long_input_is_local_heavy(self):
|
||||
long_task = "tell me about " + ("the dungeon " * 30)
|
||||
assert classify_tier(long_task) == TierLabel.LOCAL_HEAVY
|
||||
|
||||
def test_active_quests_upgrades_to_local_heavy(self):
|
||||
ctx = {"active_quests": ["Q1", "Q2", "Q3"]}
|
||||
assert classify_tier("go north", ctx) == TierLabel.LOCAL_HEAVY
|
||||
|
||||
def test_dialogue_active_upgrades_to_local_heavy(self):
|
||||
ctx = {"dialogue_active": True}
|
||||
assert classify_tier("yes", ctx) == TierLabel.LOCAL_HEAVY
|
||||
|
||||
def test_analyze_is_local_heavy(self):
|
||||
assert classify_tier("analyze the situation") == TierLabel.LOCAL_HEAVY
|
||||
|
||||
def test_optimize_is_local_heavy(self):
|
||||
assert classify_tier("optimize my build") == TierLabel.LOCAL_HEAVY
|
||||
|
||||
def test_negotiate_is_local_heavy(self):
|
||||
assert classify_tier("negotiate with the Camonna Tong") == TierLabel.LOCAL_HEAVY
|
||||
|
||||
def test_explain_is_local_heavy(self):
|
||||
assert classify_tier("explain the faction system") == TierLabel.LOCAL_HEAVY
|
||||
|
||||
# ── Tier-3 (CLOUD_API) ─────────────────────────────────────────────────
|
||||
|
||||
def test_require_cloud_flag_is_cloud_api(self):
|
||||
assert classify_tier("go north", {"require_cloud": True}) == TierLabel.CLOUD_API
|
||||
|
||||
def test_require_cloud_overrides_everything(self):
|
||||
assert classify_tier("yes", {"require_cloud": True}) == TierLabel.CLOUD_API
|
||||
|
||||
# ── Edge cases ────────────────────────────────────────────────────────
|
||||
|
||||
def test_empty_task_defaults_to_local_heavy(self):
|
||||
# Empty string → nothing classifies it as T1 or T3
|
||||
assert classify_tier("") == TierLabel.LOCAL_HEAVY
|
||||
|
||||
def test_case_insensitive(self):
|
||||
assert classify_tier("PLAN my route") == TierLabel.LOCAL_HEAVY
|
||||
|
||||
def test_combat_active_upgrades_t1_to_heavy(self):
|
||||
ctx = {"combat_active": True}
|
||||
# "attack" is T1 word, but combat context → should NOT be LOCAL_FAST
|
||||
result = classify_tier("attack", ctx)
|
||||
assert result != TierLabel.LOCAL_FAST
|
||||
|
||||
|
||||
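These tests imply a keyword-and-context heuristic rather than a model-based classifier. Here is a hedged sketch that satisfies every assertion above; the keyword lists and word-count cutoff are illustrative guesses, not the shipped ones in `infrastructure/models/router.py`.

```python
# Illustrative keyword sets; the real lists live in infrastructure/models/router.py.
_T1_WORDS = {"walk", "go", "open", "attack", "yes", "no"}
_T2_WORDS = {"plan", "strategy", "analyze", "optimize", "negotiate", "explain"}


def classify_tier_sketch(task: str, context: dict | None = None) -> str:
    ctx = context or {}
    if ctx.get("require_cloud"):
        return "CLOUD_API"  # explicit cloud request overrides everything
    # Context escalators: any of these force at least Tier-2.
    if (
        ctx.get("require_t2")
        or ctx.get("stuck")
        or ctx.get("dialogue_active")
        or ctx.get("combat_active")
        or len(ctx.get("active_quests", [])) >= 3
    ):
        return "LOCAL_HEAVY"
    words = task.lower().split()
    if any(w in _T2_WORDS for w in words) or len(words) > 40:
        return "LOCAL_HEAVY"  # planning vocabulary or long input
    if words and len(words) <= 6 and words[0] in _T1_WORDS:
        return "LOCAL_FAST"  # short imperative command
    return "LOCAL_HEAVY"  # default, including empty input
```
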
# ── _is_low_quality ───────────────────────────────────────────────────────────


class TestIsLowQuality:
    def test_empty_is_low_quality(self):
        assert _is_low_quality("", TierLabel.LOCAL_FAST) is True

    def test_whitespace_only_is_low_quality(self):
        assert _is_low_quality("   ", TierLabel.LOCAL_FAST) is True

    def test_very_short_is_low_quality(self):
        assert _is_low_quality("ok", TierLabel.LOCAL_FAST) is True

    def test_idontknow_is_low_quality(self):
        assert _is_low_quality("I don't know how to help with that.", TierLabel.LOCAL_FAST) is True

    def test_not_sure_is_low_quality(self):
        assert _is_low_quality("I'm not sure about this.", TierLabel.LOCAL_FAST) is True

    def test_as_an_ai_is_low_quality(self):
        assert _is_low_quality("As an AI, I cannot...", TierLabel.LOCAL_FAST) is True

    def test_good_response_is_not_low_quality(self):
        response = "You move north into the Vivec Canton. The Ordinators watch your approach."
        assert _is_low_quality(response, TierLabel.LOCAL_FAST) is False

    def test_t1_short_response_triggers_escalation(self):
        # Less than _ESCALATION_MIN_CHARS for T1
        assert _is_low_quality("OK, done.", TierLabel.LOCAL_FAST) is True

    def test_borderline_ok_for_t2_not_t1(self):
        # Between _LOW_QUALITY_MIN_CHARS (20) and _ESCALATION_MIN_CHARS (60)
        # → low quality for T1 (escalation threshold), but acceptable for T2/T3
        response = "Done. The item is retrieved."  # 28 chars: ≥20, <60
        assert _is_low_quality(response, TierLabel.LOCAL_FAST) is True
        assert _is_low_quality(response, TierLabel.LOCAL_HEAVY) is False

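The threshold comments in the tests (20 and 60 chars) pin down a two-level quality gate. A minimal sketch follows; the refusal-phrase list here is illustrative, not necessarily the shipped one.

```python
# Hedged sketch; thresholds come from the test comments, phrases are assumed.
_LOW_QUALITY_MIN_CHARS = 20   # floor applied to every tier
_ESCALATION_MIN_CHARS = 60    # stricter floor for Tier-1 (drives auto-escalation)
_REFUSAL_MARKERS = ("i don't know", "i'm not sure", "as an ai")


def is_low_quality_sketch(response: str, tier: str) -> bool:
    text = response.strip().lower()
    if not text:
        return True  # empty or whitespace-only
    if any(marker in text for marker in _REFUSAL_MARKERS):
        return True  # refusal phrasing is low quality regardless of length
    min_chars = _ESCALATION_MIN_CHARS if tier == "LOCAL_FAST" else _LOW_QUALITY_MIN_CHARS
    return len(text) < min_chars
```
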
# ── TieredModelRouter ─────────────────────────────────────────────────────────


_GOOD_CONTENT = (
    "You move north through the doorway into the next room. "
    "The stone walls glisten with moisture."
)  # 90 chars — well above the escalation threshold


def _make_cascade_mock(content=_GOOD_CONTENT, model="llama3.1:8b"):
    mock = MagicMock()
    mock.complete = AsyncMock(
        return_value={
            "content": content,
            "provider": "ollama-local",
            "model": model,
            "latency_ms": 150.0,
        }
    )
    return mock


def _make_budget_mock(allowed=True):
    mock = MagicMock()
    mock.cloud_allowed = MagicMock(return_value=allowed)
    mock.record_spend = MagicMock(return_value=0.001)
    return mock


@pytest.mark.asyncio
class TestTieredModelRouterRoute:
    async def test_route_returns_tier_in_result(self):
        router = TieredModelRouter(cascade=_make_cascade_mock())
        result = await router.route("go north")
        assert "tier" in result
        assert result["tier"] == TierLabel.LOCAL_FAST

    async def test_acceptance_walk_to_room_is_local_fast(self):
        """Acceptance: 'Walk to the next room' → LOCAL_FAST."""
        router = TieredModelRouter(cascade=_make_cascade_mock())
        result = await router.route("Walk to the next room")
        assert result["tier"] == TierLabel.LOCAL_FAST

    async def test_acceptance_plan_hortator_is_local_heavy(self):
        """Acceptance: 'Plan the optimal path to become Hortator' → LOCAL_HEAVY."""
        router = TieredModelRouter(
            cascade=_make_cascade_mock(model="hermes3:70b"),
        )
        result = await router.route("Plan the optimal path to become Hortator")
        assert result["tier"] == TierLabel.LOCAL_HEAVY

    async def test_t1_low_quality_escalates_to_t2(self):
        """Failed Tier-1 response auto-escalates to Tier-2."""
        call_models = []
        cascade = MagicMock()

        async def complete_side_effect(messages, model, temperature, max_tokens):
            call_models.append(model)
            # First call (T1) returns a low-quality response
            if len(call_models) == 1:
                return {
                    "content": "I don't know.",
                    "provider": "ollama",
                    "model": model,
                    "latency_ms": 50,
                }
            # Second call (T2) returns a good response
            return {
                "content": "You move to the northern passage, passing through the Dunmer stronghold.",
                "provider": "ollama",
                "model": model,
                "latency_ms": 800,
            }

        cascade.complete = complete_side_effect

        router = TieredModelRouter(cascade=cascade, auto_escalate=True)
        result = await router.route("go north")

        assert len(call_models) == 2, "Should have called twice (T1 escalated to T2)"
        assert result["tier"] == TierLabel.LOCAL_HEAVY

    async def test_auto_escalate_false_no_escalation(self):
        """With auto_escalate=False, low-quality T1 response is returned as-is."""
        call_count = {"n": 0}
        cascade = MagicMock()

        async def complete_side_effect(**kwargs):
            call_count["n"] += 1
            return {
                "content": "I don't know.",
                "provider": "ollama",
                "model": "llama3.1:8b",
                "latency_ms": 50,
            }

        cascade.complete = AsyncMock(side_effect=complete_side_effect)
        router = TieredModelRouter(cascade=cascade, auto_escalate=False)
        result = await router.route("go north")
        assert call_count["n"] == 1
        assert result["tier"] == TierLabel.LOCAL_FAST

    async def test_t2_failure_escalates_to_cloud(self):
        """Tier-2 failure escalates to Cloud API (when budget allows)."""
        cascade = MagicMock()
        call_models = []

        async def complete_side_effect(messages, model, temperature, max_tokens):
            call_models.append(model)
            if "hermes3" in model or "70b" in model.lower():
                raise RuntimeError("Tier-2 model unavailable")
            return {
                "content": "Cloud response here.",
                "provider": "anthropic",
                "model": model,
                "latency_ms": 1200,
            }

        cascade.complete = complete_side_effect

        budget = _make_budget_mock(allowed=True)
        router = TieredModelRouter(cascade=cascade, budget_tracker=budget)
        result = await router.route("plan my route", context={"require_t2": True})
        assert result["tier"] == TierLabel.CLOUD_API

    async def test_cloud_blocked_by_budget_raises(self):
        """Cloud tier blocked when budget is exhausted."""
        cascade = MagicMock()
        cascade.complete = AsyncMock(side_effect=RuntimeError("T2 fail"))

        budget = _make_budget_mock(allowed=False)
        router = TieredModelRouter(cascade=cascade, budget_tracker=budget)

        with pytest.raises(RuntimeError, match="budget limit"):
            await router.route("plan my route", context={"require_t2": True})

    async def test_explicit_cloud_tier_uses_cloud_model(self):
        cascade = _make_cascade_mock(model="claude-haiku-4-5")
        budget = _make_budget_mock(allowed=True)
        router = TieredModelRouter(cascade=cascade, budget_tracker=budget)
        result = await router.route("go north", context={"require_cloud": True})
        assert result["tier"] == TierLabel.CLOUD_API

    async def test_cloud_spend_recorded_with_usage(self):
        """Cloud spend is recorded when the response includes usage info."""
        cascade = MagicMock()
        cascade.complete = AsyncMock(
            return_value={
                "content": "Cloud answer.",
                "provider": "anthropic",
                "model": "claude-haiku-4-5",
                "latency_ms": 900,
                "usage": {"prompt_tokens": 50, "completion_tokens": 100},
            }
        )
        budget = _make_budget_mock(allowed=True)
        router = TieredModelRouter(cascade=cascade, budget_tracker=budget)
        result = await router.route("go north", context={"require_cloud": True})
        budget.record_spend.assert_called_once()
        assert "cost_usd" in result

    async def test_cloud_spend_not_recorded_without_usage(self):
        """Cloud spend is not recorded when usage info is absent."""
        cascade = MagicMock()
        cascade.complete = AsyncMock(
            return_value={
                "content": "Cloud answer.",
                "provider": "anthropic",
                "model": "claude-haiku-4-5",
                "latency_ms": 900,
                # no "usage" key
            }
        )
        budget = _make_budget_mock(allowed=True)
        router = TieredModelRouter(cascade=cascade, budget_tracker=budget)
        result = await router.route("go north", context={"require_cloud": True})
        budget.record_spend.assert_not_called()
        assert "cost_usd" not in result

    async def test_custom_tier_models_respected(self):
        cascade = _make_cascade_mock()
        router = TieredModelRouter(
            cascade=cascade,
            tier_models={TierLabel.LOCAL_FAST: "llama3.2:3b"},
        )
        await router.route("go north")
        call_kwargs = cascade.complete.call_args
        assert call_kwargs.kwargs["model"] == "llama3.2:3b"

    async def test_messages_override_used_when_provided(self):
        cascade = _make_cascade_mock()
        router = TieredModelRouter(cascade=cascade)
        custom_msgs = [{"role": "user", "content": "custom message"}]
        await router.route("go north", messages=custom_msgs)
        call_kwargs = cascade.complete.call_args
        assert call_kwargs.kwargs["messages"] == custom_msgs

    async def test_temperature_forwarded(self):
        cascade = _make_cascade_mock()
        router = TieredModelRouter(cascade=cascade)
        await router.route("go north", temperature=0.7)
        call_kwargs = cascade.complete.call_args
        assert call_kwargs.kwargs["temperature"] == 0.7

    async def test_max_tokens_forwarded(self):
        cascade = _make_cascade_mock()
        router = TieredModelRouter(cascade=cascade)
        await router.route("go north", max_tokens=128)
        call_kwargs = cascade.complete.call_args
        assert call_kwargs.kwargs["max_tokens"] == 128

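Taken together, the route() tests describe tier selection, one-step-up escalation, and a budget-gated cloud tier. A hedged sketch of that flow follows; `_next_tier` is a hypothetical helper, attribute names are inferred from the constructor calls above, and `classify_tier_sketch`/`is_low_quality_sketch` refer to the earlier sketches.

```python
def _next_tier(tier: str) -> str:  # hypothetical helper, not confirmed API
    return {"LOCAL_FAST": "LOCAL_HEAVY", "LOCAL_HEAVY": "CLOUD_API"}[tier]


class TieredRouterSketch:
    def __init__(self, cascade, budget, tier_models, auto_escalate=True):
        self.cascade, self.budget = cascade, budget
        self.tier_models, self.auto_escalate = tier_models, auto_escalate

    async def route(self, task, context=None, messages=None, temperature=0.2, max_tokens=None):
        tier = classify_tier_sketch(task, context)  # from the sketch above
        while True:
            if tier == "CLOUD_API" and not self.budget.cloud_allowed():
                raise RuntimeError("Cloud tier blocked: budget limit reached")
            try:
                result = await self.cascade.complete(
                    messages=messages or [{"role": "user", "content": task}],
                    model=self.tier_models[tier],
                    temperature=temperature,
                    max_tokens=max_tokens,
                )
            except Exception:
                if tier == "CLOUD_API":
                    raise  # nothing left to escalate to
                tier = _next_tier(tier)
                continue  # hard failure: retry one tier up
            if (
                self.auto_escalate
                and tier != "CLOUD_API"
                and is_low_quality_sketch(result["content"], tier)
            ):
                tier = _next_tier(tier)
                continue  # soft failure: low-quality answer, retry one tier up
            result["tier"] = tier
            if tier == "CLOUD_API" and "usage" in result:
                # Spend is recorded only when the provider reported token usage.
                result["cost_usd"] = self.budget.record_spend(result["usage"])
            return result
```
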
class TestTieredModelRouterClassify:
    def test_classify_delegates_to_classify_tier(self):
        router = TieredModelRouter(cascade=MagicMock())
        assert router.classify("go north") == classify_tier("go north")
        assert router.classify("plan the quest") == classify_tier("plan the quest")


class TestGetTieredRouterSingleton:
    def test_returns_tiered_router_instance(self):
        import infrastructure.models.router as rmod

        rmod._tiered_router = None
        router = get_tiered_router()
        assert isinstance(router, TieredModelRouter)

    def test_singleton_returns_same_instance(self):
        import infrastructure.models.router as rmod

        rmod._tiered_router = None
        r1 = get_tiered_router()
        r2 = get_tiered_router()
        assert r1 is r2
@@ -1,411 +0,0 @@
"""Unit tests for the vLLM inference backend (issue #1281).

Covers:
- vllm provider type in CascadeRouter availability check
- _call_vllm method (mocked OpenAI client)
- providers.yaml loads vllm-local entry
- vLLM health check helpers in dashboard routes
- config.py has vllm backend option
"""

from __future__ import annotations

import time
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
import yaml

from infrastructure.router.cascade import CascadeRouter, Provider, ProviderStatus


# ── Provider availability checks ────────────────────────────────────────────


@pytest.mark.unit
class TestVllmProviderAvailability:
    """Test _check_provider_available for vllm provider type."""

    def _make_vllm_provider(self, url: str = "http://localhost:8001/v1") -> Provider:
        return Provider(
            name="vllm-local",
            type="vllm",
            enabled=True,
            priority=3,
            base_url=url,
            models=[{"name": "Qwen/Qwen2.5-14B-Instruct", "default": True}],
        )

    def test_available_when_health_200(self, tmp_path):
        """Provider is available when /health returns 200."""
        provider = self._make_vllm_provider()
        router = CascadeRouter(config_path=tmp_path / "none.yaml")

        mock_response = MagicMock()
        mock_response.status_code = 200

        with patch("infrastructure.router.cascade.requests") as mock_requests:
            mock_requests.get.return_value = mock_response
            available = router._check_provider_available(provider)

        assert available is True
        # Verify the health endpoint was called (root, not /v1)
        call_args = mock_requests.get.call_args[0][0]
        assert call_args.endswith("/health")
        assert "/v1" not in call_args

    def test_unavailable_when_health_non_200(self, tmp_path):
        """Provider is unavailable when /health returns non-200."""
        provider = self._make_vllm_provider()
        router = CascadeRouter(config_path=tmp_path / "none.yaml")

        mock_response = MagicMock()
        mock_response.status_code = 503

        with patch("infrastructure.router.cascade.requests") as mock_requests:
            mock_requests.get.return_value = mock_response
            available = router._check_provider_available(provider)

        assert available is False

    def test_unavailable_on_connection_error(self, tmp_path):
        """Provider is unavailable when connection fails."""
        provider = self._make_vllm_provider()
        router = CascadeRouter(config_path=tmp_path / "none.yaml")

        with patch("infrastructure.router.cascade.requests") as mock_requests:
            mock_requests.get.side_effect = ConnectionError("refused")
            available = router._check_provider_available(provider)

        assert available is False

    def test_strips_v1_suffix_for_health_check(self, tmp_path):
        """Health check URL strips /v1 before appending /health."""
        provider = self._make_vllm_provider(url="http://localhost:8001/v1")
        router = CascadeRouter(config_path=tmp_path / "none.yaml")

        mock_response = MagicMock()
        mock_response.status_code = 200

        with patch("infrastructure.router.cascade.requests") as mock_requests:
            mock_requests.get.return_value = mock_response
            router._check_provider_available(provider)

        called_url = mock_requests.get.call_args[0][0]
        assert called_url == "http://localhost:8001/health"

    def test_assumes_available_when_requests_none(self, tmp_path):
        """Gracefully assumes available when requests library is absent."""
        provider = self._make_vllm_provider()
        router = CascadeRouter(config_path=tmp_path / "none.yaml")

        with patch("infrastructure.router.cascade.requests", None):
            available = router._check_provider_available(provider)

        assert available is True


# ── _call_vllm method ────────────────────────────────────────────────────────


@pytest.mark.unit
class TestCallVllm:
    """Test CascadeRouter._call_vllm."""

    def _make_router(self, tmp_path: Path) -> CascadeRouter:
        return CascadeRouter(config_path=tmp_path / "none.yaml")

    def _make_provider(self, base_url: str = "http://localhost:8001") -> Provider:
        return Provider(
            name="vllm-local",
            type="vllm",
            enabled=True,
            priority=3,
            base_url=base_url,
            models=[{"name": "Qwen/Qwen2.5-14B-Instruct", "default": True}],
        )

    @pytest.mark.asyncio
    async def test_returns_content_and_model(self, tmp_path):
        """_call_vllm returns content and model name from API response."""
        router = self._make_router(tmp_path)
        provider = self._make_provider()

        mock_choice = MagicMock()
        mock_choice.message.content = "Hello from vLLM!"
        mock_response = MagicMock()
        mock_response.choices = [mock_choice]
        mock_response.model = "Qwen/Qwen2.5-14B-Instruct"

        mock_client = AsyncMock()
        mock_client.chat.completions.create = AsyncMock(return_value=mock_response)

        with patch("openai.AsyncOpenAI", return_value=mock_client):
            result = await router._call_vllm(
                provider=provider,
                messages=[{"role": "user", "content": "hi"}],
                model="Qwen/Qwen2.5-14B-Instruct",
                temperature=0.7,
                max_tokens=None,
            )

        assert result["content"] == "Hello from vLLM!"
        assert result["model"] == "Qwen/Qwen2.5-14B-Instruct"

    @pytest.mark.asyncio
    async def test_appends_v1_to_base_url(self, tmp_path):
        """_call_vllm always points the OpenAI client at base_url/v1."""
        router = self._make_router(tmp_path)
        provider = self._make_provider(base_url="http://localhost:8001")

        mock_choice = MagicMock()
        mock_choice.message.content = "ok"
        mock_response = MagicMock()
        mock_response.choices = [mock_choice]
        mock_response.model = "model"

        mock_client = AsyncMock()
        mock_client.chat.completions.create = AsyncMock(return_value=mock_response)

        with patch("openai.AsyncOpenAI", return_value=mock_client) as mock_openai:
            await router._call_vllm(
                provider=provider,
                messages=[{"role": "user", "content": "hi"}],
                model="model",
                temperature=0.0,
                max_tokens=None,
            )
        _, kwargs = mock_openai.call_args
        assert kwargs["base_url"].endswith("/v1")

    @pytest.mark.asyncio
    async def test_does_not_double_v1(self, tmp_path):
        """_call_vllm does not append /v1 if base_url already ends with it."""
        router = self._make_router(tmp_path)
        provider = self._make_provider(base_url="http://localhost:8001/v1")

        mock_choice = MagicMock()
        mock_choice.message.content = "ok"
        mock_response = MagicMock()
        mock_response.choices = [mock_choice]
        mock_response.model = "model"

        mock_client = AsyncMock()
        mock_client.chat.completions.create = AsyncMock(return_value=mock_response)

        with patch("openai.AsyncOpenAI", return_value=mock_client) as mock_openai:
            await router._call_vllm(
                provider=provider,
                messages=[{"role": "user", "content": "hi"}],
                model="model",
                temperature=0.0,
                max_tokens=None,
            )
        _, kwargs = mock_openai.call_args
        assert kwargs["base_url"] == "http://localhost:8001/v1"

    @pytest.mark.asyncio
    async def test_max_tokens_passed_when_set(self, tmp_path):
        """max_tokens is forwarded to the API when provided."""
        router = self._make_router(tmp_path)
        provider = self._make_provider()

        mock_choice = MagicMock()
        mock_choice.message.content = "ok"
        mock_response = MagicMock()
        mock_response.choices = [mock_choice]
        mock_response.model = "model"

        mock_client = AsyncMock()
        mock_client.chat.completions.create = AsyncMock(return_value=mock_response)

        with patch("openai.AsyncOpenAI", return_value=mock_client):
            await router._call_vllm(
                provider=provider,
                messages=[{"role": "user", "content": "hi"}],
                model="model",
                temperature=0.0,
                max_tokens=256,
            )
        call_kwargs = mock_client.chat.completions.create.call_args[1]
        assert call_kwargs.get("max_tokens") == 256

    @pytest.mark.asyncio
    async def test_max_tokens_omitted_when_none(self, tmp_path):
        """max_tokens key is absent when not provided."""
        router = self._make_router(tmp_path)
        provider = self._make_provider()

        mock_choice = MagicMock()
        mock_choice.message.content = "ok"
        mock_response = MagicMock()
        mock_response.choices = [mock_choice]
        mock_response.model = "model"

        mock_client = AsyncMock()
        mock_client.chat.completions.create = AsyncMock(return_value=mock_response)

        with patch("openai.AsyncOpenAI", return_value=mock_client):
            await router._call_vllm(
                provider=provider,
                messages=[{"role": "user", "content": "hi"}],
                model="model",
                temperature=0.0,
                max_tokens=None,
            )
        call_kwargs = mock_client.chat.completions.create.call_args[1]
        assert "max_tokens" not in call_kwargs


# ── providers.yaml loads vllm-local ─────────────────────────────────────────


@pytest.mark.unit
class TestProvidersYamlVllm:
    """Verify providers.yaml contains a valid vllm-local entry."""

    def test_vllm_local_entry_exists(self):
        """providers.yaml has a vllm-local provider of type vllm."""
        config_path = Path(__file__).parents[2] / "config" / "providers.yaml"
        assert config_path.exists(), "config/providers.yaml not found"

        with config_path.open() as f:
            config = yaml.safe_load(f)

        providers = config.get("providers", [])
        vllm_providers = [p for p in providers if p.get("type") == "vllm"]
        assert vllm_providers, "No provider with type=vllm found in providers.yaml"

        vllm_local = next((p for p in vllm_providers if p["name"] == "vllm-local"), None)
        assert vllm_local is not None, "vllm-local provider not found in providers.yaml"

    def test_vllm_local_disabled_by_default(self):
        """vllm-local is disabled by default so the router stays on Ollama."""
        config_path = Path(__file__).parents[2] / "config" / "providers.yaml"
        with config_path.open() as f:
            config = yaml.safe_load(f)

        providers = config.get("providers", [])
        vllm_local = next((p for p in providers if p.get("name") == "vllm-local"), None)
        assert vllm_local is not None
        assert vllm_local.get("enabled") is False, "vllm-local should be disabled by default"

    def test_vllm_local_has_default_model(self):
        """vllm-local has at least one model with a context window."""
        config_path = Path(__file__).parents[2] / "config" / "providers.yaml"
        with config_path.open() as f:
            config = yaml.safe_load(f)

        providers = config.get("providers", [])
        vllm_local = next((p for p in providers if p.get("name") == "vllm-local"), None)
        assert vllm_local is not None

        models = vllm_local.get("models", [])
        assert models, "vllm-local must declare at least one model"
        default_models = [m for m in models if m.get("default")]
        assert default_models, "vllm-local must have a model marked default: true"


# ── config.py backend option ─────────────────────────────────────────────────


@pytest.mark.unit
class TestConfigVllmBackend:
    """Verify config.py exposes the vllm backend option."""

    def test_vllm_is_valid_backend(self):
        """timmy_model_backend accepts 'vllm' without validation errors."""
        from config import Settings

        s = Settings(timmy_model_backend="vllm")
        assert s.timmy_model_backend == "vllm"

    def test_vllm_url_default(self):
        """vllm_url has a sensible default."""
        from config import Settings

        s = Settings()
        assert s.vllm_url.startswith("http://")

    def test_vllm_model_default(self):
        """vllm_model has a sensible default."""
        from config import Settings

        s = Settings()
        assert s.vllm_model  # non-empty string


# ── Health check helpers ─────────────────────────────────────────────────────


@pytest.mark.unit
class TestVllmHealthCheck:
    """Test _check_vllm_sync and _check_vllm."""

    def test_sync_returns_healthy_on_200(self):
        """_check_vllm_sync returns 'healthy' when server responds 200."""
        import urllib.request

        from dashboard.routes.health import _check_vllm_sync

        mock_response = MagicMock()
        mock_response.status = 200
        mock_response.__enter__ = lambda s: s
        mock_response.__exit__ = MagicMock(return_value=False)

        with patch.object(urllib.request, "urlopen", return_value=mock_response):
            result = _check_vllm_sync()

        assert result.status == "healthy"
        assert result.name == "vLLM"

    def test_sync_returns_unavailable_on_connection_error(self):
        """_check_vllm_sync returns 'unavailable' when server is unreachable."""
        import urllib.error
        import urllib.request

        from dashboard.routes.health import _check_vllm_sync

        with patch.object(urllib.request, "urlopen", side_effect=urllib.error.URLError("refused")):
            result = _check_vllm_sync()

        assert result.status == "unavailable"
        assert result.name == "vLLM"

    @pytest.mark.asyncio
    async def test_async_caches_result(self):
        """_check_vllm caches the result for _VLLM_CACHE_TTL seconds."""
        import dashboard.routes.health as health_module
        from dashboard.routes.health import _check_vllm

        # Reset cache
        health_module._vllm_cache = None
        health_module._vllm_cache_ts = 0.0

        mock_dep = MagicMock()
        mock_dep.status = "healthy"

        with patch("dashboard.routes.health._check_vllm_sync", return_value=mock_dep):
            result1 = await _check_vllm()
            result2 = await _check_vllm()  # should hit cache

        assert result1 is result2  # same object returned from cache

    @pytest.mark.asyncio
    async def test_async_refreshes_after_ttl(self):
        """_check_vllm refreshes the cache after the TTL expires."""
        import dashboard.routes.health as health_module
        from dashboard.routes.health import _VLLM_CACHE_TTL, _check_vllm

        # Expire the cache
        health_module._vllm_cache = None
        health_module._vllm_cache_ts = time.monotonic() - _VLLM_CACHE_TTL - 1

        mock_dep = MagicMock()
        mock_dep.status = "unavailable"

        with patch("dashboard.routes.health._check_vllm_sync", return_value=mock_dep) as mock_fn:
            await _check_vllm()

        mock_fn.assert_called_once()
tests/sovereignty/__init__.py (Normal file, 0 lines)
tests/sovereignty/test_perception_cache.py (Normal file, 379 lines)
@@ -0,0 +1,379 @@
"""Tests for the sovereignty perception cache (template matching).
|
||||
|
||||
Refs: #1261
|
||||
"""
|
||||
|
||||
import json
|
||||
from unittest.mock import patch
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
class TestTemplate:
|
||||
"""Tests for the Template dataclass."""
|
||||
|
||||
def test_template_default_values(self):
|
||||
"""Template dataclass has correct defaults."""
|
||||
from timmy.sovereignty.perception_cache import Template
|
||||
|
||||
image = np.array([[1, 2], [3, 4]])
|
||||
template = Template(name="test_template", image=image)
|
||||
|
||||
assert template.name == "test_template"
|
||||
assert np.array_equal(template.image, image)
|
||||
assert template.threshold == 0.85
|
||||
|
||||
def test_template_custom_threshold(self):
|
||||
"""Template can have custom threshold."""
|
||||
from timmy.sovereignty.perception_cache import Template
|
||||
|
||||
image = np.array([[1, 2], [3, 4]])
|
||||
template = Template(name="test_template", image=image, threshold=0.95)
|
||||
|
||||
assert template.threshold == 0.95
|
||||
|
||||
|
||||
class TestCacheResult:
|
||||
"""Tests for the CacheResult dataclass."""
|
||||
|
||||
def test_cache_result_with_state(self):
|
||||
"""CacheResult stores confidence and state."""
|
||||
from timmy.sovereignty.perception_cache import CacheResult
|
||||
|
||||
result = CacheResult(confidence=0.92, state={"template_name": "test"})
|
||||
assert result.confidence == 0.92
|
||||
assert result.state == {"template_name": "test"}
|
||||
|
||||
def test_cache_result_no_state(self):
|
||||
"""CacheResult can have None state."""
|
||||
from timmy.sovereignty.perception_cache import CacheResult
|
||||
|
||||
result = CacheResult(confidence=0.5, state=None)
|
||||
assert result.confidence == 0.5
|
||||
assert result.state is None
|
||||
|
||||
|
||||
class TestPerceptionCacheInit:
|
||||
"""Tests for PerceptionCache initialization."""
|
||||
|
||||
def test_init_creates_empty_cache_when_no_file(self, tmp_path):
|
||||
"""Cache initializes empty when templates file doesn't exist."""
|
||||
from timmy.sovereignty.perception_cache import PerceptionCache
|
||||
|
||||
templates_path = tmp_path / "nonexistent_templates.json"
|
||||
cache = PerceptionCache(templates_path=templates_path)
|
||||
|
||||
assert cache.templates_path == templates_path
|
||||
assert cache.templates == []
|
||||
|
||||
def test_init_loads_existing_templates(self, tmp_path):
|
||||
"""Cache loads templates from existing JSON file."""
|
||||
from timmy.sovereignty.perception_cache import PerceptionCache
|
||||
|
||||
templates_path = tmp_path / "templates.json"
|
||||
templates_data = [
|
||||
{"name": "template1", "threshold": 0.85},
|
||||
{"name": "template2", "threshold": 0.90},
|
||||
]
|
||||
with open(templates_path, "w") as f:
|
||||
json.dump(templates_data, f)
|
||||
|
||||
cache = PerceptionCache(templates_path=templates_path)
|
||||
|
||||
assert len(cache.templates) == 2
|
||||
assert cache.templates[0].name == "template1"
|
||||
assert cache.templates[0].threshold == 0.85
|
||||
assert cache.templates[1].name == "template2"
|
||||
assert cache.templates[1].threshold == 0.90
|
||||
|
||||
def test_init_with_string_path(self, tmp_path):
|
||||
"""Cache accepts string path for templates."""
|
||||
from timmy.sovereignty.perception_cache import PerceptionCache
|
||||
|
||||
templates_path = str(tmp_path / "templates.json")
|
||||
cache = PerceptionCache(templates_path=templates_path)
|
||||
|
||||
assert str(cache.templates_path) == templates_path
|
||||
|
||||
|
||||
class TestPerceptionCacheMatch:
|
||||
"""Tests for PerceptionCache.match() template matching."""
|
||||
|
||||
def test_match_no_templates_returns_low_confidence(self, tmp_path):
|
||||
"""Matching with no templates returns low confidence and None state."""
|
||||
from timmy.sovereignty.perception_cache import PerceptionCache
|
||||
|
||||
cache = PerceptionCache(templates_path=tmp_path / "templates.json")
|
||||
screenshot = np.array([[1, 2], [3, 4]])
|
||||
|
||||
result = cache.match(screenshot)
|
||||
|
||||
assert result.confidence == 0.0
|
||||
assert result.state is None
|
||||
|
||||
@patch("timmy.sovereignty.perception_cache.cv2")
|
||||
def test_match_finds_best_template(self, mock_cv2, tmp_path):
|
||||
"""Match returns the best matching template above threshold."""
|
||||
from timmy.sovereignty.perception_cache import PerceptionCache, Template
|
||||
|
||||
# Setup mock cv2 behavior
|
||||
mock_cv2.matchTemplate.return_value = np.array([[0.5, 0.6], [0.7, 0.8]])
|
||||
mock_cv2.TM_CCOEFF_NORMED = "TM_CCOEFF_NORMED"
|
||||
mock_cv2.minMaxLoc.return_value = (None, 0.92, None, None)
|
||||
|
||||
cache = PerceptionCache(templates_path=tmp_path / "templates.json")
|
||||
template = Template(name="best_match", image=np.array([[1, 2], [3, 4]]))
|
||||
cache.add([template])
|
||||
|
||||
screenshot = np.array([[5, 6], [7, 8]])
|
||||
result = cache.match(screenshot)
|
||||
|
||||
assert result.confidence == 0.92
|
||||
assert result.state == {"template_name": "best_match"}
|
||||
|
||||
@patch("timmy.sovereignty.perception_cache.cv2")
|
||||
def test_match_respects_global_threshold(self, mock_cv2, tmp_path):
|
||||
"""Match returns None state when confidence is below threshold."""
|
||||
from timmy.sovereignty.perception_cache import PerceptionCache, Template
|
||||
|
||||
# Setup mock cv2 to return confidence below 0.85 threshold
|
||||
mock_cv2.matchTemplate.return_value = np.array([[0.1, 0.2], [0.3, 0.4]])
|
||||
mock_cv2.TM_CCOEFF_NORMED = "TM_CCOEFF_NORMED"
|
||||
mock_cv2.minMaxLoc.return_value = (None, 0.75, None, None)
|
||||
|
||||
cache = PerceptionCache(templates_path=tmp_path / "templates.json")
|
||||
template = Template(name="low_match", image=np.array([[1, 2], [3, 4]]))
|
||||
cache.add([template])
|
||||
|
||||
screenshot = np.array([[5, 6], [7, 8]])
|
||||
result = cache.match(screenshot)
|
||||
|
||||
# Confidence is recorded but state is None (below threshold)
|
||||
assert result.confidence == 0.75
|
||||
assert result.state is None
|
||||
|
||||
@patch("timmy.sovereignty.perception_cache.cv2")
|
||||
def test_match_selects_highest_confidence(self, mock_cv2, tmp_path):
|
||||
"""Match selects template with highest confidence across all templates."""
|
||||
from timmy.sovereignty.perception_cache import PerceptionCache, Template
|
||||
|
||||
mock_cv2.TM_CCOEFF_NORMED = "TM_CCOEFF_NORMED"
|
||||
|
||||
# Each template will return a different confidence
|
||||
mock_cv2.minMaxLoc.side_effect = [
|
||||
(None, 0.70, None, None), # template1
|
||||
(None, 0.95, None, None), # template2 (best)
|
||||
(None, 0.80, None, None), # template3
|
||||
]
|
||||
|
||||
cache = PerceptionCache(templates_path=tmp_path / "templates.json")
|
||||
templates = [
|
||||
Template(name="template1", image=np.array([[1, 2], [3, 4]])),
|
||||
Template(name="template2", image=np.array([[5, 6], [7, 8]])),
|
||||
Template(name="template3", image=np.array([[9, 10], [11, 12]])),
|
||||
]
|
||||
cache.add(templates)
|
||||
|
||||
screenshot = np.array([[13, 14], [15, 16]])
|
||||
result = cache.match(screenshot)
|
||||
|
||||
assert result.confidence == 0.95
|
||||
assert result.state == {"template_name": "template2"}
|
||||
|
||||
@patch("timmy.sovereignty.perception_cache.cv2")
|
||||
def test_match_exactly_at_threshold(self, mock_cv2, tmp_path):
|
||||
"""Match returns state when confidence is exactly at threshold boundary."""
|
||||
from timmy.sovereignty.perception_cache import PerceptionCache, Template
|
||||
|
||||
mock_cv2.matchTemplate.return_value = np.array([[0.1]])
|
||||
mock_cv2.TM_CCOEFF_NORMED = "TM_CCOEFF_NORMED"
|
||||
mock_cv2.minMaxLoc.return_value = (None, 0.85, None, None) # Exactly at threshold
|
||||
|
||||
cache = PerceptionCache(templates_path=tmp_path / "templates.json")
|
||||
template = Template(name="threshold_match", image=np.array([[1, 2], [3, 4]]))
|
||||
cache.add([template])
|
||||
|
||||
screenshot = np.array([[5, 6], [7, 8]])
|
||||
result = cache.match(screenshot)
|
||||
|
||||
# Note: current implementation uses > 0.85, so exactly 0.85 returns None state
|
||||
assert result.confidence == 0.85
|
||||
assert result.state is None
|
||||
|
||||
@patch("timmy.sovereignty.perception_cache.cv2")
|
||||
def test_match_just_above_threshold(self, mock_cv2, tmp_path):
|
||||
"""Match returns state when confidence is just above threshold."""
|
||||
from timmy.sovereignty.perception_cache import PerceptionCache, Template
|
||||
|
||||
mock_cv2.matchTemplate.return_value = np.array([[0.1]])
|
||||
mock_cv2.TM_CCOEFF_NORMED = "TM_CCOEFF_NORMED"
|
||||
mock_cv2.minMaxLoc.return_value = (None, 0.851, None, None) # Just above threshold
|
||||
|
||||
cache = PerceptionCache(templates_path=tmp_path / "templates.json")
|
||||
template = Template(name="above_threshold", image=np.array([[1, 2], [3, 4]]))
|
||||
cache.add([template])
|
||||
|
||||
screenshot = np.array([[5, 6], [7, 8]])
|
||||
result = cache.match(screenshot)
|
||||
|
||||
assert result.confidence == 0.851
|
||||
assert result.state == {"template_name": "above_threshold"}
|
||||
|
||||
|
||||
class TestPerceptionCacheAdd:
|
||||
"""Tests for PerceptionCache.add() method."""
|
||||
|
||||
def test_add_single_template(self, tmp_path):
|
||||
"""Can add a single template to the cache."""
|
||||
from timmy.sovereignty.perception_cache import PerceptionCache, Template
|
||||
|
||||
cache = PerceptionCache(templates_path=tmp_path / "templates.json")
|
||||
template = Template(name="new_template", image=np.array([[1, 2], [3, 4]]))
|
||||
|
||||
cache.add([template])
|
||||
|
||||
assert len(cache.templates) == 1
|
||||
assert cache.templates[0].name == "new_template"
|
||||
|
||||
def test_add_multiple_templates(self, tmp_path):
|
||||
"""Can add multiple templates at once."""
|
||||
from timmy.sovereignty.perception_cache import PerceptionCache, Template
|
||||
|
||||
cache = PerceptionCache(templates_path=tmp_path / "templates.json")
|
||||
templates = [
|
||||
Template(name="template1", image=np.array([[1, 2], [3, 4]])),
|
||||
Template(name="template2", image=np.array([[5, 6], [7, 8]])),
|
||||
]
|
||||
|
||||
cache.add(templates)
|
||||
|
||||
assert len(cache.templates) == 2
|
||||
assert cache.templates[0].name == "template1"
|
||||
assert cache.templates[1].name == "template2"
|
||||
|
||||
def test_add_templates_accumulate(self, tmp_path):
|
||||
"""Adding templates multiple times accumulates them."""
|
||||
from timmy.sovereignty.perception_cache import PerceptionCache, Template
|
||||
|
||||
cache = PerceptionCache(templates_path=tmp_path / "templates.json")
|
||||
cache.add([Template(name="first", image=np.array([[1]]))])
|
||||
cache.add([Template(name="second", image=np.array([[2]]))])
|
||||
|
||||
assert len(cache.templates) == 2
|
||||
|
||||
|
||||
class TestPerceptionCachePersist:
|
||||
"""Tests for PerceptionCache.persist() method."""
|
||||
|
||||
def test_persist_creates_file(self, tmp_path):
|
||||
"""Persist creates templates JSON file."""
|
||||
from timmy.sovereignty.perception_cache import PerceptionCache, Template
|
||||
|
||||
templates_path = tmp_path / "subdir" / "templates.json"
|
||||
cache = PerceptionCache(templates_path=templates_path)
|
||||
cache.add([Template(name="persisted", image=np.array([[1, 2], [3, 4]]))])
|
||||
|
||||
cache.persist()
|
||||
|
||||
assert templates_path.exists()
|
||||
|
||||
def test_persist_stores_template_names(self, tmp_path):
|
||||
"""Persist stores template names and thresholds."""
|
||||
from timmy.sovereignty.perception_cache import PerceptionCache, Template
|
||||
|
||||
templates_path = tmp_path / "templates.json"
|
||||
cache = PerceptionCache(templates_path=templates_path)
|
||||
cache.add([
|
||||
Template(name="template1", image=np.array([[1]]), threshold=0.85),
|
||||
Template(name="template2", image=np.array([[2]]), threshold=0.90),
|
||||
])
|
||||
|
||||
cache.persist()
|
||||
|
||||
with open(templates_path) as f:
|
||||
data = json.load(f)
|
||||
|
||||
assert len(data) == 2
|
||||
assert data[0]["name"] == "template1"
|
||||
assert data[0]["threshold"] == 0.85
|
||||
assert data[1]["name"] == "template2"
|
||||
assert data[1]["threshold"] == 0.90
|
||||
|
||||
def test_persist_does_not_store_image_data(self, tmp_path):
|
||||
"""Persist only stores metadata, not actual image arrays."""
|
||||
from timmy.sovereignty.perception_cache import PerceptionCache, Template
|
||||
|
||||
templates_path = tmp_path / "templates.json"
|
||||
cache = PerceptionCache(templates_path=templates_path)
|
||||
cache.add([Template(name="no_image", image=np.array([[1, 2, 3], [4, 5, 6]]))])
|
||||
|
||||
cache.persist()
|
||||
|
||||
with open(templates_path) as f:
|
||||
data = json.load(f)
|
||||
|
||||
assert "image" not in data[0]
|
||||
assert set(data[0].keys()) == {"name", "threshold"}
|
||||
|
||||
|
||||
class TestPerceptionCacheLoad:
    """Tests for PerceptionCache.load() method."""

    def test_load_from_existing_file(self, tmp_path):
        """Load restores templates from persisted file."""
        from timmy.sovereignty.perception_cache import PerceptionCache, Template

        templates_path = tmp_path / "templates.json"

        # Create initial cache with templates and persist
        cache1 = PerceptionCache(templates_path=templates_path)
        cache1.add([Template(name="loaded", image=np.array([[1]]), threshold=0.88)])
        cache1.persist()

        # Create new cache instance that loads from same file
        cache2 = PerceptionCache(templates_path=templates_path)

        assert len(cache2.templates) == 1
        assert cache2.templates[0].name == "loaded"
        assert cache2.templates[0].threshold == 0.88
        # Note: images are loaded as empty arrays per current implementation
        assert cache2.templates[0].image.size == 0

    def test_load_empty_file(self, tmp_path):
        """Load handles empty template list in file."""
        from timmy.sovereignty.perception_cache import PerceptionCache

        templates_path = tmp_path / "templates.json"
        with open(templates_path, "w") as f:
            json.dump([], f)

        cache = PerceptionCache(templates_path=templates_path)

        assert cache.templates == []


class TestCrystallizePerception:
    """Tests for crystallize_perception function."""

    def test_crystallize_returns_empty_list(self, tmp_path):
        """crystallize_perception currently returns empty list (placeholder)."""
        from timmy.sovereignty.perception_cache import crystallize_perception

        screenshot = np.array([[1, 2], [3, 4]])
        result = crystallize_perception(screenshot, {"some": "response"})

        assert result == []

    def test_crystallize_accepts_any_vlm_response(self, tmp_path):
        """crystallize_perception accepts any vlm_response format."""
        from timmy.sovereignty.perception_cache import crystallize_perception

        screenshot = np.array([[1, 2], [3, 4]])

        # Test with various response types
        assert crystallize_perception(screenshot, None) == []
        assert crystallize_perception(screenshot, {}) == []
        assert crystallize_perception(screenshot, {"items": []}) == []
        assert crystallize_perception(screenshot, "string response") == []
643
tests/timmy/test_kimi_delegation.py
Normal file
@@ -0,0 +1,643 @@
"""Unit tests for timmy.kimi_delegation — Kimi research delegation pipeline."""
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# exceeds_local_capacity
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestExceedsLocalCapacity:
|
||||
def test_heavy_keyword_triggers_delegation(self):
|
||||
from timmy.kimi_delegation import exceeds_local_capacity
|
||||
|
||||
assert exceeds_local_capacity("Do a comprehensive review of the codebase") is True
|
||||
|
||||
def test_all_heavy_keywords_detected(self):
|
||||
from timmy.kimi_delegation import _HEAVY_RESEARCH_KEYWORDS, exceeds_local_capacity
|
||||
|
||||
for kw in _HEAVY_RESEARCH_KEYWORDS:
|
||||
assert exceeds_local_capacity(f"Please {kw} the topic") is True, f"Missed keyword: {kw}"
|
||||
|
||||
def test_long_task_triggers_delegation(self):
|
||||
from timmy.kimi_delegation import _HEAVY_WORD_THRESHOLD, exceeds_local_capacity
|
||||
|
||||
long_task = " ".join(["word"] * (_HEAVY_WORD_THRESHOLD + 1))
|
||||
assert exceeds_local_capacity(long_task) is True
|
||||
|
||||
def test_short_simple_task_returns_false(self):
|
||||
from timmy.kimi_delegation import exceeds_local_capacity
|
||||
|
||||
assert exceeds_local_capacity("Fix the typo in README") is False
|
||||
|
||||
def test_exactly_at_word_threshold_triggers(self):
|
||||
from timmy.kimi_delegation import _HEAVY_WORD_THRESHOLD, exceeds_local_capacity
|
||||
|
||||
task = " ".join(["word"] * _HEAVY_WORD_THRESHOLD)
|
||||
assert exceeds_local_capacity(task) is True
|
||||
|
||||
def test_keyword_case_insensitive(self):
|
||||
from timmy.kimi_delegation import exceeds_local_capacity
|
||||
|
||||
assert exceeds_local_capacity("Run a COMPREHENSIVE analysis") is True
|
||||
|
||||
def test_empty_string_returns_false(self):
|
||||
from timmy.kimi_delegation import exceeds_local_capacity
|
||||
|
||||
assert exceeds_local_capacity("") is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _slugify
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSlugify:
|
||||
def test_basic_text(self):
|
||||
from timmy.kimi_delegation import _slugify
|
||||
|
||||
assert _slugify("Hello World") == "hello-world"
|
||||
|
||||
def test_special_characters_removed(self):
|
||||
from timmy.kimi_delegation import _slugify
|
||||
|
||||
assert _slugify("Research: AI & ML!") == "research-ai--ml"
|
||||
|
||||
def test_underscores_become_dashes(self):
|
||||
from timmy.kimi_delegation import _slugify
|
||||
|
||||
assert _slugify("some_snake_case") == "some-snake-case"
|
||||
|
||||
def test_long_text_truncated_to_60(self):
|
||||
from timmy.kimi_delegation import _slugify
|
||||
|
||||
long_text = "a" * 100
|
||||
result = _slugify(long_text)
|
||||
assert len(result) <= 60
|
||||
|
||||
def test_leading_trailing_dashes_stripped(self):
|
||||
from timmy.kimi_delegation import _slugify
|
||||
|
||||
result = _slugify(" hello ")
|
||||
assert not result.startswith("-")
|
||||
assert not result.endswith("-")
|
||||
|
||||
def test_multiple_spaces_become_single_dash(self):
|
||||
from timmy.kimi_delegation import _slugify
|
||||
|
||||
assert _slugify("one two") == "one-two"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
# _build_research_template
# ---------------------------------------------------------------------------


class TestBuildResearchTemplate:
    def test_contains_task_title(self):
        from timmy.kimi_delegation import _build_research_template

        body = _build_research_template("My Task", "background", "the question?")
        assert "My Task" in body

    def test_contains_question(self):
        from timmy.kimi_delegation import _build_research_template

        body = _build_research_template("task", "context", "What is X?")
        assert "What is X?" in body

    def test_contains_context(self):
        from timmy.kimi_delegation import _build_research_template

        body = _build_research_template("task", "some context here", "q?")
        assert "some context here" in body

    def test_default_priority_normal(self):
        from timmy.kimi_delegation import _build_research_template

        body = _build_research_template("task", "ctx", "q?")
        assert "normal" in body

    def test_custom_priority_included(self):
        from timmy.kimi_delegation import _build_research_template

        body = _build_research_template("task", "ctx", "q?", priority="high")
        assert "high" in body

    def test_kimi_label_mentioned(self):
        from timmy.kimi_delegation import KIMI_READY_LABEL, _build_research_template

        body = _build_research_template("task", "ctx", "q?")
        assert KIMI_READY_LABEL in body

    def test_slugified_task_in_artifact_path(self):
        from timmy.kimi_delegation import _build_research_template

        body = _build_research_template("My Research Task", "ctx", "q?")
        assert "my-research-task" in body

    def test_sections_present(self):
        from timmy.kimi_delegation import _build_research_template

        body = _build_research_template("task", "ctx", "q?")
        assert "## Research Request" in body
        assert "### Research Question" in body
        assert "### Background / Context" in body
        assert "### Deliverables" in body


# ---------------------------------------------------------------------------
# _extract_action_items
# ---------------------------------------------------------------------------


class TestExtractActionItems:
    def test_checkbox_items_extracted(self):
        from timmy.kimi_delegation import _extract_action_items

        text = "- [ ] Fix the bug\n- [ ] Write tests\n"
        items = _extract_action_items(text)
        assert "Fix the bug" in items
        assert "Write tests" in items

    def test_numbered_list_extracted(self):
        from timmy.kimi_delegation import _extract_action_items

        text = "1. Deploy to staging\n2. Run smoke tests\n"
        items = _extract_action_items(text)
        assert "Deploy to staging" in items
        assert "Run smoke tests" in items

    def test_action_prefix_extracted(self):
        from timmy.kimi_delegation import _extract_action_items

        text = "Action: Update the config file\n"
        items = _extract_action_items(text)
        assert "Update the config file" in items

    def test_todo_prefix_extracted(self):
        from timmy.kimi_delegation import _extract_action_items

        text = "TODO: Add error handling\n"
        items = _extract_action_items(text)
        assert "Add error handling" in items

    def test_next_step_prefix_extracted(self):
        from timmy.kimi_delegation import _extract_action_items

        text = "Next step: Validate results\n"
        items = _extract_action_items(text)
        assert "Validate results" in items

    def test_case_insensitive_prefixes(self):
        from timmy.kimi_delegation import _extract_action_items

        text = "todo: lowercase todo\nACTION: uppercase action\n"
        items = _extract_action_items(text)
        assert "lowercase todo" in items
        assert "uppercase action" in items

    def test_deduplication(self):
        from timmy.kimi_delegation import _extract_action_items

        text = "1. Do the thing\n2. Do the thing\n"
        items = _extract_action_items(text)
        assert items.count("Do the thing") == 1

    def test_empty_text_returns_empty_list(self):
        from timmy.kimi_delegation import _extract_action_items

        assert _extract_action_items("") == []

    def test_no_action_items_returns_empty_list(self):
        from timmy.kimi_delegation import _extract_action_items

        text = "This is just plain prose with no action items here."
        assert _extract_action_items(text) == []

    def test_mixed_sources_combined(self):
        from timmy.kimi_delegation import _extract_action_items

        text = "- [ ] checkbox item\n1. numbered item\nAction: action item\n"
        items = _extract_action_items(text)
        assert len(items) == 3


# ---------------------------------------------------------------------------
# _get_or_create_label (async)
# ---------------------------------------------------------------------------


class TestGetOrCreateLabel:
    @pytest.mark.asyncio
    async def test_returns_existing_label_id(self):
        from timmy.kimi_delegation import KIMI_READY_LABEL, _get_or_create_label

        mock_resp = MagicMock()
        mock_resp.status_code = 200
        mock_resp.json.return_value = [{"name": KIMI_READY_LABEL, "id": 42}]

        client = MagicMock()
        client.get = AsyncMock(return_value=mock_resp)

        result = await _get_or_create_label(client, "http://git", {"Authorization": "token x"}, "owner/repo")
        assert result == 42

    @pytest.mark.asyncio
    async def test_creates_label_when_missing(self):
        from timmy.kimi_delegation import _get_or_create_label

        list_resp = MagicMock()
        list_resp.status_code = 200
        list_resp.json.return_value = []  # no existing labels

        create_resp = MagicMock()
        create_resp.status_code = 201
        create_resp.json.return_value = {"id": 99}

        client = MagicMock()
        client.get = AsyncMock(return_value=list_resp)
        client.post = AsyncMock(return_value=create_resp)

        result = await _get_or_create_label(client, "http://git", {"Authorization": "token x"}, "owner/repo")
        assert result == 99

    @pytest.mark.asyncio
    async def test_returns_none_on_list_exception(self):
        from timmy.kimi_delegation import _get_or_create_label

        client = MagicMock()
        client.get = AsyncMock(side_effect=Exception("network error"))

        result = await _get_or_create_label(client, "http://git", {}, "owner/repo")
        assert result is None

    @pytest.mark.asyncio
    async def test_returns_none_on_create_exception(self):
        from timmy.kimi_delegation import _get_or_create_label

        list_resp = MagicMock()
        list_resp.status_code = 200
        list_resp.json.return_value = []

        client = MagicMock()
        client.get = AsyncMock(return_value=list_resp)
        client.post = AsyncMock(side_effect=Exception("create failed"))

        result = await _get_or_create_label(client, "http://git", {}, "owner/repo")
        assert result is None


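# NOTE: the tests below drive httpx.AsyncClient as an async context manager, so
# each mock client stubs __aenter__/__aexit__ by hand: __aenter__ hands back the
# mock itself and __aexit__ returns False so exceptions propagate unswallowed.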
# ---------------------------------------------------------------------------
# create_kimi_research_issue (async)
# ---------------------------------------------------------------------------


class TestCreateKimiResearchIssue:
    @pytest.mark.asyncio
    async def test_returns_error_when_gitea_disabled(self):
        from timmy.kimi_delegation import create_kimi_research_issue

        with patch("timmy.kimi_delegation.settings") as mock_settings:
            mock_settings.gitea_enabled = False
            mock_settings.gitea_token = ""
            result = await create_kimi_research_issue("task", "ctx", "q?")

        assert result["success"] is False
        assert "not configured" in result["error"]

    @pytest.mark.asyncio
    async def test_returns_error_when_no_token(self):
        from timmy.kimi_delegation import create_kimi_research_issue

        with patch("timmy.kimi_delegation.settings") as mock_settings:
            mock_settings.gitea_enabled = True
            mock_settings.gitea_token = ""
            result = await create_kimi_research_issue("task", "ctx", "q?")

        assert result["success"] is False

    @pytest.mark.asyncio
    async def test_successful_issue_creation(self):
        from timmy.kimi_delegation import create_kimi_research_issue

        mock_settings = MagicMock()
        mock_settings.gitea_enabled = True
        mock_settings.gitea_token = "tok"
        mock_settings.gitea_url = "http://git"
        mock_settings.gitea_repo = "owner/repo"

        label_resp = MagicMock()
        label_resp.status_code = 200
        label_resp.json.return_value = [{"name": "kimi-ready", "id": 5}]

        issue_resp = MagicMock()
        issue_resp.status_code = 201
        issue_resp.json.return_value = {"number": 42, "html_url": "http://git/issues/42"}

        async_client = AsyncMock()
        async_client.get = AsyncMock(return_value=label_resp)
        async_client.post = AsyncMock(return_value=issue_resp)
        async_client.__aenter__ = AsyncMock(return_value=async_client)
        async_client.__aexit__ = AsyncMock(return_value=False)

        with (
            patch("timmy.kimi_delegation.settings", mock_settings),
            patch("timmy.kimi_delegation.httpx") as mock_httpx,
        ):
            mock_httpx.AsyncClient.return_value = async_client
            result = await create_kimi_research_issue("task", "ctx", "q?")

        assert result["success"] is True
        assert result["issue_number"] == 42
        assert "http://git/issues/42" in result["issue_url"]

    @pytest.mark.asyncio
    async def test_api_error_returns_failure(self):
        from timmy.kimi_delegation import create_kimi_research_issue

        mock_settings = MagicMock()
        mock_settings.gitea_enabled = True
        mock_settings.gitea_token = "tok"
        mock_settings.gitea_url = "http://git"
        mock_settings.gitea_repo = "owner/repo"

        label_resp = MagicMock()
        label_resp.status_code = 200
        label_resp.json.return_value = []

        create_label_resp = MagicMock()
        create_label_resp.status_code = 201
        create_label_resp.json.return_value = {"id": 1}

        issue_resp = MagicMock()
        issue_resp.status_code = 500
        issue_resp.text = "Internal Server Error"

        async_client = AsyncMock()
        async_client.get = AsyncMock(return_value=label_resp)
        async_client.post = AsyncMock(side_effect=[create_label_resp, issue_resp])
        async_client.__aenter__ = AsyncMock(return_value=async_client)
        async_client.__aexit__ = AsyncMock(return_value=False)

        with (
            patch("timmy.kimi_delegation.settings", mock_settings),
            patch("timmy.kimi_delegation.httpx") as mock_httpx,
        ):
            mock_httpx.AsyncClient.return_value = async_client
            result = await create_kimi_research_issue("task", "ctx", "q?")

        assert result["success"] is False
        assert "500" in result["error"]

    @pytest.mark.asyncio
    async def test_exception_returns_failure(self):
        from timmy.kimi_delegation import create_kimi_research_issue

        mock_settings = MagicMock()
        mock_settings.gitea_enabled = True
        mock_settings.gitea_token = "tok"
        mock_settings.gitea_url = "http://git"
        mock_settings.gitea_repo = "owner/repo"

        async_client = AsyncMock()
        async_client.__aenter__ = AsyncMock(side_effect=Exception("connection refused"))
        async_client.__aexit__ = AsyncMock(return_value=False)

        with (
            patch("timmy.kimi_delegation.settings", mock_settings),
            patch("timmy.kimi_delegation.httpx") as mock_httpx,
        ):
            mock_httpx.AsyncClient.return_value = async_client
            result = await create_kimi_research_issue("task", "ctx", "q?")

        assert result["success"] is False
        assert result["error"] != ""


# ---------------------------------------------------------------------------
# poll_kimi_issue (async)
# ---------------------------------------------------------------------------


class TestPollKimiIssue:
    @pytest.mark.asyncio
    async def test_returns_error_when_gitea_not_configured(self):
        from timmy.kimi_delegation import poll_kimi_issue

        with patch("timmy.kimi_delegation.settings") as mock_settings:
            mock_settings.gitea_enabled = False
            mock_settings.gitea_token = ""
            result = await poll_kimi_issue(123)

        assert result["completed"] is False
        assert "not configured" in result["error"]

    @pytest.mark.asyncio
    async def test_returns_completed_when_issue_closed(self):
        from timmy.kimi_delegation import poll_kimi_issue

        mock_settings = MagicMock()
        mock_settings.gitea_enabled = True
        mock_settings.gitea_token = "tok"
        mock_settings.gitea_url = "http://git"
        mock_settings.gitea_repo = "owner/repo"

        resp = MagicMock()
        resp.status_code = 200
        resp.json.return_value = {"state": "closed", "body": "Done!"}

        async_client = AsyncMock()
        async_client.get = AsyncMock(return_value=resp)
        async_client.__aenter__ = AsyncMock(return_value=async_client)
        async_client.__aexit__ = AsyncMock(return_value=False)

        with (
            patch("timmy.kimi_delegation.settings", mock_settings),
            patch("timmy.kimi_delegation.httpx") as mock_httpx,
        ):
            mock_httpx.AsyncClient.return_value = async_client
            result = await poll_kimi_issue(42, poll_interval=0, max_wait=1)

        assert result["completed"] is True
        assert result["state"] == "closed"
        assert result["body"] == "Done!"

    @pytest.mark.asyncio
    async def test_times_out_when_issue_stays_open(self):
        from timmy.kimi_delegation import poll_kimi_issue

        mock_settings = MagicMock()
        mock_settings.gitea_enabled = True
        mock_settings.gitea_token = "tok"
        mock_settings.gitea_url = "http://git"
        mock_settings.gitea_repo = "owner/repo"

        resp = MagicMock()
        resp.status_code = 200
        resp.json.return_value = {"state": "open", "body": ""}

        async_client = AsyncMock()
        async_client.get = AsyncMock(return_value=resp)
        async_client.__aenter__ = AsyncMock(return_value=async_client)
        async_client.__aexit__ = AsyncMock(return_value=False)

        with (
            patch("timmy.kimi_delegation.settings", mock_settings),
            patch("timmy.kimi_delegation.httpx") as mock_httpx,
            patch("timmy.kimi_delegation.asyncio.sleep", new_callable=AsyncMock),
        ):
            mock_httpx.AsyncClient.return_value = async_client
            # poll_interval > max_wait so it exits immediately after first sleep
            result = await poll_kimi_issue(42, poll_interval=10, max_wait=5)

        assert result["completed"] is False
        assert result["state"] == "timeout"


# ---------------------------------------------------------------------------
# index_kimi_artifact (async)
# ---------------------------------------------------------------------------


class TestIndexKimiArtifact:
    @pytest.mark.asyncio
    async def test_empty_artifact_returns_error(self):
        from timmy.kimi_delegation import index_kimi_artifact

        result = await index_kimi_artifact(1, "title", " ")
        assert result["success"] is False
        assert "Empty artifact" in result["error"]

    @pytest.mark.asyncio
    async def test_successful_indexing(self):
        from timmy.kimi_delegation import index_kimi_artifact

        mock_entry = MagicMock()
        mock_entry.id = "mem-123"

        with patch("timmy.kimi_delegation.asyncio.to_thread", new_callable=AsyncMock) as mock_thread:
            mock_thread.return_value = mock_entry
            result = await index_kimi_artifact(42, "My Research", "Some research content here")

        assert result["success"] is True
        assert result["memory_id"] == "mem-123"

    @pytest.mark.asyncio
    async def test_exception_returns_failure(self):
        from timmy.kimi_delegation import index_kimi_artifact

        with patch("timmy.kimi_delegation.asyncio.to_thread", new_callable=AsyncMock) as mock_thread:
            mock_thread.side_effect = Exception("DB error")
            result = await index_kimi_artifact(42, "title", "some content")

        assert result["success"] is False
        assert result["error"] != ""


# ---------------------------------------------------------------------------
# extract_and_create_followups (async)
# ---------------------------------------------------------------------------


class TestExtractAndCreateFollowups:
    @pytest.mark.asyncio
    async def test_no_action_items_returns_empty_created(self):
        from timmy.kimi_delegation import extract_and_create_followups

        result = await extract_and_create_followups("Plain prose, nothing to do.", 1)
        assert result["success"] is True
        assert result["created"] == []

    @pytest.mark.asyncio
    async def test_gitea_not_configured_returns_error(self):
        from timmy.kimi_delegation import extract_and_create_followups

        text = "1. Do something important\n"

        with patch("timmy.kimi_delegation.settings") as mock_settings:
            mock_settings.gitea_enabled = False
            mock_settings.gitea_token = ""
            result = await extract_and_create_followups(text, 5)

        assert result["success"] is False

    @pytest.mark.asyncio
    async def test_creates_followup_issues(self):
        from timmy.kimi_delegation import extract_and_create_followups

        text = "1. Deploy the service\n2. Run integration tests\n"

        mock_settings = MagicMock()
        mock_settings.gitea_enabled = True
        mock_settings.gitea_token = "tok"
        mock_settings.gitea_url = "http://git"
        mock_settings.gitea_repo = "owner/repo"

        issue_resp = MagicMock()
        issue_resp.status_code = 201
        issue_resp.json.return_value = {"number": 10}

        async_client = AsyncMock()
        async_client.post = AsyncMock(return_value=issue_resp)
        async_client.__aenter__ = AsyncMock(return_value=async_client)
        async_client.__aexit__ = AsyncMock(return_value=False)

        with (
            patch("timmy.kimi_delegation.settings", mock_settings),
            patch("timmy.kimi_delegation.httpx") as mock_httpx,
        ):
            mock_httpx.AsyncClient.return_value = async_client
            result = await extract_and_create_followups(text, 5)

        assert result["success"] is True
        assert len(result["created"]) == 2


# ---------------------------------------------------------------------------
# delegate_research_to_kimi (async)
# ---------------------------------------------------------------------------


class TestDelegateResearchToKimi:
    @pytest.mark.asyncio
    async def test_empty_task_returns_error(self):
        from timmy.kimi_delegation import delegate_research_to_kimi

        result = await delegate_research_to_kimi("", "ctx", "q?")
        assert result["success"] is False
        assert "required" in result["error"]

    @pytest.mark.asyncio
    async def test_whitespace_task_returns_error(self):
        from timmy.kimi_delegation import delegate_research_to_kimi

        result = await delegate_research_to_kimi(" ", "ctx", "q?")
        assert result["success"] is False
        assert "required" in result["error"]

    @pytest.mark.asyncio
    async def test_empty_question_returns_error(self):
        from timmy.kimi_delegation import delegate_research_to_kimi

        result = await delegate_research_to_kimi("valid task", "ctx", "")
        assert result["success"] is False
        assert "required" in result["error"]

    @pytest.mark.asyncio
    async def test_delegates_to_create_issue(self):
        from timmy.kimi_delegation import delegate_research_to_kimi

        with patch(
            "timmy.kimi_delegation.create_kimi_research_issue",
            new_callable=AsyncMock,
        ) as mock_create:
            mock_create.return_value = {"success": True, "issue_number": 7, "issue_url": "http://x", "error": None}
            result = await delegate_research_to_kimi("Research X", "ctx", "What is X?", priority="high")

        assert result["success"] is True
        assert result["issue_number"] == 7
        mock_create.assert_awaited_once_with("Research X", "ctx", "What is X?", "high")
667
tests/timmy/test_orchestration_loop.py
Normal file
@@ -0,0 +1,667 @@
"""Tests for timmy.vassal.orchestration_loop — VassalOrchestrator core module.
|
||||
|
||||
Refs #1278
|
||||
"""
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.vassal.orchestration_loop import VassalCycleRecord, VassalOrchestrator
|
||||
|
||||
# -----------------------------------------------------------------------------
# VassalCycleRecord tests
# -----------------------------------------------------------------------------


class TestVassalCycleRecord:
    """Unit tests for the VassalCycleRecord dataclass."""

    def test_creation_defaults(self):
        """Test creating a cycle record with minimal fields."""
        record = VassalCycleRecord(
            cycle_id=1,
            started_at="2026-03-23T12:00:00+00:00",
        )
        assert record.cycle_id == 1
        assert record.started_at == "2026-03-23T12:00:00+00:00"
        assert record.finished_at == ""
        assert record.duration_ms == 0
        assert record.issues_fetched == 0
        assert record.issues_dispatched == 0
        assert record.stuck_agents == []
        assert record.house_warnings == []
        assert record.errors == []

    def test_healthy_property_no_issues(self):
        """Record is healthy when no errors or warnings."""
        record = VassalCycleRecord(
            cycle_id=1,
            started_at="2026-03-23T12:00:00+00:00",
        )
        assert record.healthy is True

    def test_healthy_property_with_errors(self):
        """Record is unhealthy when errors exist."""
        record = VassalCycleRecord(
            cycle_id=1,
            started_at="2026-03-23T12:00:00+00:00",
            errors=["backlog: Connection failed"],
        )
        assert record.healthy is False

    def test_healthy_property_with_warnings(self):
        """Record is unhealthy when house warnings exist."""
        record = VassalCycleRecord(
            cycle_id=1,
            started_at="2026-03-23T12:00:00+00:00",
            house_warnings=["Disk: 90% used"],
        )
        assert record.healthy is False

    def test_full_populated_record(self):
        """Test a fully populated cycle record."""
        record = VassalCycleRecord(
            cycle_id=5,
            started_at="2026-03-23T12:00:00+00:00",
            finished_at="2026-03-23T12:00:01+00:00",
            duration_ms=1000,
            issues_fetched=10,
            issues_dispatched=3,
            dispatched_to_claude=1,
            dispatched_to_kimi=1,
            dispatched_to_timmy=1,
            stuck_agents=["claude"],
            nudges_sent=1,
            house_warnings=[],
            cleanup_deleted=0,
            errors=[],
        )
        assert record.cycle_id == 5
        assert record.duration_ms == 1000
        assert record.healthy is True


# -----------------------------------------------------------------------------
# VassalOrchestrator initialization tests
# -----------------------------------------------------------------------------


class TestVassalOrchestratorInit:
    """Tests for VassalOrchestrator initialization."""

    def test_default_initialization(self):
        """Test default initialization with no parameters."""
        orchestrator = VassalOrchestrator()
        assert orchestrator.cycle_count == 0
        assert orchestrator.is_running is False
        assert orchestrator.history == []
        assert orchestrator._max_dispatch == 10

    def test_custom_interval(self):
        """Test initialization with custom cycle interval."""
        orchestrator = VassalOrchestrator(cycle_interval=60.0)
        assert orchestrator._cycle_interval == 60.0

    def test_custom_max_dispatch(self):
        """Test initialization with custom max dispatch."""
        orchestrator = VassalOrchestrator(max_dispatch_per_cycle=5)
        assert orchestrator._max_dispatch == 5

    def test_get_status_empty_history(self):
        """Test get_status when no cycles have run."""
        orchestrator = VassalOrchestrator()
        status = orchestrator.get_status()
        assert status["running"] is False
        assert status["cycle_count"] == 0
        assert status["last_cycle"] is None


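# NOTE: every run_cycle test below patches VassalOrchestrator._broadcast so a
# cycle never touches the WebSocket layer; broadcast behaviour itself is
# exercised separately in TestBroadcast near the end of this file.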
# -----------------------------------------------------------------------------
# Run cycle tests
# -----------------------------------------------------------------------------


class TestRunCycle:
    """Tests for the run_cycle method."""

    @pytest.fixture
    def orchestrator(self):
        """Create a fresh orchestrator for each test."""
        return VassalOrchestrator()

    @pytest.fixture(autouse=True)
    def _clear_dispatch_registry(self):
        """Clear dispatch registry before each test."""
        from timmy.vassal.dispatch import clear_dispatch_registry

        clear_dispatch_registry()
        yield
        clear_dispatch_registry()

    @pytest.mark.asyncio
    async def test_run_cycle_empty_backlog(self, orchestrator):
        """Test a cycle with no issues to process."""
        with patch(
            "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
        ) as mock_broadcast:
            mock_broadcast.return_value = None
            with patch(
                "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
            ) as mock_fetch:
                mock_fetch.return_value = []
                record = await orchestrator.run_cycle()

        assert record.cycle_id == 1
        assert record.issues_fetched == 0
        assert record.issues_dispatched == 0
        assert record.duration_ms >= 0
        assert record.finished_at != ""
        assert orchestrator.cycle_count == 1
        assert len(orchestrator.history) == 1

    @pytest.mark.asyncio
    async def test_run_cycle_dispatches_issues(self, orchestrator):
        """Test dispatching issues to agents."""

        mock_issue = {
            "number": 123,
            "title": "Test issue",
            "body": "Test body",
            "labels": [],
            "assignees": [],
            "html_url": "http://test/123",
        }

        with patch(
            "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
        ) as mock_broadcast:
            mock_broadcast.return_value = None
            with patch(
                "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
            ) as mock_fetch:
                mock_fetch.return_value = [mock_issue]
                with patch(
                    "timmy.vassal.dispatch.dispatch_issue", new_callable=AsyncMock
                ) as mock_dispatch:
                    mock_dispatch.return_value = MagicMock()
                    record = await orchestrator.run_cycle()

        assert record.cycle_id == 1
        assert record.issues_fetched == 1
        assert record.issues_dispatched == 1
        mock_dispatch.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_run_cycle_respects_max_dispatch(self, orchestrator):
        """Test that max_dispatch_per_cycle limits dispatches."""
        mock_issues = [
            {
                "number": i,
                "title": f"Issue {i}",
                "body": "Test",
                "labels": [],
                "assignees": [],
                "html_url": f"http://test/{i}",
            }
            for i in range(1, 15)
        ]

        orchestrator._max_dispatch = 3

        with patch(
            "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
        ) as mock_broadcast:
            mock_broadcast.return_value = None
            with patch(
                "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
            ) as mock_fetch:
                mock_fetch.return_value = mock_issues
                with patch(
                    "timmy.vassal.dispatch.dispatch_issue", new_callable=AsyncMock
                ) as mock_dispatch:
                    mock_dispatch.return_value = MagicMock()
                    record = await orchestrator.run_cycle()

        assert record.issues_fetched == 14
        assert record.issues_dispatched == 3
        assert mock_dispatch.await_count == 3

    @pytest.mark.asyncio
    async def test_run_cycle_skips_already_dispatched(self, orchestrator):
        """Test that already dispatched issues are skipped."""
        mock_issues = [
            {
                "number": 1,
                "title": "Issue 1",
                "body": "Test",
                "labels": [],
                "assignees": [],
                "html_url": "http://test/1",
            },
            {
                "number": 2,
                "title": "Issue 2",
                "body": "Test",
                "labels": [],
                "assignees": [],
                "html_url": "http://test/2",
            },
        ]

        with patch(
            "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
        ) as mock_broadcast:
            mock_broadcast.return_value = None
            with patch(
                "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
            ) as mock_fetch:
                mock_fetch.return_value = mock_issues
                with patch(
                    "timmy.vassal.dispatch.get_dispatch_registry"
                ) as mock_registry:
                    # Issue 1 already dispatched
                    mock_registry.return_value = {1: MagicMock()}
                    with patch(
                        "timmy.vassal.dispatch.dispatch_issue", new_callable=AsyncMock
                    ) as mock_dispatch:
                        mock_dispatch.return_value = MagicMock()
                        record = await orchestrator.run_cycle()

        assert record.issues_fetched == 2
        assert record.issues_dispatched == 1
        mock_dispatch.assert_awaited_once()
        # Should be called with issue 2
        call_args = mock_dispatch.call_args[0][0]
        assert call_args.number == 2

    @pytest.mark.asyncio
    async def test_run_cycle_tracks_agent_targets(self, orchestrator):
        """Test that dispatch counts are tracked per agent."""

        mock_issues = [
            {
                "number": 1,
                "title": "Architecture refactor",  # Should route to Claude
                "body": "Test",
                "labels": [],
                "assignees": [],
                "html_url": "http://test/1",
            },
            {
                "number": 2,
                "title": "Research analysis",  # Should route to Kimi
                "body": "Test",
                "labels": [],
                "assignees": [],
                "html_url": "http://test/2",
            },
            {
                "number": 3,
                "title": "Docs update",  # Should route to Timmy
                "body": "Test",
                "labels": [],
                "assignees": [],
                "html_url": "http://test/3",
            },
        ]

        with patch(
            "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
        ) as mock_broadcast:
            mock_broadcast.return_value = None
            with patch(
                "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
            ) as mock_fetch:
                mock_fetch.return_value = mock_issues
                with patch(
                    "timmy.vassal.dispatch.dispatch_issue", new_callable=AsyncMock
                ) as mock_dispatch:
                    mock_dispatch.return_value = MagicMock()
                    record = await orchestrator.run_cycle()

        assert record.issues_dispatched == 3
        assert record.dispatched_to_claude == 1
        assert record.dispatched_to_kimi == 1
        assert record.dispatched_to_timmy == 1

    @pytest.mark.asyncio
    async def test_run_cycle_handles_backlog_error(self, orchestrator):
        """Test graceful handling of backlog step errors."""
        with patch(
            "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
        ) as mock_broadcast:
            mock_broadcast.return_value = None
            with patch(
                "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
            ) as mock_fetch:
                mock_fetch.side_effect = RuntimeError("Gitea down")
                record = await orchestrator.run_cycle()

        assert record.cycle_id == 1
        assert record.issues_fetched == 0
        assert len(record.errors) == 1
        assert "backlog" in record.errors[0]
        assert record.healthy is False

    @pytest.mark.asyncio
    async def test_run_cycle_handles_agent_health_error(self, orchestrator):
        """Test graceful handling of agent health step errors."""
        with patch(
            "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
        ) as mock_broadcast:
            mock_broadcast.return_value = None
            with patch(
                "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
            ) as mock_fetch:
                mock_fetch.return_value = []
                with patch(
                    "timmy.vassal.agent_health.get_full_health_report",
                    new_callable=AsyncMock,
                ) as mock_health:
                    mock_health.side_effect = RuntimeError("Health check failed")
                    record = await orchestrator.run_cycle()

        assert len(record.errors) == 1
        assert "agent_health" in record.errors[0]

    @pytest.mark.asyncio
    async def test_run_cycle_handles_house_health_error(self, orchestrator):
        """Test graceful handling of house health step errors."""
        with patch(
            "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
        ) as mock_broadcast:
            mock_broadcast.return_value = None
            with patch(
                "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
            ) as mock_fetch:
                mock_fetch.return_value = []
                with patch(
                    "timmy.vassal.house_health.get_system_snapshot",
                    new_callable=AsyncMock,
                ) as mock_snapshot:
                    mock_snapshot.side_effect = RuntimeError("Snapshot failed")
                    record = await orchestrator.run_cycle()

        assert len(record.errors) == 1
        assert "house_health" in record.errors[0]

    @pytest.mark.asyncio
    async def test_run_cycle_detects_stuck_agents(self, orchestrator):
        """Test detection and nudging of stuck agents."""
        from dataclasses import dataclass, field

        @dataclass
        class MockAgentStatus:
            agent: str
            is_stuck: bool = False
            is_idle: bool = False
            stuck_issue_numbers: list = field(default_factory=list)

        mock_report = MagicMock()
        mock_report.agents = [
            MockAgentStatus(agent="claude", is_stuck=True, stuck_issue_numbers=[100]),
            MockAgentStatus(agent="kimi", is_stuck=False),
        ]

        with patch(
            "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
        ) as mock_broadcast:
            mock_broadcast.return_value = None
            with patch(
                "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
            ) as mock_fetch:
                mock_fetch.return_value = []
                with patch(
                    "timmy.vassal.agent_health.get_full_health_report",
                    new_callable=AsyncMock,
                ) as mock_health:
                    mock_health.return_value = mock_report
                    with patch(
                        "timmy.vassal.agent_health.nudge_stuck_agent",
                        new_callable=AsyncMock,
                    ) as mock_nudge:
                        mock_nudge.return_value = True
                        record = await orchestrator.run_cycle()

        assert "claude" in record.stuck_agents
        assert record.nudges_sent == 1
        mock_nudge.assert_awaited_once_with("claude", 100)

    @pytest.mark.asyncio
    async def test_run_cycle_triggers_cleanup_on_high_disk(self, orchestrator):
        """Test cleanup is triggered when disk usage is high."""
        mock_snapshot = MagicMock()
        mock_snapshot.disk.percent_used = 85.0  # Above 80% threshold
        mock_snapshot.warnings = ["Disk: 85% used"]

        with patch(
            "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
        ) as mock_broadcast:
            mock_broadcast.return_value = None
            with patch(
                "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
            ) as mock_fetch:
                mock_fetch.return_value = []
                with patch(
                    "timmy.vassal.house_health.get_system_snapshot",
                    new_callable=AsyncMock,
                ) as mock_snapshot_fn:
                    mock_snapshot_fn.return_value = mock_snapshot
                    with patch(
                        "timmy.vassal.house_health.cleanup_stale_files",
                        new_callable=AsyncMock,
                    ) as mock_cleanup:
                        mock_cleanup.return_value = {"deleted_count": 5}
                        record = await orchestrator.run_cycle()

        assert record.cleanup_deleted == 5
        assert record.house_warnings == ["Disk: 85% used"]

    @pytest.mark.asyncio
    async def test_get_status_after_cycle(self, orchestrator):
        """Test get_status returns correct info after a cycle."""
        with patch(
            "timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast"
        ) as mock_broadcast:
            mock_broadcast.return_value = None
            with patch(
                "timmy.vassal.backlog.fetch_open_issues", new_callable=AsyncMock
            ) as mock_fetch:
                mock_fetch.return_value = []
                await orchestrator.run_cycle()

        status = orchestrator.get_status()
        assert status["running"] is False
        assert status["cycle_count"] == 1
        assert status["last_cycle"] is not None
        assert status["last_cycle"]["cycle_id"] == 1
        assert status["last_cycle"]["issues_fetched"] == 0
        assert status["last_cycle"]["healthy"] is True


# -----------------------------------------------------------------------------
# Background loop tests
# -----------------------------------------------------------------------------


class TestBackgroundLoop:
    """Tests for the start/stop background loop methods."""

    @pytest.fixture
    def orchestrator(self):
        """Create a fresh orchestrator for each test."""
        return VassalOrchestrator(cycle_interval=0.1)

    @pytest.mark.asyncio
    async def test_start_stop_cycle(self, orchestrator):
        """Test starting and stopping the background loop."""
        with patch.object(orchestrator, "run_cycle", new_callable=AsyncMock) as mock_run:
            mock_run.return_value = MagicMock()

            # Start the loop
            await orchestrator.start()
            assert orchestrator.is_running is True
            assert orchestrator._task is not None

            # Let it run for a bit
            await asyncio.sleep(0.25)

            # Stop the loop
            orchestrator.stop()
            assert orchestrator.is_running is False

            # Should have run at least once
            assert mock_run.await_count >= 1

    @pytest.mark.asyncio
    async def test_start_already_running(self, orchestrator):
        """Test starting when already running is a no-op."""
        with patch.object(orchestrator, "run_cycle", new_callable=AsyncMock):
            await orchestrator.start()
            first_task = orchestrator._task

            # Start again should not create new task
            await orchestrator.start()
            assert orchestrator._task is first_task

            orchestrator.stop()

    @pytest.mark.asyncio
    async def test_stop_not_running(self, orchestrator):
        """Test stopping when not running is a no-op."""
        orchestrator.stop()
        assert orchestrator.is_running is False
        assert orchestrator._task is None

    @pytest.mark.asyncio
    async def test_loop_handles_cycle_exceptions(self, orchestrator):
        """Test that exceptions in run_cycle don't crash the loop."""
        with patch.object(
            orchestrator, "run_cycle", new_callable=AsyncMock
        ) as mock_run:
            mock_run.side_effect = [RuntimeError("Boom"), MagicMock()]

            await orchestrator.start()
            await asyncio.sleep(0.25)
            orchestrator.stop()

            # Should have been called multiple times despite error
            assert mock_run.await_count >= 2


# -----------------------------------------------------------------------------
# Interval resolution tests
# -----------------------------------------------------------------------------


class TestIntervalResolution:
    """Tests for the _resolve_interval method."""

    def test_resolve_interval_explicit(self):
        """Test that explicit interval is used when provided."""
        orchestrator = VassalOrchestrator(cycle_interval=60.0)
        assert orchestrator._resolve_interval() == 60.0

    def test_resolve_interval_from_settings(self):
        """Test interval is read from settings when not explicitly set."""
        orchestrator = VassalOrchestrator()
        mock_settings = MagicMock()
        mock_settings.vassal_cycle_interval = 120.0

        with patch("config.settings", mock_settings):
            assert orchestrator._resolve_interval() == 120.0

    def test_resolve_interval_default_fallback(self):
        """Test default 300s is used when settings fails."""
        orchestrator = VassalOrchestrator()

        with patch("config.settings", None):
            assert orchestrator._resolve_interval() == 300.0


# -----------------------------------------------------------------------------
# Broadcast tests
# -----------------------------------------------------------------------------


class TestBroadcast:
    """Tests for the _broadcast helper."""

    @pytest.mark.asyncio
    async def test_broadcast_success(self):
        """Test successful WebSocket broadcast."""
        orchestrator = VassalOrchestrator()
        record = VassalCycleRecord(
            cycle_id=1,
            started_at="2026-03-23T12:00:00+00:00",
            finished_at="2026-03-23T12:00:01+00:00",
            duration_ms=1000,
            issues_fetched=5,
            issues_dispatched=2,
        )

        mock_ws_manager = MagicMock()
        mock_ws_manager.broadcast = AsyncMock()

        with patch(
            "infrastructure.ws_manager.handler.ws_manager", mock_ws_manager
        ):
            await orchestrator._broadcast(record)

        mock_ws_manager.broadcast.assert_awaited_once()
        call_args = mock_ws_manager.broadcast.call_args[0]
        assert call_args[0] == "vassal.cycle"
        assert call_args[1]["cycle_id"] == 1
        assert call_args[1]["healthy"] is True

    @pytest.mark.asyncio
    async def test_broadcast_graceful_degradation(self):
        """Test broadcast gracefully handles errors."""
        orchestrator = VassalOrchestrator()
        record = VassalCycleRecord(cycle_id=1, started_at="2026-03-23T12:00:00+00:00")

        with patch(
            "infrastructure.ws_manager.handler.ws_manager"
        ) as mock_ws_manager:
            mock_ws_manager.broadcast = AsyncMock(
                side_effect=RuntimeError("WS disconnected")
            )
            # Should not raise
            await orchestrator._broadcast(record)

    @pytest.mark.asyncio
    async def test_broadcast_import_error(self):
        """Test broadcast handles missing ws_manager module."""
        orchestrator = VassalOrchestrator()
        record = VassalCycleRecord(cycle_id=1, started_at="2026-03-23T12:00:00+00:00")

        with patch.dict("sys.modules", {"infrastructure.ws_manager.handler": None}):
            # Should not raise
            await orchestrator._broadcast(record)


# -----------------------------------------------------------------------------
# Module singleton test
# -----------------------------------------------------------------------------


class TestModuleSingleton:
    """Tests for the module-level vassal_orchestrator singleton."""

    def test_singleton_import(self):
        """Test that the module-level singleton is available."""
        from timmy.vassal import vassal_orchestrator

        assert isinstance(vassal_orchestrator, VassalOrchestrator)

    def test_singleton_is_single_instance(self):
        """Test that importing twice returns same instance."""
        from timmy.vassal import vassal_orchestrator as orch1
        from timmy.vassal import vassal_orchestrator as orch2

        assert orch1 is orch2
308
tests/timmy/test_tools_search.py
Normal file
@@ -0,0 +1,308 @@
"""Unit tests for web_search and scrape_url tools (SearXNG + Crawl4AI).
|
||||
|
||||
All tests use mocked HTTP — no live services required.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.tools.search import _extract_crawl_content, scrape_url, web_search
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _mock_requests(json_response=None, status_code=200, raise_exc=None):
|
||||
"""Build a mock requests module whose .get/.post return controlled responses."""
|
||||
mock_req = MagicMock()
|
||||
|
||||
# Exception hierarchy
|
||||
class Timeout(Exception):
|
||||
pass
|
||||
|
||||
class HTTPError(Exception):
|
||||
def __init__(self, *a, response=None, **kw):
|
||||
super().__init__(*a, **kw)
|
||||
self.response = response
|
||||
|
||||
class RequestException(Exception):
|
||||
pass
|
||||
|
||||
exc_mod = MagicMock()
|
||||
exc_mod.Timeout = Timeout
|
||||
exc_mod.HTTPError = HTTPError
|
||||
exc_mod.RequestException = RequestException
|
||||
mock_req.exceptions = exc_mod
|
||||
|
||||
if raise_exc is not None:
|
||||
mock_req.get.side_effect = raise_exc
|
||||
mock_req.post.side_effect = raise_exc
|
||||
else:
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.status_code = status_code
|
||||
mock_resp.json.return_value = json_response or {}
|
||||
if status_code >= 400:
|
||||
mock_resp.raise_for_status.side_effect = HTTPError(
|
||||
response=MagicMock(status_code=status_code)
|
||||
)
|
||||
mock_req.get.return_value = mock_resp
|
||||
mock_req.post.return_value = mock_resp
|
||||
|
||||
return mock_req
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# web_search tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestWebSearch:
|
||||
def test_backend_none_short_circuits(self):
|
||||
"""TIMMY_SEARCH_BACKEND=none returns disabled message immediately."""
|
||||
with patch("timmy.tools.search.settings") as mock_settings:
|
||||
mock_settings.timmy_search_backend = "none"
|
||||
result = web_search("anything")
|
||||
assert "disabled" in result
|
||||
|
||||
def test_missing_requests_package(self):
|
||||
"""Graceful error when requests is not installed."""
|
||||
with patch.dict("sys.modules", {"requests": None}):
|
||||
with patch("timmy.tools.search.settings") as mock_settings:
|
||||
mock_settings.timmy_search_backend = "searxng"
|
||||
mock_settings.search_url = "http://localhost:8888"
|
||||
result = web_search("test query")
|
||||
assert "requests" in result and "not installed" in result
|
||||
|
||||
def test_successful_search(self):
|
||||
"""Happy path: returns formatted result list."""
|
||||
mock_data = {
|
||||
"results": [
|
||||
{"title": "Foo Bar", "url": "https://example.com/foo", "content": "Foo is great"},
|
||||
{"title": "Baz", "url": "https://example.com/baz", "content": "Baz rules"},
|
||||
]
|
||||
}
|
||||
mock_req = _mock_requests(json_response=mock_data)
|
||||
with patch.dict("sys.modules", {"requests": mock_req}):
|
||||
with patch("timmy.tools.search.settings") as mock_settings:
|
||||
mock_settings.timmy_search_backend = "searxng"
|
||||
mock_settings.search_url = "http://localhost:8888"
|
||||
result = web_search("foo bar")
|
||||
|
||||
assert "Foo Bar" in result
|
||||
assert "https://example.com/foo" in result
|
||||
assert "Baz" in result
|
||||
assert "foo bar" in result
|
||||
|
||||
def test_no_results(self):
|
||||
"""Empty results list returns a helpful no-results message."""
|
||||
mock_req = _mock_requests(json_response={"results": []})
|
||||
with patch.dict("sys.modules", {"requests": mock_req}):
|
||||
with patch("timmy.tools.search.settings") as mock_settings:
|
||||
mock_settings.timmy_search_backend = "searxng"
|
||||
mock_settings.search_url = "http://localhost:8888"
|
||||
result = web_search("xyzzy")
|
||||
assert "No results" in result
|
||||
|
||||
def test_num_results_respected(self):
|
||||
"""Only up to num_results entries are returned."""
|
||||
mock_data = {
|
||||
"results": [
|
||||
{"title": f"Result {i}", "url": f"https://example.com/{i}", "content": "x"}
|
||||
for i in range(10)
|
||||
]
|
||||
}
|
||||
mock_req = _mock_requests(json_response=mock_data)
|
||||
with patch.dict("sys.modules", {"requests": mock_req}):
|
||||
with patch("timmy.tools.search.settings") as mock_settings:
|
||||
mock_settings.timmy_search_backend = "searxng"
|
||||
mock_settings.search_url = "http://localhost:8888"
|
||||
result = web_search("test", num_results=3)
|
||||
|
||||
# Only 3 numbered entries should appear
|
||||
assert "1." in result
|
||||
assert "3." in result
|
||||
assert "4." not in result
|
||||
|
||||
def test_service_unavailable(self):
|
||||
"""Connection error degrades gracefully."""
|
||||
mock_req = MagicMock()
|
||||
mock_req.get.side_effect = OSError("connection refused")
|
||||
mock_req.exceptions = MagicMock()
|
||||
with patch.dict("sys.modules", {"requests": mock_req}):
|
||||
with patch("timmy.tools.search.settings") as mock_settings:
|
||||
mock_settings.timmy_search_backend = "searxng"
|
||||
mock_settings.search_url = "http://localhost:8888"
|
||||
result = web_search("test")
|
||||
assert "not reachable" in result or "unavailable" in result
|
||||
|
||||
def test_catalog_entry_exists(self):
|
||||
"""web_search must appear in the tool catalog."""
|
||||
from timmy.tools import get_all_available_tools
|
||||
|
||||
catalog = get_all_available_tools()
|
||||
assert "web_search" in catalog
|
||||
assert "orchestrator" in catalog["web_search"]["available_in"]
|
||||
assert "echo" in catalog["web_search"]["available_in"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# scrape_url tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestScrapeUrl:
|
||||
def test_invalid_url_no_scheme(self):
|
||||
"""URLs without http(s) scheme are rejected before any HTTP call."""
|
||||
result = scrape_url("example.com/page")
|
||||
assert "Error: invalid URL" in result
|
||||
|
||||
def test_invalid_url_empty(self):
|
||||
result = scrape_url("")
|
||||
assert "Error: invalid URL" in result
|
||||
|
||||
def test_backend_none_short_circuits(self):
|
||||
with patch("timmy.tools.search.settings") as mock_settings:
|
||||
mock_settings.timmy_search_backend = "none"
|
||||
result = scrape_url("https://example.com")
|
||||
assert "disabled" in result
|
||||
|
||||
def test_missing_requests_package(self):
|
||||
with patch.dict("sys.modules", {"requests": None}):
|
||||
with patch("timmy.tools.search.settings") as mock_settings:
|
||||
mock_settings.timmy_search_backend = "searxng"
|
||||
mock_settings.crawl_url = "http://localhost:11235"
|
||||
result = scrape_url("https://example.com")
|
||||
assert "requests" in result and "not installed" in result
|
||||
|
||||
def test_sync_result_returned_immediately(self):
|
||||
"""If Crawl4AI returns results in the POST response, use them directly."""
|
||||
mock_data = {
|
||||
"results": [{"markdown": "# Hello\n\nThis is the page content."}]
|
||||
}
|
||||
mock_req = _mock_requests(json_response=mock_data)
|
||||
with patch.dict("sys.modules", {"requests": mock_req}):
|
||||
with patch("timmy.tools.search.settings") as mock_settings:
|
||||
mock_settings.timmy_search_backend = "searxng"
|
||||
mock_settings.crawl_url = "http://localhost:11235"
|
||||
result = scrape_url("https://example.com")
|
||||
|
||||
assert "Hello" in result
|
||||
assert "page content" in result
|
||||
|
||||
def test_async_poll_completed(self):
|
||||
"""Async task_id flow: polls until completed and returns content."""
|
||||
submit_response = MagicMock()
|
||||
submit_response.json.return_value = {"task_id": "abc123"}
|
||||
submit_response.raise_for_status.return_value = None
|
||||
|
||||
poll_response = MagicMock()
|
||||
poll_response.json.return_value = {
|
||||
"status": "completed",
|
||||
"results": [{"markdown": "# Async content"}],
|
||||
}
|
||||
poll_response.raise_for_status.return_value = None
|
||||
|
||||
mock_req = MagicMock()
|
||||
mock_req.post.return_value = submit_response
|
||||
mock_req.get.return_value = poll_response
|
||||
mock_req.exceptions = MagicMock()
|
||||
|
||||
with patch.dict("sys.modules", {"requests": mock_req}):
|
||||
with patch("timmy.tools.search.settings") as mock_settings:
|
||||
mock_settings.timmy_search_backend = "searxng"
|
||||
mock_settings.crawl_url = "http://localhost:11235"
|
||||
with patch("timmy.tools.search.time") as mock_time:
|
||||
mock_time.sleep = MagicMock()
|
||||
result = scrape_url("https://example.com")
|
||||
|
||||
assert "Async content" in result
|
||||
|
||||
def test_async_poll_failed_task(self):
|
||||
"""Crawl4AI task failure is reported clearly."""
|
||||
submit_response = MagicMock()
|
||||
submit_response.json.return_value = {"task_id": "abc123"}
|
||||
submit_response.raise_for_status.return_value = None
|
||||
|
||||
poll_response = MagicMock()
|
||||
poll_response.json.return_value = {"status": "failed", "error": "site blocked"}
|
||||
poll_response.raise_for_status.return_value = None
|
||||
|
||||
mock_req = MagicMock()
|
||||
mock_req.post.return_value = submit_response
|
||||
mock_req.get.return_value = poll_response
|
||||
mock_req.exceptions = MagicMock()
|
||||
|
||||
with patch.dict("sys.modules", {"requests": mock_req}):
|
||||
with patch("timmy.tools.search.settings") as mock_settings:
|
||||
mock_settings.timmy_search_backend = "searxng"
|
||||
mock_settings.crawl_url = "http://localhost:11235"
|
||||
with patch("timmy.tools.search.time") as mock_time:
|
||||
mock_time.sleep = MagicMock()
|
||||
result = scrape_url("https://example.com")
|
||||
|
||||
assert "failed" in result and "site blocked" in result
|
||||
|
||||
def test_service_unavailable(self):
|
||||
"""Connection error degrades gracefully."""
|
||||
mock_req = MagicMock()
|
||||
mock_req.post.side_effect = OSError("connection refused")
|
||||
mock_req.exceptions = MagicMock()
|
||||
with patch.dict("sys.modules", {"requests": mock_req}):
|
||||
with patch("timmy.tools.search.settings") as mock_settings:
|
||||
mock_settings.timmy_search_backend = "searxng"
|
||||
mock_settings.crawl_url = "http://localhost:11235"
|
||||
result = scrape_url("https://example.com")
|
||||
assert "not reachable" in result or "unavailable" in result
|
||||
|
||||
def test_content_truncation(self):
|
||||
"""Content longer than ~4000 tokens is truncated."""
|
||||
long_content = "x" * 20000
|
||||
mock_data = {"results": [{"markdown": long_content}]}
|
||||
mock_req = _mock_requests(json_response=mock_data)
|
||||
with patch.dict("sys.modules", {"requests": mock_req}):
|
||||
with patch("timmy.tools.search.settings") as mock_settings:
|
||||
mock_settings.timmy_search_backend = "searxng"
|
||||
mock_settings.crawl_url = "http://localhost:11235"
|
||||
result = scrape_url("https://example.com")
|
||||
|
||||
assert "[…truncated" in result
|
||||
assert len(result) < 17000
|
||||
|
||||
def test_catalog_entry_exists(self):
|
||||
"""scrape_url must appear in the tool catalog."""
|
||||
from timmy.tools import get_all_available_tools
|
||||
|
||||
catalog = get_all_available_tools()
|
||||
assert "scrape_url" in catalog
|
||||
assert "orchestrator" in catalog["scrape_url"]["available_in"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _extract_crawl_content helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestExtractCrawlContent:
|
||||
def test_empty_results(self):
|
||||
result = _extract_crawl_content([], "https://example.com")
|
||||
assert "No content" in result
|
||||
|
||||
def test_markdown_field_preferred(self):
|
||||
results = [{"markdown": "# Title", "content": "fallback"}]
|
||||
result = _extract_crawl_content(results, "https://example.com")
|
||||
assert "Title" in result
|
||||
|
||||
def test_fallback_to_content_field(self):
|
||||
results = [{"content": "plain text content"}]
|
||||
result = _extract_crawl_content(results, "https://example.com")
|
||||
assert "plain text content" in result
|
||||
|
||||
def test_no_content_fields(self):
|
||||
results = [{"url": "https://example.com"}]
|
||||
result = _extract_crawl_content(results, "https://example.com")
|
||||
assert "No readable content" in result
|
||||
135 tests/unit/test_airllm_backend.py (new file)
@@ -0,0 +1,135 @@
"""Unit tests for AirLLM backend graceful degradation.

Verifies that setting TIMMY_MODEL_BACKEND=airllm on non-Apple-Silicon hardware
(Intel Mac, Linux, Windows), or when the airllm package is not installed,
falls back to the Ollama backend without crashing.

Refs #1284
"""

from unittest.mock import MagicMock, patch

import pytest

pytestmark = pytest.mark.unit


class TestIsAppleSilicon:
    """is_apple_silicon() correctly identifies the host platform."""

    def test_returns_true_on_arm64_darwin(self):
        from timmy.backends import is_apple_silicon

        with patch("platform.system", return_value="Darwin"), patch(
            "platform.machine", return_value="arm64"
        ):
            assert is_apple_silicon() is True

    def test_returns_false_on_intel_mac(self):
        from timmy.backends import is_apple_silicon

        with patch("platform.system", return_value="Darwin"), patch(
            "platform.machine", return_value="x86_64"
        ):
            assert is_apple_silicon() is False

    def test_returns_false_on_linux(self):
        from timmy.backends import is_apple_silicon

        with patch("platform.system", return_value="Linux"), patch(
            "platform.machine", return_value="x86_64"
        ):
            assert is_apple_silicon() is False

    def test_returns_false_on_windows(self):
        from timmy.backends import is_apple_silicon

        with patch("platform.system", return_value="Windows"), patch(
            "platform.machine", return_value="AMD64"
        ):
            assert is_apple_silicon() is False


class TestAirLLMGracefulDegradation:
    """create_timmy(backend='airllm') falls back to Ollama on unsupported platforms."""

    def _make_fake_ollama_agent(self):
        """Return a lightweight stub that satisfies the Agno Agent interface."""
        agent = MagicMock()
        agent.run = MagicMock(return_value=MagicMock(content="ok"))
        return agent

    def test_falls_back_to_ollama_on_non_apple_silicon(self, caplog):
        """On Intel/Linux, the airllm backend logs a warning and creates an Ollama agent."""
        import logging

        from timmy.agent import create_timmy

        fake_agent = self._make_fake_ollama_agent()

        with (
            patch("timmy.backends.is_apple_silicon", return_value=False),
            patch("timmy.agent._create_ollama_agent", return_value=fake_agent) as mock_create,
            patch("timmy.agent._resolve_model_with_fallback", return_value=("qwen3:8b", False)),
            patch("timmy.agent._check_model_available", return_value=True),
            patch("timmy.agent._build_tools_list", return_value=[]),
            patch("timmy.agent._build_prompt", return_value="test prompt"),
            caplog.at_level(logging.WARNING, logger="timmy.agent"),
        ):
            result = create_timmy(backend="airllm")

        assert result is fake_agent
        mock_create.assert_called_once()
        assert "Apple Silicon" in caplog.text

    def test_falls_back_to_ollama_when_airllm_not_installed(self, caplog):
        """When the airllm package is missing, log a warning and use Ollama."""
        import builtins
        import logging

        from timmy.agent import create_timmy

        fake_agent = self._make_fake_ollama_agent()

        # Simulate Apple Silicon + missing airllm package
        original_import = builtins.__import__

        def _import_side_effect(name, *args, **kwargs):
            if name == "airllm":
                raise ImportError("No module named 'airllm'")
            return original_import(name, *args, **kwargs)

        with (
            patch("timmy.backends.is_apple_silicon", return_value=True),
            patch("builtins.__import__", side_effect=_import_side_effect),
            patch("timmy.agent._create_ollama_agent", return_value=fake_agent) as mock_create,
            patch("timmy.agent._resolve_model_with_fallback", return_value=("qwen3:8b", False)),
            patch("timmy.agent._check_model_available", return_value=True),
            patch("timmy.agent._build_tools_list", return_value=[]),
            patch("timmy.agent._build_prompt", return_value="test prompt"),
            caplog.at_level(logging.WARNING, logger="timmy.agent"),
        ):
            result = create_timmy(backend="airllm")

        assert result is fake_agent
        mock_create.assert_called_once()
        assert "airllm" in caplog.text.lower()

    def test_airllm_backend_does_not_raise(self):
        """create_timmy(backend='airllm') never raises — it degrades gracefully."""
        from timmy.agent import create_timmy

        fake_agent = self._make_fake_ollama_agent()

        with (
            patch("timmy.backends.is_apple_silicon", return_value=False),
            patch("timmy.agent._create_ollama_agent", return_value=fake_agent),
            patch("timmy.agent._resolve_model_with_fallback", return_value=("qwen3:8b", False)),
            patch("timmy.agent._check_model_available", return_value=True),
            patch("timmy.agent._build_tools_list", return_value=[]),
            patch("timmy.agent._build_prompt", return_value="test prompt"),
        ):
            # Should not raise under any circumstances
            result = create_timmy(backend="airllm")

        assert result is not None
235 tests/unit/test_brain_worker.py (new file)
@@ -0,0 +1,235 @@
"""Unit tests for brain.worker.DistributedWorker."""

from __future__ import annotations

import threading
from unittest.mock import MagicMock, patch

import pytest

from brain.worker import MAX_RETRIES, DelegatedTask, DistributedWorker


@pytest.fixture(autouse=True)
def clear_task_registry():
    """Reset the worker registry before each test."""
    DistributedWorker.clear()
    yield
    DistributedWorker.clear()


class TestSubmit:
    def test_returns_task_id(self):
        with patch.object(DistributedWorker, "_run_task"):
            task_id = DistributedWorker.submit("researcher", "research", "find something")
        assert isinstance(task_id, str)
        assert len(task_id) == 8

    def test_task_registered_as_queued(self):
        with patch.object(DistributedWorker, "_run_task"):
            task_id = DistributedWorker.submit("coder", "code", "fix the bug")
        status = DistributedWorker.get_status(task_id)
        assert status["found"] is True
        assert status["task_id"] == task_id
        assert status["agent"] == "coder"

    def test_unique_task_ids(self):
        with patch.object(DistributedWorker, "_run_task"):
            ids = [DistributedWorker.submit("coder", "code", "task") for _ in range(10)]
        assert len(set(ids)) == 10

    def test_starts_daemon_thread(self):
        event = threading.Event()

        def fake_run_task(record):
            event.set()

        with patch.object(DistributedWorker, "_run_task", side_effect=fake_run_task):
            DistributedWorker.submit("coder", "code", "something")

        assert event.wait(timeout=2), "Background thread did not start"

    def test_priority_stored(self):
        with patch.object(DistributedWorker, "_run_task"):
            task_id = DistributedWorker.submit("coder", "code", "task", priority="high")
        status = DistributedWorker.get_status(task_id)
        assert status["priority"] == "high"


class TestGetStatus:
    def test_unknown_task_id(self):
        result = DistributedWorker.get_status("deadbeef")
        assert result["found"] is False
        assert result["task_id"] == "deadbeef"

    def test_known_task_has_all_fields(self):
        with patch.object(DistributedWorker, "_run_task"):
            task_id = DistributedWorker.submit("writer", "writing", "write a blog post")
        status = DistributedWorker.get_status(task_id)
        for key in ("found", "task_id", "agent", "role", "status", "backend", "created_at"):
            assert key in status, f"Missing key: {key}"


class TestListTasks:
    def test_empty_initially(self):
        assert DistributedWorker.list_tasks() == []

    def test_returns_registered_tasks(self):
        with patch.object(DistributedWorker, "_run_task"):
            DistributedWorker.submit("coder", "code", "task A")
            DistributedWorker.submit("writer", "writing", "task B")
        tasks = DistributedWorker.list_tasks()
        assert len(tasks) == 2
        agents = {t["agent"] for t in tasks}
        assert agents == {"coder", "writer"}


class TestSelectBackend:
    def test_defaults_to_agentic_loop(self):
        with patch("brain.worker.logger"):
            backend = DistributedWorker._select_backend("code", "fix the bug")
        assert backend == "agentic_loop"

    def test_kimi_for_heavy_research_with_gitea(self):
        mock_settings = MagicMock()
        mock_settings.gitea_enabled = True
        mock_settings.gitea_token = "tok"
        mock_settings.paperclip_api_key = ""

        with (
            patch("timmy.kimi_delegation.exceeds_local_capacity", return_value=True),
            patch("config.settings", mock_settings),
        ):
            backend = DistributedWorker._select_backend("research", "comprehensive survey " * 10)
        assert backend == "kimi"

    def test_agentic_loop_when_no_gitea(self):
        mock_settings = MagicMock()
        mock_settings.gitea_enabled = False
        mock_settings.gitea_token = ""
        mock_settings.paperclip_api_key = ""

        with patch("config.settings", mock_settings):
            backend = DistributedWorker._select_backend("research", "comprehensive survey " * 10)
        assert backend == "agentic_loop"

    def test_paperclip_when_api_key_configured(self):
        mock_settings = MagicMock()
        mock_settings.gitea_enabled = False
        mock_settings.gitea_token = ""
        mock_settings.paperclip_api_key = "pk_test_123"

        with patch("config.settings", mock_settings):
            backend = DistributedWorker._select_backend("code", "build a widget")
        assert backend == "paperclip"


class TestRunTask:
    def test_marks_completed_on_success(self):
        record = DelegatedTask(
            task_id="abc12345",
            agent_name="coder",
            agent_role="code",
            task_description="fix bug",
            priority="normal",
            backend="agentic_loop",
        )

        with patch.object(DistributedWorker, "_dispatch", return_value={"success": True}):
            DistributedWorker._run_task(record)

        assert record.status == "completed"
        assert record.result == {"success": True}
        assert record.error is None

    def test_marks_failed_after_exhausting_retries(self):
        record = DelegatedTask(
            task_id="fail1234",
            agent_name="coder",
            agent_role="code",
            task_description="broken task",
            priority="normal",
            backend="agentic_loop",
        )

        with patch.object(DistributedWorker, "_dispatch", side_effect=RuntimeError("boom")):
            DistributedWorker._run_task(record)

        assert record.status == "failed"
        assert "boom" in record.error
        assert record.retries == MAX_RETRIES

    def test_retries_before_failing(self):
        record = DelegatedTask(
            task_id="retry001",
            agent_name="coder",
            agent_role="code",
            task_description="flaky task",
            priority="normal",
            backend="agentic_loop",
        )

        call_count = 0

        def flaky_dispatch(r):
            nonlocal call_count
            call_count += 1
            if call_count < MAX_RETRIES + 1:
                raise RuntimeError("transient failure")
            return {"success": True}

        with patch.object(DistributedWorker, "_dispatch", side_effect=flaky_dispatch):
            DistributedWorker._run_task(record)

        assert record.status == "completed"
        assert call_count == MAX_RETRIES + 1

    def test_succeeds_on_first_attempt(self):
        record = DelegatedTask(
            task_id="ok000001",
            agent_name="writer",
            agent_role="writing",
            task_description="write summary",
            priority="low",
            backend="agentic_loop",
        )

        with patch.object(DistributedWorker, "_dispatch", return_value={"summary": "done"}):
            DistributedWorker._run_task(record)

        assert record.status == "completed"
        assert record.retries == 0


class TestDelegateTaskIntegration:
    """Integration: delegate_task should wire to DistributedWorker."""

    def test_delegate_task_returns_task_id(self):
        from timmy.tools_delegation import delegate_task

        with patch.object(DistributedWorker, "_run_task"):
            result = delegate_task("researcher", "research something for me")

        assert result["success"] is True
        assert result["task_id"] is not None
        assert result["status"] == "queued"

    def test_delegate_task_status_queued_for_valid_agent(self):
        from timmy.tools_delegation import delegate_task

        with patch.object(DistributedWorker, "_run_task"):
            result = delegate_task("coder", "implement feature X")

        assert result["status"] == "queued"
        assert len(result["task_id"]) == 8

    def test_task_in_registry_after_delegation(self):
        from timmy.tools_delegation import delegate_task

        with patch.object(DistributedWorker, "_run_task"):
            result = delegate_task("writer", "write documentation")

        task_id = result["task_id"]
        status = DistributedWorker.get_status(task_id)
        assert status["found"] is True
        assert status["agent"] == "writer"