Compare commits
14 Commits
claude/iss
...
claude/iss
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
466683e14d | ||
| b5a65b9d10 | |||
| 43030b7db2 | |||
| ab36149fa5 | |||
| 6a674bf9e0 | |||
| df7358b383 | |||
| af0963a8c7 | |||
| dd65586b5e | |||
| 7f875398fc | |||
| fc53a33361 | |||
| 1697e55cdb | |||
| 092c982341 | |||
| 45bde4df58 | |||
| c0f6ca9fc2 |
15
.github/workflows/tests.yml
vendored
15
.github/workflows/tests.yml
vendored
@@ -50,6 +50,7 @@ jobs:
|
||||
run: pip install tox
|
||||
|
||||
- name: Run tests (via tox)
|
||||
id: tests
|
||||
run: tox -e ci
|
||||
|
||||
# Posts a check annotation + PR comment showing pass/fail counts.
|
||||
@@ -63,6 +64,20 @@ jobs:
|
||||
comment_title: "Test Results"
|
||||
report_individual_runs: true
|
||||
|
||||
- name: Enforce coverage floor (60%)
|
||||
if: always() && steps.tests.outcome == 'success'
|
||||
run: |
|
||||
python -c "
|
||||
import xml.etree.ElementTree as ET, sys
|
||||
tree = ET.parse('reports/coverage.xml')
|
||||
rate = float(tree.getroot().attrib['line-rate']) * 100
|
||||
print(f'Coverage: {rate:.1f}%')
|
||||
if rate < 60:
|
||||
print(f'FAIL: Coverage {rate:.1f}% is below 60% floor')
|
||||
sys.exit(1)
|
||||
print('PASS: Coverage is above 60% floor')
|
||||
"
|
||||
|
||||
# Coverage report available as a downloadable artifact in the Actions tab
|
||||
- name: Upload coverage report
|
||||
uses: actions/upload-artifact@v4
|
||||
|
||||
107
config/moderation.yaml
Normal file
107
config/moderation.yaml
Normal file
@@ -0,0 +1,107 @@
|
||||
# Content Moderation Profiles
|
||||
# Per-game moderation configuration for the AI narrator pipeline.
|
||||
#
|
||||
# Each profile defines:
|
||||
# - vocabulary_whitelist: Game terms safe in context (won't trigger moderation)
|
||||
# - context_prompt: System prompt framing for the narrator
|
||||
# - threshold: Confidence threshold — flags below this pass through
|
||||
# - fallbacks: Pre-generated safe narration by scene type
|
||||
#
|
||||
# Model options (from research):
|
||||
# llama-guard3:1b — Speed (<30ms/sentence, INT4 quantized)
|
||||
# shieldgemma:2b — Accuracy (+10.8% AU-PRC, ~50-100ms)
|
||||
#
|
||||
# Override guard model via MODERATION_GUARD_MODEL env var.
|
||||
|
||||
# ── Guard model selection ────────────────────────────────────────────────────
|
||||
guard_model: "llama-guard3:1b"
|
||||
|
||||
# ── Streaming disclosure notes ───────────────────────────────────────────────
|
||||
# YouTube: Use "Altered or synthetic content" toggle
|
||||
# Twitch: Standard community guidelines (no specific AI disclosure req as of 2026-03)
|
||||
|
||||
# ── Game Profiles ────────────────────────────────────────────────────────────
|
||||
profiles:
|
||||
|
||||
morrowind:
|
||||
display_name: "The Elder Scrolls III: Morrowind"
|
||||
threshold: 0.85
|
||||
vocabulary_whitelist:
|
||||
- Skooma
|
||||
- Moon Sugar
|
||||
- slave
|
||||
- slavery
|
||||
- Morag Tong
|
||||
- Dark Brotherhood
|
||||
- Telvanni
|
||||
- Camonna Tong
|
||||
- smuggler
|
||||
- assassin
|
||||
- Sixth House
|
||||
- Corprus
|
||||
- Dagoth Ur
|
||||
- Nerevarine
|
||||
- Balmora
|
||||
- Vivec
|
||||
- Almsivi
|
||||
- Ordinators
|
||||
- Ashlanders
|
||||
- outlander
|
||||
- N'wah
|
||||
context_prompt: >
|
||||
You are narrating gameplay of The Elder Scrolls III: Morrowind.
|
||||
Morrowind contains mature themes including slavery, drug use
|
||||
(Skooma/Moon Sugar), assassin guilds (Morag Tong, Dark Brotherhood),
|
||||
and political intrigue. Treat these as game mechanics and historical
|
||||
worldbuilding within the game's fictional universe. Never editorialize
|
||||
on real-world parallels. Narrate events neutrally as a game
|
||||
commentator would.
|
||||
fallbacks:
|
||||
combat: "The battle rages on in the ashlands of Vvardenfell."
|
||||
dialogue: "The conversation continues between the characters."
|
||||
exploration: "The Nerevarine presses onward through the landscape."
|
||||
quest: "The quest unfolds as the hero navigates Morrowind's politics."
|
||||
default: "The adventure continues in Morrowind."
|
||||
|
||||
skyrim:
|
||||
display_name: "The Elder Scrolls V: Skyrim"
|
||||
threshold: 0.85
|
||||
vocabulary_whitelist:
|
||||
- Skooma
|
||||
- Dark Brotherhood
|
||||
- Thieves Guild
|
||||
- Stormcloak
|
||||
- Imperial
|
||||
- Dragonborn
|
||||
- Dovahkiin
|
||||
- Daedra
|
||||
- Thalmor
|
||||
- bandit
|
||||
- assassin
|
||||
- Forsworn
|
||||
- necromancer
|
||||
context_prompt: >
|
||||
You are narrating gameplay of The Elder Scrolls V: Skyrim.
|
||||
Skyrim features civil war, thieves guilds, assassin organizations,
|
||||
and fantasy violence. Treat all content as in-game fiction.
|
||||
Never draw real-world parallels. Narrate as a neutral game
|
||||
commentator.
|
||||
fallbacks:
|
||||
combat: "Steel clashes as the battle continues in the wilds of Skyrim."
|
||||
dialogue: "The conversation plays out in the cold northern land."
|
||||
exploration: "The Dragonborn ventures further into the province."
|
||||
default: "The adventure continues in Skyrim."
|
||||
|
||||
default:
|
||||
display_name: "Generic Game"
|
||||
threshold: 0.80
|
||||
vocabulary_whitelist: []
|
||||
context_prompt: >
|
||||
You are narrating gameplay. Describe in-game events as a neutral
|
||||
game commentator. Never reference real-world violence, politics,
|
||||
or controversial topics. Stay focused on game mechanics and story.
|
||||
fallbacks:
|
||||
combat: "The action continues on screen."
|
||||
dialogue: "The conversation unfolds between characters."
|
||||
exploration: "The player explores the game world."
|
||||
default: "The gameplay continues."
|
||||
91
docs/BACKLOG_TRIAGE_2026-03-23.md
Normal file
91
docs/BACKLOG_TRIAGE_2026-03-23.md
Normal file
@@ -0,0 +1,91 @@
|
||||
# Deep Backlog Triage — Harness vs Infrastructure Separation
|
||||
|
||||
**Date:** March 23, 2026
|
||||
**Analyst:** Perplexity Computer
|
||||
**Executor:** Claude (Opus 4.6)
|
||||
**Issue:** #1076
|
||||
|
||||
---
|
||||
|
||||
## Summary of Actions Taken
|
||||
|
||||
### 1. Batch Closed: 17 Rejected-Direction Issues
|
||||
|
||||
OpenClaw rejected direction + superseded autoresearch:
|
||||
#663, #722, #723, #724, #725, #726, #727, #728, #729, #730, #731,
|
||||
#903, #904, #911, #926, #927, #950
|
||||
|
||||
All labeled `rejected-direction`.
|
||||
|
||||
### 2. Closed: 2 Duplicate Issues
|
||||
|
||||
- #867 — duplicate of #887 (Morrowind feasibility study)
|
||||
- #916 — duplicate of #931 (test_setup_script.py fixes)
|
||||
|
||||
Both labeled `duplicate`.
|
||||
|
||||
### 3. Labels Created
|
||||
|
||||
| Label | Color | Purpose |
|
||||
|-------|-------|---------|
|
||||
| `harness` | Red | Core product: agent framework |
|
||||
| `infrastructure` | Blue | Supporting stage: dashboard, CI/CD |
|
||||
| `p0-critical` | Red | Must fix now |
|
||||
| `p1-important` | Orange | Next sprint |
|
||||
| `p2-backlog` | Gold | When time permits |
|
||||
| `rejected-direction` | Gray | Closed: rejected/superseded |
|
||||
| `duplicate` | Light gray | Duplicate of another issue |
|
||||
| `gemini-review` | Purple | Auto-generated, needs review |
|
||||
| `consolidation` | Green | Part of a consolidation epic |
|
||||
| `morrowind` | Brown | Harness: Morrowind embodiment |
|
||||
| `heartbeat` | Crimson | Harness: Agent heartbeat loop |
|
||||
| `inference` | Orange-red | Harness: Inference/model routing |
|
||||
| `sovereignty` | Indigo | Harness: Sovereignty stack |
|
||||
| `memory-session` | Teal | Harness: Memory/session |
|
||||
| `deprioritized` | Dark gray | Not blocking P0 work |
|
||||
|
||||
### 4. Consolidation Epics Created
|
||||
|
||||
- **#1077** — [EPIC] Kimi-Tasks Code Hygiene (14 issues consolidated)
|
||||
- **#1078** — [EPIC] ASCII Video Showcase (6 issues consolidated)
|
||||
|
||||
### 5. Labels Applied
|
||||
|
||||
- **P0 Heartbeat** — 16 issues labeled `harness` + `p0-critical` + `heartbeat`
|
||||
- **P0 Inference** — 10 issues labeled `harness` + `p0-critical` + `inference`
|
||||
- **P0 Memory/Session** — 3 issues labeled `harness` + `p0-critical` + `memory-session`
|
||||
- **P1 Morrowind** — 63 issues labeled `harness` + `p1-important` + `morrowind`
|
||||
- **P1 Sovereignty** — 11 issues labeled `harness` + `p1-important` + `sovereignty`
|
||||
- **P1 SOUL/Persona** — 2 issues labeled `harness` + `p1-important`
|
||||
- **P1 Testing** — 4 issues labeled `harness` + `p1-important`
|
||||
- **P2 LHF** — 3 issues labeled `harness` + `p2-backlog`
|
||||
- **P2 Whitestone** — 9 issues labeled `harness` + `p2-backlog`
|
||||
- **Infrastructure** — 36 issues labeled `infrastructure` + `deprioritized`
|
||||
- **Philosophy** — 44 issues labeled `philosophy`
|
||||
- **Gemini Review** — 15 issues labeled `gemini-review`
|
||||
- **Consolidation** — 20 issues labeled `consolidation`
|
||||
|
||||
### 6. Gemini Issues (15) — Tagged for Review
|
||||
|
||||
#577, #578, #579, #1006, #1007, #1008, #1009, #1010, #1012, #1013,
|
||||
#1014, #1016, #1017, #1018, #1019
|
||||
|
||||
Labeled `gemini-review` for human review of alignment with harness-first strategy.
|
||||
|
||||
---
|
||||
|
||||
## Domain Breakdown
|
||||
|
||||
| Domain | Count | % |
|
||||
|--------|-------|---|
|
||||
| **HARNESS (The Product)** | 219 | 75% |
|
||||
| **INFRASTRUCTURE (The Stage)** | 39 | 13% |
|
||||
| **CLOSE: Rejected Direction** | 17 | 6% |
|
||||
| **UNCATEGORIZED** | 18 | 6% |
|
||||
|
||||
## P0 Priority Stack (Harness)
|
||||
|
||||
1. **Heartbeat v2** — Agent loop + WorldInterface (PR #900)
|
||||
2. **Inference Cascade** — Local model routing (#966, #1064-#1069, #1075)
|
||||
3. **Session Crystallization** — Memory/handoff (#982, #983-#986)
|
||||
4. **Perception Pipeline** — Game state extraction (#963-#965, #1008)
|
||||
195
docs/mcp-setup.md
Normal file
195
docs/mcp-setup.md
Normal file
@@ -0,0 +1,195 @@
|
||||
# MCP Bridge Setup — Qwen3 via Ollama
|
||||
|
||||
This document describes how the MCP (Model Context Protocol) bridge connects
|
||||
Qwen3 models running in Ollama to Timmy's tool ecosystem.
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
User Prompt
|
||||
│
|
||||
▼
|
||||
┌──────────────┐ /api/chat ┌──────────────────┐
|
||||
│ MCPBridge │ ──────────────────▶ │ Ollama (Qwen3) │
|
||||
│ (Python) │ ◀────────────────── │ tool_calls JSON │
|
||||
└──────┬───────┘ └──────────────────┘
|
||||
│
|
||||
│ Execute tool calls
|
||||
▼
|
||||
┌──────────────────────────────────────────────┐
|
||||
│ MCP Tool Handlers │
|
||||
├──────────────┬───────────────┬───────────────┤
|
||||
│ Gitea API │ Shell Exec │ Custom Tools │
|
||||
│ (httpx) │ (ShellHand) │ (pluggable) │
|
||||
└──────────────┴───────────────┴───────────────┘
|
||||
```
|
||||
|
||||
## Bridge Options Evaluated
|
||||
|
||||
| Option | Verdict | Reason |
|
||||
|--------|---------|--------|
|
||||
| **Direct Ollama /api/chat** | **Selected** | Zero extra deps, native Qwen3 tool support, full control |
|
||||
| qwen-agent MCP | Rejected | Adds heavy dependency (qwen-agent), overlaps with Agno |
|
||||
| ollmcp | Rejected | External Go binary, limited error handling |
|
||||
| mcphost | Rejected | Generic host, doesn't integrate with existing tool safety |
|
||||
| ollama-mcp-bridge | Rejected | Purpose-built but unmaintained, Node.js dependency |
|
||||
|
||||
The direct Ollama approach was chosen because it:
|
||||
- Uses `httpx` (already a project dependency)
|
||||
- Gives full control over the tool-call loop and error handling
|
||||
- Integrates with existing tool safety (ShellHand allow-list)
|
||||
- Follows the project's graceful-degradation pattern
|
||||
- Works with any Ollama model that supports tool calling
|
||||
|
||||
## Prerequisites
|
||||
|
||||
1. **Ollama** running locally (default: `http://localhost:11434`)
|
||||
2. **Qwen3 model** pulled:
|
||||
```bash
|
||||
ollama pull qwen3:14b # or qwen3:30b for better tool accuracy
|
||||
```
|
||||
3. **Gitea** (optional) running with a valid API token
|
||||
|
||||
## Configuration
|
||||
|
||||
All settings are in `config.py` via environment variables or `.env`:
|
||||
|
||||
| Setting | Default | Description |
|
||||
|---------|---------|-------------|
|
||||
| `OLLAMA_URL` | `http://localhost:11434` | Ollama API endpoint |
|
||||
| `OLLAMA_MODEL` | `qwen3:30b` | Default model for tool calling |
|
||||
| `OLLAMA_NUM_CTX` | `4096` | Context window cap |
|
||||
| `MCP_BRIDGE_TIMEOUT` | `60` | HTTP timeout for bridge calls (seconds) |
|
||||
| `GITEA_URL` | `http://localhost:3000` | Gitea instance URL |
|
||||
| `GITEA_TOKEN` | (empty) | Gitea API token |
|
||||
| `GITEA_REPO` | `rockachopa/Timmy-time-dashboard` | Target repository |
|
||||
|
||||
## Usage
|
||||
|
||||
### Basic usage
|
||||
|
||||
```python
|
||||
from timmy.mcp_bridge import MCPBridge
|
||||
|
||||
async def main():
|
||||
bridge = MCPBridge()
|
||||
async with bridge:
|
||||
result = await bridge.run("List open issues in the repo")
|
||||
print(result.content)
|
||||
print(f"Tool calls: {len(result.tool_calls_made)}")
|
||||
print(f"Latency: {result.latency_ms:.0f}ms")
|
||||
```
|
||||
|
||||
### With custom tools
|
||||
|
||||
```python
|
||||
from timmy.mcp_bridge import MCPBridge, MCPToolDef
|
||||
|
||||
async def my_handler(**kwargs):
|
||||
return f"Processed: {kwargs}"
|
||||
|
||||
custom_tool = MCPToolDef(
|
||||
name="my_tool",
|
||||
description="Does something custom",
|
||||
parameters={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"input": {"type": "string", "description": "Input data"},
|
||||
},
|
||||
"required": ["input"],
|
||||
},
|
||||
handler=my_handler,
|
||||
)
|
||||
|
||||
bridge = MCPBridge(extra_tools=[custom_tool])
|
||||
```
|
||||
|
||||
### Selective tool loading
|
||||
|
||||
```python
|
||||
# Gitea tools only (no shell)
|
||||
bridge = MCPBridge(include_shell=False)
|
||||
|
||||
# Shell only (no Gitea)
|
||||
bridge = MCPBridge(include_gitea=False)
|
||||
|
||||
# Custom model
|
||||
bridge = MCPBridge(model="qwen3:14b")
|
||||
```
|
||||
|
||||
## Available Tools
|
||||
|
||||
### Gitea Tools (enabled when `GITEA_TOKEN` is set)
|
||||
|
||||
| Tool | Description |
|
||||
|------|-------------|
|
||||
| `list_issues` | List issues by state (open/closed/all) |
|
||||
| `create_issue` | Create a new issue with title and body |
|
||||
| `read_issue` | Read details of a specific issue by number |
|
||||
|
||||
### Shell Tool (enabled by default)
|
||||
|
||||
| Tool | Description |
|
||||
|------|-------------|
|
||||
| `shell_exec` | Execute sandboxed shell commands (allow-list enforced) |
|
||||
|
||||
The shell tool uses the project's `ShellHand` with its allow-list of safe
|
||||
commands (make, pytest, git, ls, cat, grep, etc.). Dangerous commands are
|
||||
blocked.
|
||||
|
||||
## How Tool Calling Works
|
||||
|
||||
1. User prompt is sent to Ollama with tool definitions
|
||||
2. Qwen3 generates a response — either text or `tool_calls` JSON
|
||||
3. If tool calls are present, the bridge executes each one
|
||||
4. Tool results are appended to the message history as `role: "tool"`
|
||||
5. The updated history is sent back to the model
|
||||
6. Steps 2-5 repeat until the model produces a final text response
|
||||
7. Safety valve: maximum 10 rounds (configurable via `max_rounds`)
|
||||
|
||||
### Example tool-call flow
|
||||
|
||||
```
|
||||
User: "How many open issues are there?"
|
||||
|
||||
Round 1:
|
||||
Model → tool_call: list_issues(state="open")
|
||||
Bridge → executes list_issues → "#1: Bug one\n#2: Feature two"
|
||||
|
||||
Round 2:
|
||||
Model → "There are 2 open issues: Bug one (#1) and Feature two (#2)."
|
||||
Bridge → returns BridgeResult(content="There are 2 open issues...")
|
||||
```
|
||||
|
||||
## Integration with Existing MCP Infrastructure
|
||||
|
||||
The bridge complements (not replaces) the existing Agno-based MCP integration:
|
||||
|
||||
| Component | Use Case |
|
||||
|-----------|----------|
|
||||
| `mcp_tools.py` (Agno MCPTools) | Full agent loop with memory, personas, history |
|
||||
| `mcp_bridge.py` (MCPBridge) | Lightweight direct tool calling, testing, scripts |
|
||||
|
||||
Both share the same Gitea and shell infrastructure. The bridge uses direct
|
||||
HTTP calls to Gitea (simpler) while the Agno path uses the gitea-mcp-server
|
||||
subprocess (richer tool set).
|
||||
|
||||
## Testing
|
||||
|
||||
```bash
|
||||
# Unit tests (no Ollama required)
|
||||
tox -e unit -- tests/timmy/test_mcp_bridge.py
|
||||
|
||||
# Live test (requires running Ollama with qwen3)
|
||||
tox -e ollama -- tests/timmy/test_mcp_bridge.py
|
||||
```
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
| Problem | Solution |
|
||||
|---------|----------|
|
||||
| "Ollama connection failed" | Ensure `ollama serve` is running |
|
||||
| "Model not found" | Run `ollama pull qwen3:14b` |
|
||||
| Tool calls return errors | Check tool allow-list in ShellHand |
|
||||
| "max tool-call rounds reached" | Model is looping — simplify the prompt |
|
||||
| Gitea tools return empty | Check `GITEA_TOKEN` and `GITEA_URL` |
|
||||
74
docs/research/integration-architecture-deep-dives.md
Normal file
74
docs/research/integration-architecture-deep-dives.md
Normal file
@@ -0,0 +1,74 @@
|
||||
# Timmy Time Integration Architecture: Eight Deep Dives into Real Deployment
|
||||
|
||||
> **Source:** PDF attached to issue #946, written during Veloren exploration phase.
|
||||
> Many patterns are game-agnostic and apply to the Morrowind/OpenClaw pivot.
|
||||
|
||||
## Summary of Eight Deep Dives
|
||||
|
||||
### 1. Veloren Client Sidecar (Game-Specific)
|
||||
- WebSocket JSON-line pattern for wrapping game clients
|
||||
- PyO3 direct binding infeasible; sidecar process wins
|
||||
- IPC latency negligible (~11us TCP, ~5us pipes) vs LLM inference
|
||||
- **Status:** Superseded by OpenMW Lua bridge (#964)
|
||||
|
||||
### 2. Agno Ollama Tool Calling is Broken
|
||||
- Agno issues #2231, #2625, #1419, #1612, #4715 document persistent breakage
|
||||
- Root cause: Agno's Ollama model class doesn't robustly parse native tool_calls
|
||||
- **Fix:** Use Ollama's `format` parameter with Pydantic JSON schemas directly
|
||||
- Recommended models: qwen3-coder:32b (top), glm-4.7-flash, gpt-oss:20b
|
||||
- Critical settings: temperature 0.0-0.2, stream=False for tool calls
|
||||
- **Status:** Covered by #966 (three-tier router)
|
||||
|
||||
### 3. MCP is the Right Abstraction
|
||||
- FastMCP averages 26.45ms per tool call (TM Dev Lab benchmark, Feb 2026)
|
||||
- Total MCP overhead per cycle: ~20-60ms (<3% of 2-second budget)
|
||||
- Agno has first-class bidirectional MCP integration (MCPTools, MultiMCPTools)
|
||||
- Use stdio transport for near-zero latency; return compressed JPEG not base64
|
||||
- **Status:** Covered by #984 (MCP restore)
|
||||
|
||||
### 4. Human + AI Co-op Architecture (Game-Specific)
|
||||
- Headless client treated identically to graphical client by server
|
||||
- Leverages party system, trade API, and /tell for communication
|
||||
- Mode switching: solo autonomous play when human absent, assist when present
|
||||
- **Status:** Defer until after tutorial completion
|
||||
|
||||
### 5. Real Latency Numbers
|
||||
- All-local M3 Max pipeline: 4-9 seconds per full cycle
|
||||
- Groq hybrid pipeline: 3-7 seconds per full cycle
|
||||
- VLM inference is 50-70% of total pipeline time (bottleneck)
|
||||
- Dual-model Ollama on 96GB M3 Max: ~11-14GB, ~70GB free
|
||||
- **Status:** Superseded by API-first perception (#963)
|
||||
|
||||
### 6. Content Moderation (Three-Layer Defense)
|
||||
- Layer 1: Game-context system prompts (Morrowind themes as game mechanics)
|
||||
- Layer 2: Llama Guard 3 1B at <30ms/sentence for real-time filtering
|
||||
- Layer 3: Per-game moderation profiles with vocabulary whitelists
|
||||
- Run moderation + TTS preprocessing in parallel for zero added latency
|
||||
- Neuro-sama incident (Dec 2022) is the cautionary tale
|
||||
- **Status:** New issue created → #1056
|
||||
|
||||
### 7. Model Selection (Qwen3-8B vs Hermes 3)
|
||||
- Three-role architecture: Perception (Qwen3-VL 8B), Decision (Qwen3-8B), Narration (Hermes 3 8B)
|
||||
- Qwen3-8B outperforms Qwen2.5-14B on 15 benchmarks
|
||||
- Hermes 3 best for narration (steerability, roleplaying)
|
||||
- Both use identical Hermes Function Calling standard
|
||||
- **Status:** Partially covered by #966 (three-tier router)
|
||||
|
||||
### 8. Split Hetzner + Mac Deployment
|
||||
- Hetzner GEX44 (RTX 4000 SFF Ada, €184/month) for rendering/streaming
|
||||
- Mac M3 Max for all AI inference via Tailscale
|
||||
- Use FFmpeg x11grab + NVENC, not OBS (no headless support)
|
||||
- Use headless Xorg, not Xvfb (GPU access required for Vulkan)
|
||||
- Total cost: ~$200/month
|
||||
- **Status:** Referenced in #982 sprint plan
|
||||
|
||||
## Cross-Reference to Active Issues
|
||||
|
||||
| Research Topic | Active Issue | Status |
|
||||
|---------------|-------------|--------|
|
||||
| Pydantic structured output for Ollama | #966 (three-tier router) | In progress |
|
||||
| FastMCP tool server | #984 (MCP restore) | In progress |
|
||||
| Content moderation pipeline | #1056 (new) | Created from this research |
|
||||
| Split Hetzner + Mac deployment | #982 (sprint plan) | Referenced |
|
||||
| VLM latency / perception | #963 (perception bottleneck) | API-first approach |
|
||||
| OpenMW bridge (replaces Veloren sidecar) | #964 | In progress |
|
||||
@@ -50,6 +50,7 @@ sounddevice = { version = ">=0.4.6", optional = true }
|
||||
sentence-transformers = { version = ">=2.0.0", optional = true }
|
||||
numpy = { version = ">=1.24.0", optional = true }
|
||||
requests = { version = ">=2.31.0", optional = true }
|
||||
trafilatura = { version = ">=1.6.0", optional = true }
|
||||
GitPython = { version = ">=3.1.40", optional = true }
|
||||
pytest = { version = ">=8.0.0", optional = true }
|
||||
pytest-asyncio = { version = ">=0.24.0", optional = true }
|
||||
@@ -67,6 +68,7 @@ voice = ["pyttsx3", "openai-whisper", "piper-tts", "sounddevice"]
|
||||
celery = ["celery"]
|
||||
embeddings = ["sentence-transformers", "numpy"]
|
||||
git = ["GitPython"]
|
||||
research = ["requests", "trafilatura"]
|
||||
dev = ["pytest", "pytest-asyncio", "pytest-cov", "pytest-timeout", "pytest-randomly", "pytest-xdist", "selenium"]
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
|
||||
66
scripts/claude_quota_check.sh
Executable file
66
scripts/claude_quota_check.sh
Executable file
@@ -0,0 +1,66 @@
|
||||
#!/usr/bin/env bash
|
||||
# claude_quota_check.sh — Quick CLI check of Claude API quota and metabolic mode.
|
||||
#
|
||||
# Usage:
|
||||
# ./scripts/claude_quota_check.sh # Human-readable report
|
||||
# ./scripts/claude_quota_check.sh --mode # Print current mode only (BURST/ACTIVE/RESTING)
|
||||
# ./scripts/claude_quota_check.sh --json # JSON output for scripting
|
||||
#
|
||||
# Refs: #1074, #972
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
|
||||
SRC="${REPO_ROOT}/src"
|
||||
|
||||
# Ensure we can import the project Python modules
|
||||
export PYTHONPATH="${SRC}:${PYTHONPATH:-}"
|
||||
|
||||
MODE_ONLY=0
|
||||
JSON_OUTPUT=0
|
||||
|
||||
for arg in "$@"; do
|
||||
case "$arg" in
|
||||
--mode) MODE_ONLY=1 ;;
|
||||
--json) JSON_OUTPUT=1 ;;
|
||||
-h|--help)
|
||||
echo "Usage: $0 [--mode|--json]"
|
||||
echo " (no flags) Human-readable quota report"
|
||||
echo " --mode Print current metabolic mode only"
|
||||
echo " --json JSON output for scripting"
|
||||
exit 0
|
||||
;;
|
||||
*)
|
||||
echo "Unknown flag: $arg" >&2
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
done
|
||||
|
||||
if [[ $MODE_ONLY -eq 1 ]]; then
|
||||
python3 - <<'PYEOF'
|
||||
from infrastructure.claude_quota import current_mode
|
||||
print(current_mode())
|
||||
PYEOF
|
||||
|
||||
elif [[ $JSON_OUTPUT -eq 1 ]]; then
|
||||
python3 - <<'PYEOF'
|
||||
import json
|
||||
from infrastructure.claude_quota import get_quota_store
|
||||
store = get_quota_store()
|
||||
today = store.today_summary()
|
||||
month = store.month_summary()
|
||||
print(json.dumps({
|
||||
"today": today.as_dict(),
|
||||
"month": month.as_dict(),
|
||||
"current_mode": today.mode,
|
||||
}))
|
||||
PYEOF
|
||||
|
||||
else
|
||||
python3 - <<'PYEOF'
|
||||
from infrastructure.claude_quota import quota_report
|
||||
print(quota_report())
|
||||
PYEOF
|
||||
fi
|
||||
107
scripts/run_benchmarks.py
Normal file
107
scripts/run_benchmarks.py
Normal file
@@ -0,0 +1,107 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Run the agent performance regression benchmark suite.
|
||||
|
||||
Usage::
|
||||
|
||||
python scripts/run_benchmarks.py # all scenarios
|
||||
python scripts/run_benchmarks.py --tags navigation # filter by tag
|
||||
python scripts/run_benchmarks.py --output results/benchmarks.jsonl
|
||||
python scripts/run_benchmarks.py --compare results/benchmarks.jsonl
|
||||
|
||||
Exit codes:
|
||||
0 — all scenarios passed
|
||||
1 — one or more scenarios failed
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Ensure src/ is on the path when invoked directly
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))
|
||||
|
||||
from infrastructure.world.benchmark.metrics import BenchmarkMetrics, load_history
|
||||
from infrastructure.world.benchmark.runner import BenchmarkRunner
|
||||
from infrastructure.world.benchmark.scenarios import load_scenarios
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Agent performance regression benchmark suite",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--tags",
|
||||
nargs="*",
|
||||
default=None,
|
||||
help="Filter scenarios by tag (e.g. navigation quest)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output",
|
||||
type=Path,
|
||||
default=None,
|
||||
help="JSONL file to append results to",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--compare",
|
||||
type=Path,
|
||||
default=None,
|
||||
help="JSONL file with baseline results for regression comparison",
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
async def main() -> int:
|
||||
args = parse_args()
|
||||
|
||||
scenarios = load_scenarios(tags=args.tags)
|
||||
if not scenarios:
|
||||
print("No matching scenarios found.")
|
||||
return 1
|
||||
|
||||
print(f"Running {len(scenarios)} benchmark scenario(s)...\n")
|
||||
|
||||
runner = BenchmarkRunner()
|
||||
metrics = await runner.run(scenarios)
|
||||
|
||||
print(metrics.summary())
|
||||
|
||||
if args.output:
|
||||
metrics.save(args.output)
|
||||
|
||||
if args.compare:
|
||||
history = load_history(args.compare)
|
||||
if history:
|
||||
from infrastructure.world.benchmark.metrics import compare_runs
|
||||
|
||||
# Reconstruct baseline from last recorded run
|
||||
last = history[0]
|
||||
baseline = BenchmarkMetrics(
|
||||
timestamp=last.get("timestamp", ""),
|
||||
commit_sha=last.get("commit_sha", ""),
|
||||
total_time_ms=last.get("total_time_ms", 0),
|
||||
)
|
||||
for s in last.get("scenarios", []):
|
||||
from infrastructure.world.benchmark.metrics import ScenarioResult
|
||||
|
||||
baseline.results.append(
|
||||
ScenarioResult(
|
||||
scenario_name=s["scenario_name"],
|
||||
success=s["success"],
|
||||
cycles_used=s["cycles_used"],
|
||||
max_cycles=s["max_cycles"],
|
||||
wall_time_ms=s.get("wall_time_ms", 0),
|
||||
llm_calls=s.get("llm_calls", 0),
|
||||
metabolic_cost=s.get("metabolic_cost", 0.0),
|
||||
)
|
||||
)
|
||||
print()
|
||||
print(compare_runs(metrics, baseline))
|
||||
|
||||
return 0 if metrics.fail_count == 0 else 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(asyncio.run(main()))
|
||||
@@ -99,6 +99,14 @@ class Settings(BaseSettings):
|
||||
anthropic_api_key: str = ""
|
||||
claude_model: str = "haiku"
|
||||
|
||||
# ── Content Moderation ──────────────────────────────────────────────
|
||||
# Three-layer moderation pipeline for AI narrator output.
|
||||
# Uses Llama Guard via Ollama with regex fallback.
|
||||
moderation_enabled: bool = True
|
||||
moderation_guard_model: str = "llama-guard3:1b"
|
||||
# Default confidence threshold — per-game profiles can override.
|
||||
moderation_threshold: float = 0.8
|
||||
|
||||
# ── Spark Intelligence ────────────────────────────────────────────────
|
||||
# Enable/disable the Spark cognitive layer.
|
||||
# When enabled, Spark captures swarm events, runs EIDOS predictions,
|
||||
@@ -144,6 +152,10 @@ class Settings(BaseSettings):
|
||||
# Default is False (telemetry disabled) to align with sovereign AI vision.
|
||||
telemetry_enabled: bool = False
|
||||
|
||||
# ── Sovereignty Metrics ──────────────────────────────────────────────
|
||||
# Alert when API cost per research task exceeds this threshold (USD).
|
||||
sovereignty_api_cost_alert_threshold: float = 1.00
|
||||
|
||||
# CORS allowed origins for the web chat interface (Gitea Pages, etc.)
|
||||
# Set CORS_ORIGINS as a comma-separated list, e.g. "http://localhost:3000,https://example.com"
|
||||
cors_origins: list[str] = [
|
||||
@@ -290,6 +302,7 @@ class Settings(BaseSettings):
|
||||
mcp_gitea_command: str = "gitea-mcp-server -t stdio"
|
||||
mcp_filesystem_command: str = "npx -y @modelcontextprotocol/server-filesystem"
|
||||
mcp_timeout: int = 15
|
||||
mcp_bridge_timeout: int = 60 # HTTP timeout for MCP bridge Ollama calls (seconds)
|
||||
|
||||
# ── Loop QA (Self-Testing) ─────────────────────────────────────────
|
||||
# Self-test orchestrator that probes capabilities alongside the thinking loop.
|
||||
|
||||
@@ -45,6 +45,7 @@ from dashboard.routes.models import api_router as models_api_router
|
||||
from dashboard.routes.models import router as models_router
|
||||
from dashboard.routes.quests import router as quests_router
|
||||
from dashboard.routes.scorecards import router as scorecards_router
|
||||
from dashboard.routes.sovereignty_metrics import router as sovereignty_metrics_router
|
||||
from dashboard.routes.spark import router as spark_router
|
||||
from dashboard.routes.system import router as system_router
|
||||
from dashboard.routes.tasks import router as tasks_router
|
||||
@@ -631,6 +632,7 @@ app.include_router(tower_router)
|
||||
app.include_router(daily_run_router)
|
||||
app.include_router(quests_router)
|
||||
app.include_router(scorecards_router)
|
||||
app.include_router(sovereignty_metrics_router)
|
||||
|
||||
|
||||
@app.websocket("/ws")
|
||||
|
||||
@@ -125,7 +125,7 @@ def _run_grok_query(message: str) -> dict:
|
||||
from lightning.factory import get_backend as get_ln_backend
|
||||
|
||||
ln = get_ln_backend()
|
||||
sats = min(settings.grok_max_sats_per_query, 100)
|
||||
sats = min(settings.grok_max_sats_per_query, settings.grok_sats_hard_cap)
|
||||
ln.create_invoice(sats, f"Grok: {message[:50]}")
|
||||
invoice_note = f" | {sats} sats"
|
||||
except Exception as exc:
|
||||
|
||||
74
src/dashboard/routes/sovereignty_metrics.py
Normal file
74
src/dashboard/routes/sovereignty_metrics.py
Normal file
@@ -0,0 +1,74 @@
|
||||
"""Sovereignty metrics dashboard routes.
|
||||
|
||||
Provides API endpoints and HTMX partials for tracking research
|
||||
sovereignty progress against graduation targets.
|
||||
|
||||
Refs: #981
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from fastapi import APIRouter, Request
|
||||
from fastapi.responses import HTMLResponse
|
||||
|
||||
from config import settings
|
||||
from dashboard.templating import templates
|
||||
from infrastructure.sovereignty_metrics import (
|
||||
GRADUATION_TARGETS,
|
||||
get_sovereignty_store,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/sovereignty", tags=["sovereignty"])
|
||||
|
||||
|
||||
@router.get("/metrics")
|
||||
async def sovereignty_metrics_api() -> dict[str, Any]:
|
||||
"""JSON API: full sovereignty metrics summary with trends."""
|
||||
store = get_sovereignty_store()
|
||||
summary = store.get_summary()
|
||||
alerts = store.get_alerts(unacknowledged_only=True)
|
||||
return {
|
||||
"metrics": summary,
|
||||
"alerts": alerts,
|
||||
"targets": GRADUATION_TARGETS,
|
||||
"cost_threshold": settings.sovereignty_api_cost_alert_threshold,
|
||||
}
|
||||
|
||||
|
||||
@router.get("/metrics/panel", response_class=HTMLResponse)
|
||||
async def sovereignty_metrics_panel(request: Request) -> HTMLResponse:
|
||||
"""HTMX partial: sovereignty metrics progress panel."""
|
||||
store = get_sovereignty_store()
|
||||
summary = store.get_summary()
|
||||
alerts = store.get_alerts(unacknowledged_only=True)
|
||||
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
"partials/sovereignty_metrics.html",
|
||||
{
|
||||
"metrics": summary,
|
||||
"alerts": alerts,
|
||||
"targets": GRADUATION_TARGETS,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@router.get("/alerts")
|
||||
async def sovereignty_alerts_api() -> dict[str, Any]:
|
||||
"""JSON API: sovereignty alerts."""
|
||||
store = get_sovereignty_store()
|
||||
return {
|
||||
"alerts": store.get_alerts(unacknowledged_only=False),
|
||||
"unacknowledged": store.get_alerts(unacknowledged_only=True),
|
||||
}
|
||||
|
||||
|
||||
@router.post("/alerts/{alert_id}/acknowledge")
|
||||
async def acknowledge_alert(alert_id: int) -> dict[str, bool]:
|
||||
"""Acknowledge a sovereignty alert."""
|
||||
store = get_sovereignty_store()
|
||||
success = store.acknowledge_alert(alert_id)
|
||||
return {"success": success}
|
||||
@@ -104,29 +104,25 @@ class _TaskView:
|
||||
@router.get("/tasks", response_class=HTMLResponse)
|
||||
async def tasks_page(request: Request):
|
||||
"""Render the main task queue page with 3-column layout."""
|
||||
try:
|
||||
with _get_db() as db:
|
||||
pending = [
|
||||
_TaskView(_row_to_dict(r))
|
||||
for r in db.execute(
|
||||
"SELECT * FROM tasks WHERE status IN ('pending_approval') ORDER BY created_at DESC"
|
||||
).fetchall()
|
||||
]
|
||||
active = [
|
||||
_TaskView(_row_to_dict(r))
|
||||
for r in db.execute(
|
||||
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
|
||||
).fetchall()
|
||||
]
|
||||
completed = [
|
||||
_TaskView(_row_to_dict(r))
|
||||
for r in db.execute(
|
||||
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
|
||||
).fetchall()
|
||||
]
|
||||
except sqlite3.OperationalError as exc:
|
||||
logger.warning("Task DB unavailable: %s", exc)
|
||||
pending, active, completed = [], [], []
|
||||
with _get_db() as db:
|
||||
pending = [
|
||||
_TaskView(_row_to_dict(r))
|
||||
for r in db.execute(
|
||||
"SELECT * FROM tasks WHERE status IN ('pending_approval') ORDER BY created_at DESC"
|
||||
).fetchall()
|
||||
]
|
||||
active = [
|
||||
_TaskView(_row_to_dict(r))
|
||||
for r in db.execute(
|
||||
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
|
||||
).fetchall()
|
||||
]
|
||||
completed = [
|
||||
_TaskView(_row_to_dict(r))
|
||||
for r in db.execute(
|
||||
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
|
||||
).fetchall()
|
||||
]
|
||||
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
@@ -147,76 +143,49 @@ async def tasks_page(request: Request):
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _render_task_list(request: Request, query: str, empty_msg: str) -> HTMLResponse:
|
||||
"""Fetch tasks by query and render as HTMX task-card partials."""
|
||||
with _get_db() as db:
|
||||
rows = db.execute(query).fetchall()
|
||||
parts = [
|
||||
templates.TemplateResponse(
|
||||
request, "partials/task_card.html", {"task": _TaskView(_row_to_dict(r))}
|
||||
).body.decode()
|
||||
for r in rows
|
||||
]
|
||||
if not parts:
|
||||
return HTMLResponse(f'<div class="empty-column">{empty_msg}</div>')
|
||||
return HTMLResponse("".join(parts))
|
||||
|
||||
|
||||
@router.get("/tasks/pending", response_class=HTMLResponse)
|
||||
async def tasks_pending(request: Request):
|
||||
"""Return HTMX partial for pending approval tasks."""
|
||||
try:
|
||||
with _get_db() as db:
|
||||
rows = db.execute(
|
||||
"SELECT * FROM tasks WHERE status='pending_approval' ORDER BY created_at DESC"
|
||||
).fetchall()
|
||||
except sqlite3.OperationalError as exc:
|
||||
logger.warning("Task DB unavailable: %s", exc)
|
||||
return HTMLResponse('<div class="empty-column">Database unavailable</div>')
|
||||
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
|
||||
parts = []
|
||||
for task in tasks:
|
||||
parts.append(
|
||||
templates.TemplateResponse(
|
||||
request, "partials/task_card.html", {"task": task}
|
||||
).body.decode()
|
||||
)
|
||||
if not parts:
|
||||
return HTMLResponse('<div class="empty-column">No pending tasks</div>')
|
||||
return HTMLResponse("".join(parts))
|
||||
return _render_task_list(
|
||||
request,
|
||||
"SELECT * FROM tasks WHERE status='pending_approval' ORDER BY created_at DESC",
|
||||
"No pending tasks",
|
||||
)
|
||||
|
||||
|
||||
@router.get("/tasks/active", response_class=HTMLResponse)
|
||||
async def tasks_active(request: Request):
|
||||
"""Return HTMX partial for active (approved/running/paused) tasks."""
|
||||
try:
|
||||
with _get_db() as db:
|
||||
rows = db.execute(
|
||||
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
|
||||
).fetchall()
|
||||
except sqlite3.OperationalError as exc:
|
||||
logger.warning("Task DB unavailable: %s", exc)
|
||||
return HTMLResponse('<div class="empty-column">Database unavailable</div>')
|
||||
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
|
||||
parts = []
|
||||
for task in tasks:
|
||||
parts.append(
|
||||
templates.TemplateResponse(
|
||||
request, "partials/task_card.html", {"task": task}
|
||||
).body.decode()
|
||||
)
|
||||
if not parts:
|
||||
return HTMLResponse('<div class="empty-column">No active tasks</div>')
|
||||
return HTMLResponse("".join(parts))
|
||||
return _render_task_list(
|
||||
request,
|
||||
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC",
|
||||
"No active tasks",
|
||||
)
|
||||
|
||||
|
||||
@router.get("/tasks/completed", response_class=HTMLResponse)
|
||||
async def tasks_completed(request: Request):
|
||||
"""Return HTMX partial for completed/vetoed/failed tasks (last 50)."""
|
||||
try:
|
||||
with _get_db() as db:
|
||||
rows = db.execute(
|
||||
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
|
||||
).fetchall()
|
||||
except sqlite3.OperationalError as exc:
|
||||
logger.warning("Task DB unavailable: %s", exc)
|
||||
return HTMLResponse('<div class="empty-column">Database unavailable</div>')
|
||||
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
|
||||
parts = []
|
||||
for task in tasks:
|
||||
parts.append(
|
||||
templates.TemplateResponse(
|
||||
request, "partials/task_card.html", {"task": task}
|
||||
).body.decode()
|
||||
)
|
||||
if not parts:
|
||||
return HTMLResponse('<div class="empty-column">No completed tasks yet</div>')
|
||||
return HTMLResponse("".join(parts))
|
||||
return _render_task_list(
|
||||
request,
|
||||
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50",
|
||||
"No completed tasks yet",
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -241,17 +210,13 @@ async def create_task_form(
|
||||
now = datetime.now(UTC).isoformat()
|
||||
priority = priority if priority in VALID_PRIORITIES else "normal"
|
||||
|
||||
try:
|
||||
with _get_db() as db:
|
||||
db.execute(
|
||||
"INSERT INTO tasks (id, title, description, priority, assigned_to, created_at) VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(task_id, title, description, priority, assigned_to, now),
|
||||
)
|
||||
db.commit()
|
||||
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
|
||||
except sqlite3.OperationalError as exc:
|
||||
logger.warning("Task DB unavailable: %s", exc)
|
||||
raise HTTPException(status_code=503, detail="Task database unavailable") from exc
|
||||
with _get_db() as db:
|
||||
db.execute(
|
||||
"INSERT INTO tasks (id, title, description, priority, assigned_to, created_at) VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(task_id, title, description, priority, assigned_to, now),
|
||||
)
|
||||
db.commit()
|
||||
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
|
||||
|
||||
task = _TaskView(_row_to_dict(row))
|
||||
return templates.TemplateResponse(request, "partials/task_card.html", {"task": task})
|
||||
@@ -300,17 +265,13 @@ async def modify_task(
|
||||
description: str = Form(""),
|
||||
):
|
||||
"""Update task title and description."""
|
||||
try:
|
||||
with _get_db() as db:
|
||||
db.execute(
|
||||
"UPDATE tasks SET title=?, description=? WHERE id=?",
|
||||
(title, description, task_id),
|
||||
)
|
||||
db.commit()
|
||||
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
|
||||
except sqlite3.OperationalError as exc:
|
||||
logger.warning("Task DB unavailable: %s", exc)
|
||||
raise HTTPException(status_code=503, detail="Task database unavailable") from exc
|
||||
with _get_db() as db:
|
||||
db.execute(
|
||||
"UPDATE tasks SET title=?, description=? WHERE id=?",
|
||||
(title, description, task_id),
|
||||
)
|
||||
db.commit()
|
||||
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
|
||||
if not row:
|
||||
raise HTTPException(404, "Task not found")
|
||||
task = _TaskView(_row_to_dict(row))
|
||||
@@ -322,17 +283,13 @@ async def _set_status(request: Request, task_id: str, new_status: str):
|
||||
completed_at = (
|
||||
datetime.now(UTC).isoformat() if new_status in ("completed", "vetoed", "failed") else None
|
||||
)
|
||||
try:
|
||||
with _get_db() as db:
|
||||
db.execute(
|
||||
"UPDATE tasks SET status=?, completed_at=COALESCE(?, completed_at) WHERE id=?",
|
||||
(new_status, completed_at, task_id),
|
||||
)
|
||||
db.commit()
|
||||
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
|
||||
except sqlite3.OperationalError as exc:
|
||||
logger.warning("Task DB unavailable: %s", exc)
|
||||
raise HTTPException(status_code=503, detail="Task database unavailable") from exc
|
||||
with _get_db() as db:
|
||||
db.execute(
|
||||
"UPDATE tasks SET status=?, completed_at=COALESCE(?, completed_at) WHERE id=?",
|
||||
(new_status, completed_at, task_id),
|
||||
)
|
||||
db.commit()
|
||||
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
|
||||
if not row:
|
||||
raise HTTPException(404, "Task not found")
|
||||
task = _TaskView(_row_to_dict(row))
|
||||
@@ -358,26 +315,22 @@ async def api_create_task(request: Request):
|
||||
if priority not in VALID_PRIORITIES:
|
||||
priority = "normal"
|
||||
|
||||
try:
|
||||
with _get_db() as db:
|
||||
db.execute(
|
||||
"INSERT INTO tasks (id, title, description, priority, assigned_to, created_by, created_at) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
task_id,
|
||||
title,
|
||||
body.get("description", ""),
|
||||
priority,
|
||||
body.get("assigned_to", ""),
|
||||
body.get("created_by", "operator"),
|
||||
now,
|
||||
),
|
||||
)
|
||||
db.commit()
|
||||
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
|
||||
except sqlite3.OperationalError as exc:
|
||||
logger.warning("Task DB unavailable: %s", exc)
|
||||
raise HTTPException(status_code=503, detail="Task database unavailable") from exc
|
||||
with _get_db() as db:
|
||||
db.execute(
|
||||
"INSERT INTO tasks (id, title, description, priority, assigned_to, created_by, created_at) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
task_id,
|
||||
title,
|
||||
body.get("description", ""),
|
||||
priority,
|
||||
body.get("assigned_to", ""),
|
||||
body.get("created_by", "operator"),
|
||||
now,
|
||||
),
|
||||
)
|
||||
db.commit()
|
||||
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
|
||||
|
||||
return JSONResponse(_row_to_dict(row), status_code=201)
|
||||
|
||||
@@ -385,12 +338,8 @@ async def api_create_task(request: Request):
|
||||
@router.get("/api/tasks", response_class=JSONResponse)
|
||||
async def api_list_tasks():
|
||||
"""List all tasks as JSON."""
|
||||
try:
|
||||
with _get_db() as db:
|
||||
rows = db.execute("SELECT * FROM tasks ORDER BY created_at DESC").fetchall()
|
||||
except sqlite3.OperationalError as exc:
|
||||
logger.warning("Task DB unavailable: %s", exc)
|
||||
return JSONResponse([], status_code=200)
|
||||
with _get_db() as db:
|
||||
rows = db.execute("SELECT * FROM tasks ORDER BY created_at DESC").fetchall()
|
||||
return JSONResponse([_row_to_dict(r) for r in rows])
|
||||
|
||||
|
||||
@@ -405,17 +354,13 @@ async def api_update_status(task_id: str, request: Request):
|
||||
completed_at = (
|
||||
datetime.now(UTC).isoformat() if new_status in ("completed", "vetoed", "failed") else None
|
||||
)
|
||||
try:
|
||||
with _get_db() as db:
|
||||
db.execute(
|
||||
"UPDATE tasks SET status=?, completed_at=COALESCE(?, completed_at) WHERE id=?",
|
||||
(new_status, completed_at, task_id),
|
||||
)
|
||||
db.commit()
|
||||
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
|
||||
except sqlite3.OperationalError as exc:
|
||||
logger.warning("Task DB unavailable: %s", exc)
|
||||
raise HTTPException(status_code=503, detail="Task database unavailable") from exc
|
||||
with _get_db() as db:
|
||||
db.execute(
|
||||
"UPDATE tasks SET status=?, completed_at=COALESCE(?, completed_at) WHERE id=?",
|
||||
(new_status, completed_at, task_id),
|
||||
)
|
||||
db.commit()
|
||||
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
|
||||
if not row:
|
||||
raise HTTPException(404, "Task not found")
|
||||
return JSONResponse(_row_to_dict(row))
|
||||
@@ -424,13 +369,9 @@ async def api_update_status(task_id: str, request: Request):
|
||||
@router.delete("/api/tasks/{task_id}", response_class=JSONResponse)
|
||||
async def api_delete_task(task_id: str):
|
||||
"""Delete a task."""
|
||||
try:
|
||||
with _get_db() as db:
|
||||
cursor = db.execute("DELETE FROM tasks WHERE id=?", (task_id,))
|
||||
db.commit()
|
||||
except sqlite3.OperationalError as exc:
|
||||
logger.warning("Task DB unavailable: %s", exc)
|
||||
raise HTTPException(status_code=503, detail="Task database unavailable") from exc
|
||||
with _get_db() as db:
|
||||
cursor = db.execute("DELETE FROM tasks WHERE id=?", (task_id,))
|
||||
db.commit()
|
||||
if cursor.rowcount == 0:
|
||||
raise HTTPException(404, "Task not found")
|
||||
return JSONResponse({"success": True, "id": task_id})
|
||||
@@ -444,19 +385,15 @@ async def api_delete_task(task_id: str):
|
||||
@router.get("/api/queue/status", response_class=JSONResponse)
|
||||
async def queue_status(assigned_to: str = "default"):
|
||||
"""Return queue status for the chat panel's agent status indicator."""
|
||||
try:
|
||||
with _get_db() as db:
|
||||
running = db.execute(
|
||||
"SELECT * FROM tasks WHERE status='running' AND assigned_to=? LIMIT 1",
|
||||
(assigned_to,),
|
||||
).fetchone()
|
||||
ahead = db.execute(
|
||||
"SELECT COUNT(*) as cnt FROM tasks WHERE status IN ('pending_approval','approved') AND assigned_to=?",
|
||||
(assigned_to,),
|
||||
).fetchone()
|
||||
except sqlite3.OperationalError as exc:
|
||||
logger.warning("Task DB unavailable: %s", exc)
|
||||
return JSONResponse({"is_working": False, "current_task": None, "tasks_ahead": 0})
|
||||
with _get_db() as db:
|
||||
running = db.execute(
|
||||
"SELECT * FROM tasks WHERE status='running' AND assigned_to=? LIMIT 1",
|
||||
(assigned_to,),
|
||||
).fetchone()
|
||||
ahead = db.execute(
|
||||
"SELECT COUNT(*) as cnt FROM tasks WHERE status IN ('pending_approval','approved') AND assigned_to=?",
|
||||
(assigned_to,),
|
||||
).fetchone()
|
||||
|
||||
if running:
|
||||
return JSONResponse(
|
||||
|
||||
@@ -179,6 +179,13 @@
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- Sovereignty Metrics -->
|
||||
{% call panel("SOVEREIGNTY METRICS", id="sovereignty-metrics-panel",
|
||||
hx_get="/sovereignty/metrics/panel",
|
||||
hx_trigger="load, every 30s") %}
|
||||
<p class="chat-history-placeholder">Loading sovereignty metrics...</p>
|
||||
{% endcall %}
|
||||
|
||||
<!-- Chat History -->
|
||||
<div class="card mc-card-spaced">
|
||||
<div class="card-header">
|
||||
|
||||
63
src/dashboard/templates/partials/sovereignty_metrics.html
Normal file
63
src/dashboard/templates/partials/sovereignty_metrics.html
Normal file
@@ -0,0 +1,63 @@
|
||||
{# HTMX partial: Sovereignty Metrics Progress Panel
|
||||
Loaded via hx-get="/sovereignty/metrics/panel"
|
||||
Refs: #981
|
||||
#}
|
||||
{% set phase_labels = {"pre-start": "Pre-start", "week1": "Week 1", "month1": "Month 1", "month3": "Month 3", "graduated": "Graduated"} %}
|
||||
{% set phase_colors = {"pre-start": "var(--text-dim)", "week1": "var(--red)", "month1": "var(--amber)", "month3": "var(--green)", "graduated": "var(--purple)"} %}
|
||||
|
||||
{% set metric_labels = {
|
||||
"cache_hit_rate": "Cache Hit Rate",
|
||||
"api_cost": "API Cost / Task",
|
||||
"time_to_report": "Time to Report",
|
||||
"human_involvement": "Human Involvement",
|
||||
"local_artifacts": "Local Artifacts"
|
||||
} %}
|
||||
|
||||
{% set metric_units = {
|
||||
"cache_hit_rate": "%",
|
||||
"api_cost": "$",
|
||||
"time_to_report": "min",
|
||||
"human_involvement": "%",
|
||||
"local_artifacts": ""
|
||||
} %}
|
||||
|
||||
{% if alerts %}
|
||||
<div class="sov-alerts">
|
||||
{% for alert in alerts %}
|
||||
<div class="sov-alert-item">
|
||||
<span class="sov-alert-icon">!</span>
|
||||
<span>{{ alert.message }}</span>
|
||||
</div>
|
||||
{% endfor %}
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
<div class="grid grid-3">
|
||||
{% for key, data in metrics.items() %}
|
||||
{% set label = metric_labels.get(key, key) %}
|
||||
{% set unit = metric_units.get(key, "") %}
|
||||
{% set phase = data.phase %}
|
||||
{% set color = phase_colors.get(phase, "var(--text-dim)") %}
|
||||
<div class="stat">
|
||||
<div class="stat-value" style="color: {{ color }}">
|
||||
{% if data.current is not none %}
|
||||
{% if key == "cache_hit_rate" or key == "human_involvement" %}
|
||||
{{ "%.0f"|format(data.current * 100) }}{{ unit }}
|
||||
{% elif key == "api_cost" %}
|
||||
{{ unit }}{{ "%.2f"|format(data.current) }}
|
||||
{% elif key == "time_to_report" %}
|
||||
{{ "%.1f"|format(data.current) }}{{ unit }}
|
||||
{% else %}
|
||||
{{ data.current|int }}
|
||||
{% endif %}
|
||||
{% else %}
|
||||
--
|
||||
{% endif %}
|
||||
</div>
|
||||
<div class="stat-label">{{ label }}</div>
|
||||
<div class="stat-label" style="font-size: 0.7rem; color: {{ color }}">
|
||||
{{ phase_labels.get(phase, phase) }}
|
||||
</div>
|
||||
</div>
|
||||
{% endfor %}
|
||||
</div>
|
||||
302
src/infrastructure/claude_quota.py
Normal file
302
src/infrastructure/claude_quota.py
Normal file
@@ -0,0 +1,302 @@
|
||||
"""Claude API quota tracker and metabolic mode advisor.
|
||||
|
||||
Tracks Claude API usage (tokens, cost, calls) in a local SQLite database.
|
||||
Provides a metabolic mode recommendation (BURST / ACTIVE / RESTING) based on
|
||||
daily spend thresholds so the orchestrator can decide when to use cloud inference
|
||||
vs. local Ollama.
|
||||
|
||||
Metabolic protocol (from issue #1074):
|
||||
BURST — daily spend < burst_threshold → use Claude freely
|
||||
ACTIVE — daily spend < active_threshold → prefer Groq / cheap tier
|
||||
RESTING — daily spend >= active_threshold → local only, no API calls
|
||||
|
||||
Refs: #1074, #972
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import sqlite3
|
||||
from contextlib import closing
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, date, datetime
|
||||
from pathlib import Path
|
||||
from typing import Literal
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ── Cost table (USD per million tokens, approximate) ─────────────────────────
|
||||
_MODEL_COSTS: dict[str, dict[str, float]] = {
|
||||
# haiku aliases
|
||||
"haiku": {"input": 0.25, "output": 1.25},
|
||||
"claude-haiku-4-5": {"input": 0.25, "output": 1.25},
|
||||
"claude-haiku-4-5-20251001": {"input": 0.25, "output": 1.25},
|
||||
# sonnet aliases
|
||||
"sonnet": {"input": 3.00, "output": 15.00},
|
||||
"claude-sonnet-4-6": {"input": 3.00, "output": 15.00},
|
||||
# opus aliases
|
||||
"opus": {"input": 15.00, "output": 75.00},
|
||||
"claude-opus-4-6": {"input": 15.00, "output": 75.00},
|
||||
}
|
||||
_DEFAULT_COST = {"input": 3.00, "output": 15.00} # conservative default
|
||||
|
||||
MetabolicMode = Literal["BURST", "ACTIVE", "RESTING"]
|
||||
|
||||
DB_PATH = Path(settings.repo_root) / "data" / "claude_quota.db"
|
||||
|
||||
# Daily spend thresholds (USD) — tune via env or subclass Settings
|
||||
BURST_THRESHOLD: float = 1.00 # < $1/day → BURST mode, use Claude freely
|
||||
ACTIVE_THRESHOLD: float = 5.00 # < $5/day → ACTIVE mode, prefer cheaper tier
|
||||
|
||||
_SCHEMA = """
|
||||
CREATE TABLE IF NOT EXISTS claude_calls (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
ts TEXT NOT NULL,
|
||||
model TEXT NOT NULL,
|
||||
input_tok INTEGER NOT NULL DEFAULT 0,
|
||||
output_tok INTEGER NOT NULL DEFAULT 0,
|
||||
cost_usd REAL NOT NULL DEFAULT 0.0,
|
||||
task_label TEXT DEFAULT '',
|
||||
metadata TEXT DEFAULT '{}'
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_cc_ts ON claude_calls(ts);
|
||||
CREATE INDEX IF NOT EXISTS idx_cc_model ON claude_calls(model);
|
||||
"""
|
||||
|
||||
|
||||
@dataclass
|
||||
class ClaudeCall:
|
||||
"""Record of a single Claude API call."""
|
||||
|
||||
model: str
|
||||
input_tokens: int
|
||||
output_tokens: int
|
||||
task_label: str = ""
|
||||
ts: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
|
||||
metadata: dict = field(default_factory=dict)
|
||||
|
||||
@property
|
||||
def cost_usd(self) -> float:
|
||||
costs = _MODEL_COSTS.get(self.model, _DEFAULT_COST)
|
||||
return (
|
||||
self.input_tokens * costs["input"]
|
||||
+ self.output_tokens * costs["output"]
|
||||
) / 1_000_000
|
||||
|
||||
|
||||
@dataclass
|
||||
class QuotaSummary:
|
||||
"""Aggregated quota status for a time window."""
|
||||
|
||||
period: str # "today" | "month"
|
||||
calls: int
|
||||
input_tokens: int
|
||||
output_tokens: int
|
||||
cost_usd: float
|
||||
mode: MetabolicMode
|
||||
burst_threshold: float
|
||||
active_threshold: float
|
||||
|
||||
def as_dict(self) -> dict:
|
||||
return {
|
||||
"period": self.period,
|
||||
"calls": self.calls,
|
||||
"input_tokens": self.input_tokens,
|
||||
"output_tokens": self.output_tokens,
|
||||
"cost_usd": round(self.cost_usd, 4),
|
||||
"mode": self.mode,
|
||||
"burst_threshold": self.burst_threshold,
|
||||
"active_threshold": self.active_threshold,
|
||||
}
|
||||
|
||||
|
||||
def _mode_for_cost(daily_cost: float) -> MetabolicMode:
|
||||
if daily_cost < BURST_THRESHOLD:
|
||||
return "BURST"
|
||||
if daily_cost < ACTIVE_THRESHOLD:
|
||||
return "ACTIVE"
|
||||
return "RESTING"
|
||||
|
||||
|
||||
class ClaudeQuotaStore:
|
||||
"""SQLite-backed store for Claude API usage tracking.
|
||||
|
||||
Thread-safe: creates a new connection per operation.
|
||||
"""
|
||||
|
||||
def __init__(self, db_path: Path | None = None) -> None:
|
||||
self._db_path = db_path or DB_PATH
|
||||
self._init_db()
|
||||
|
||||
def _init_db(self) -> None:
|
||||
try:
|
||||
self._db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with closing(sqlite3.connect(str(self._db_path))) as conn:
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
|
||||
conn.executescript(_SCHEMA)
|
||||
conn.commit()
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to initialize claude_quota DB: %s", exc)
|
||||
|
||||
def _connect(self) -> sqlite3.Connection:
|
||||
conn = sqlite3.connect(str(self._db_path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
|
||||
return conn
|
||||
|
||||
def record_call(self, call: ClaudeCall) -> None:
|
||||
"""Persist a completed Claude API call."""
|
||||
try:
|
||||
with closing(self._connect()) as conn:
|
||||
conn.execute(
|
||||
"INSERT INTO claude_calls "
|
||||
"(ts, model, input_tok, output_tok, cost_usd, task_label, metadata) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
call.ts,
|
||||
call.model,
|
||||
call.input_tokens,
|
||||
call.output_tokens,
|
||||
call.cost_usd,
|
||||
call.task_label,
|
||||
json.dumps(call.metadata),
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to record Claude call: %s", exc)
|
||||
|
||||
def _aggregate(self, where_clause: str, params: tuple) -> dict:
|
||||
"""Return aggregated stats for a WHERE clause."""
|
||||
try:
|
||||
with closing(self._connect()) as conn:
|
||||
row = conn.execute(
|
||||
f"SELECT COUNT(*) as calls, "
|
||||
f"COALESCE(SUM(input_tok),0) as input_tok, "
|
||||
f"COALESCE(SUM(output_tok),0) as output_tok, "
|
||||
f"COALESCE(SUM(cost_usd),0.0) as cost_usd "
|
||||
f"FROM claude_calls {where_clause}",
|
||||
params,
|
||||
).fetchone()
|
||||
if row:
|
||||
return dict(row)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to aggregate Claude quota: %s", exc)
|
||||
return {"calls": 0, "input_tok": 0, "output_tok": 0, "cost_usd": 0.0}
|
||||
|
||||
def today_summary(self) -> QuotaSummary:
|
||||
"""Return quota summary for today (UTC)."""
|
||||
today = date.today().isoformat()
|
||||
agg = self._aggregate("WHERE ts >= ?", (today,))
|
||||
return QuotaSummary(
|
||||
period="today",
|
||||
calls=agg["calls"],
|
||||
input_tokens=agg["input_tok"],
|
||||
output_tokens=agg["output_tok"],
|
||||
cost_usd=agg["cost_usd"],
|
||||
mode=_mode_for_cost(agg["cost_usd"]),
|
||||
burst_threshold=BURST_THRESHOLD,
|
||||
active_threshold=ACTIVE_THRESHOLD,
|
||||
)
|
||||
|
||||
def month_summary(self) -> QuotaSummary:
|
||||
"""Return quota summary for the current calendar month (UTC)."""
|
||||
month_prefix = date.today().strftime("%Y-%m")
|
||||
agg = self._aggregate("WHERE ts >= ?", (month_prefix,))
|
||||
return QuotaSummary(
|
||||
period="month",
|
||||
calls=agg["calls"],
|
||||
input_tokens=agg["input_tok"],
|
||||
output_tokens=agg["output_tok"],
|
||||
cost_usd=agg["cost_usd"],
|
||||
mode=_mode_for_cost(agg["cost_usd"] / 30), # amortised daily
|
||||
burst_threshold=BURST_THRESHOLD,
|
||||
active_threshold=ACTIVE_THRESHOLD,
|
||||
)
|
||||
|
||||
def current_mode(self) -> MetabolicMode:
|
||||
"""Return the current metabolic mode based on today's spend."""
|
||||
return self.today_summary().mode
|
||||
|
||||
|
||||
# ── Module-level singleton ────────────────────────────────────────────────────
|
||||
_store: ClaudeQuotaStore | None = None
|
||||
|
||||
|
||||
def get_quota_store() -> ClaudeQuotaStore:
|
||||
"""Return the module-level quota store, creating it on first access."""
|
||||
global _store
|
||||
if _store is None:
|
||||
_store = ClaudeQuotaStore()
|
||||
return _store
|
||||
|
||||
|
||||
def record_usage(
|
||||
model: str,
|
||||
input_tokens: int,
|
||||
output_tokens: int,
|
||||
task_label: str = "",
|
||||
metadata: dict | None = None,
|
||||
) -> None:
|
||||
"""Convenience function to record a Claude API call.
|
||||
|
||||
Silently degrades if the quota DB is unavailable.
|
||||
"""
|
||||
call = ClaudeCall(
|
||||
model=model,
|
||||
input_tokens=input_tokens,
|
||||
output_tokens=output_tokens,
|
||||
task_label=task_label,
|
||||
metadata=metadata or {},
|
||||
)
|
||||
get_quota_store().record_call(call)
|
||||
logger.debug(
|
||||
"Claude call recorded: model=%s in=%d out=%d cost=$%.4f",
|
||||
model,
|
||||
input_tokens,
|
||||
output_tokens,
|
||||
call.cost_usd,
|
||||
)
|
||||
|
||||
|
||||
def current_mode() -> MetabolicMode:
|
||||
"""Return the current metabolic mode.
|
||||
|
||||
BURST → Claude is cheap today, use freely.
|
||||
ACTIVE → Approaching daily budget, prefer Groq / cheaper tier.
|
||||
RESTING → Daily limit reached, use local Ollama only.
|
||||
"""
|
||||
try:
|
||||
return get_quota_store().current_mode()
|
||||
except Exception as exc:
|
||||
logger.warning("Quota mode check failed, defaulting to BURST: %s", exc)
|
||||
return "BURST"
|
||||
|
||||
|
||||
def quota_report() -> str:
|
||||
"""Return a human-readable quota report for CLI / dashboard display."""
|
||||
try:
|
||||
store = get_quota_store()
|
||||
today = store.today_summary()
|
||||
month = store.month_summary()
|
||||
|
||||
lines = [
|
||||
"═══════════════════════════════════════",
|
||||
" Claude API Quota — Metabolic Report ",
|
||||
"═══════════════════════════════════════",
|
||||
f" Today {today.calls:>6} calls "
|
||||
f"${today.cost_usd:>7.4f} [{today.mode}]",
|
||||
f" This month {month.calls:>5} calls "
|
||||
f"${month.cost_usd:>7.4f}",
|
||||
"───────────────────────────────────────",
|
||||
f" BURST threshold : ${today.burst_threshold:.2f}/day",
|
||||
f" ACTIVE threshold : ${today.active_threshold:.2f}/day",
|
||||
"───────────────────────────────────────",
|
||||
f" Current mode : {today.mode}",
|
||||
"═══════════════════════════════════════",
|
||||
]
|
||||
return "\n".join(lines)
|
||||
except Exception as exc:
|
||||
return f"Quota report unavailable: {exc}"
|
||||
7
src/infrastructure/guards/__init__.py
Normal file
7
src/infrastructure/guards/__init__.py
Normal file
@@ -0,0 +1,7 @@
|
||||
"""Content moderation pipeline for AI narrator output.
|
||||
|
||||
Three-layer defense:
|
||||
1. Game-context system prompts (vocabulary whitelists, theme framing)
|
||||
2. Real-time output filter via Llama Guard (or fallback regex)
|
||||
3. Per-game moderation profiles with configurable thresholds
|
||||
"""
|
||||
497
src/infrastructure/guards/moderation.py
Normal file
497
src/infrastructure/guards/moderation.py
Normal file
@@ -0,0 +1,497 @@
|
||||
"""Content moderation pipeline for AI narrator output.
|
||||
|
||||
Three-layer defense against harmful LLM output:
|
||||
|
||||
Layer 1 — Game-context system prompts with per-game vocabulary whitelists.
|
||||
Layer 2 — Real-time output filter (Llama Guard via Ollama, regex fallback).
|
||||
Layer 3 — Per-game moderation profiles with configurable thresholds.
|
||||
|
||||
Usage:
|
||||
from infrastructure.guards.moderation import get_moderator
|
||||
|
||||
moderator = get_moderator()
|
||||
result = await moderator.check("Some narrator text", game="morrowind")
|
||||
if result.blocked:
|
||||
use_fallback_narration(result.fallback)
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, datetime
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ModerationVerdict(Enum):
|
||||
"""Result of a moderation check."""
|
||||
|
||||
PASS = "pass" # noqa: S105
|
||||
FAIL = "fail"
|
||||
ERROR = "error"
|
||||
|
||||
|
||||
class ViolationCategory(Enum):
|
||||
"""Categories of content violations."""
|
||||
|
||||
HATE_SPEECH = "hate_speech"
|
||||
VIOLENCE_GLORIFICATION = "violence_glorification"
|
||||
REAL_WORLD_HARM = "real_world_harm"
|
||||
SEXUAL_CONTENT = "sexual_content"
|
||||
SELF_HARM = "self_harm"
|
||||
NONE = "none"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ModerationResult:
|
||||
"""Result from the moderation pipeline."""
|
||||
|
||||
verdict: ModerationVerdict
|
||||
blocked: bool
|
||||
category: ViolationCategory = ViolationCategory.NONE
|
||||
confidence: float = 0.0
|
||||
latency_ms: float = 0.0
|
||||
layer: str = "" # Which layer caught it
|
||||
fallback: str = "" # Contextual fallback narration
|
||||
reason: str = ""
|
||||
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
|
||||
|
||||
@property
|
||||
def passed(self) -> bool:
|
||||
return self.verdict == ModerationVerdict.PASS
|
||||
|
||||
|
||||
@dataclass
|
||||
class GameProfile:
|
||||
"""Per-game moderation profile.
|
||||
|
||||
Defines vocabulary whitelists, theme framing rules,
|
||||
and threshold tuning for a specific game.
|
||||
"""
|
||||
|
||||
game_id: str
|
||||
display_name: str
|
||||
# Terms that are safe in game context (e.g., "Skooma", "slave", "Morag Tong")
|
||||
vocabulary_whitelist: list[str] = field(default_factory=list)
|
||||
# System prompt additions for game-context framing
|
||||
context_prompt: str = ""
|
||||
# Confidence threshold — below this, content passes even if flagged
|
||||
threshold: float = 0.8
|
||||
# Scene-type fallback narrations
|
||||
fallbacks: dict[str, str] = field(default_factory=dict)
|
||||
|
||||
|
||||
# ── Regex fallback patterns ─────────────────────────────────────────────────
|
||||
# Used when Llama Guard is unavailable. Catches obvious violations only.
|
||||
# Intentionally conservative — false negatives preferred over false positives.
|
||||
_FALLBACK_PATTERNS: list[tuple[re.Pattern, ViolationCategory]] = [
|
||||
(
|
||||
re.compile(
|
||||
r"\b(kill\s+(?:all|every)\s+(?:real|actual)\s+(?:people|humans|children))\b",
|
||||
re.IGNORECASE,
|
||||
),
|
||||
ViolationCategory.REAL_WORLD_HARM,
|
||||
),
|
||||
(
|
||||
re.compile(
|
||||
r"\b((?:you|we)\s+should\s+(?:actually|really)\s+(?:harm|hurt|kill))\b",
|
||||
re.IGNORECASE,
|
||||
),
|
||||
ViolationCategory.REAL_WORLD_HARM,
|
||||
),
|
||||
(
|
||||
re.compile(
|
||||
r"\b((?:in\s+real\s+life|irl)\s+(?:you|we)\s+(?:should|could|must)\s+"
|
||||
r"(?:attack|hurt|kill|harm))\b",
|
||||
re.IGNORECASE,
|
||||
),
|
||||
ViolationCategory.REAL_WORLD_HARM,
|
||||
),
|
||||
]
|
||||
|
||||
# ── Default game profiles ───────────────────────────────────────────────────
|
||||
|
||||
_DEFAULT_PROFILES: dict[str, GameProfile] = {
|
||||
"morrowind": GameProfile(
|
||||
game_id="morrowind",
|
||||
display_name="The Elder Scrolls III: Morrowind",
|
||||
vocabulary_whitelist=[
|
||||
"Skooma",
|
||||
"Moon Sugar",
|
||||
"slave",
|
||||
"slavery",
|
||||
"Morag Tong",
|
||||
"Dark Brotherhood",
|
||||
"Telvanni",
|
||||
"Camonna Tong",
|
||||
"smuggler",
|
||||
"assassin",
|
||||
"Sixth House",
|
||||
"Corprus",
|
||||
"Dagoth Ur",
|
||||
"Nerevarine",
|
||||
],
|
||||
context_prompt=(
|
||||
"You are narrating gameplay of The Elder Scrolls III: Morrowind. "
|
||||
"Morrowind contains mature themes including slavery, drug use (Skooma/Moon Sugar), "
|
||||
"assassin guilds (Morag Tong, Dark Brotherhood), and political intrigue. "
|
||||
"Treat these as game mechanics and historical worldbuilding within the game's "
|
||||
"fictional universe. Never editorialize on real-world parallels. "
|
||||
"Narrate events neutrally as a game commentator would."
|
||||
),
|
||||
threshold=0.85,
|
||||
fallbacks={
|
||||
"combat": "The battle rages on in the ashlands of Vvardenfell.",
|
||||
"dialogue": "The conversation continues between the characters.",
|
||||
"exploration": "The Nerevarine presses onward through the landscape.",
|
||||
"default": "The adventure continues in Morrowind.",
|
||||
},
|
||||
),
|
||||
"default": GameProfile(
|
||||
game_id="default",
|
||||
display_name="Generic Game",
|
||||
vocabulary_whitelist=[],
|
||||
context_prompt=(
|
||||
"You are narrating gameplay. Describe in-game events as a neutral "
|
||||
"game commentator. Never reference real-world violence, politics, "
|
||||
"or controversial topics. Stay focused on game mechanics and story."
|
||||
),
|
||||
threshold=0.8,
|
||||
fallbacks={
|
||||
"combat": "The action continues on screen.",
|
||||
"dialogue": "The conversation unfolds between characters.",
|
||||
"exploration": "The player explores the game world.",
|
||||
"default": "The gameplay continues.",
|
||||
},
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
class ContentModerator:
|
||||
"""Three-layer content moderation pipeline.
|
||||
|
||||
Layer 1: Game-context system prompts with vocabulary whitelists.
|
||||
Layer 2: LLM-based moderation (Llama Guard via Ollama, with regex fallback).
|
||||
Layer 3: Per-game threshold tuning and profile-based filtering.
|
||||
|
||||
Follows graceful degradation — if Llama Guard is unavailable,
|
||||
falls back to regex patterns. Never crashes.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
profiles: dict[str, GameProfile] | None = None,
|
||||
guard_model: str | None = None,
|
||||
) -> None:
|
||||
self._profiles: dict[str, GameProfile] = profiles or dict(_DEFAULT_PROFILES)
|
||||
self._guard_model = guard_model or settings.moderation_guard_model
|
||||
self._guard_available: bool | None = None # Lazy-checked
|
||||
self._metrics = _ModerationMetrics()
|
||||
|
||||
def get_profile(self, game: str) -> GameProfile:
|
||||
"""Get the moderation profile for a game, falling back to default."""
|
||||
return self._profiles.get(game, self._profiles["default"])
|
||||
|
||||
def register_profile(self, profile: GameProfile) -> None:
|
||||
"""Register or update a game moderation profile."""
|
||||
self._profiles[profile.game_id] = profile
|
||||
logger.info("Registered moderation profile: %s", profile.game_id)
|
||||
|
||||
def get_context_prompt(self, game: str) -> str:
|
||||
"""Get the game-context system prompt (Layer 1).
|
||||
|
||||
Returns the context prompt for the given game, which should be
|
||||
prepended to the narrator's system prompt.
|
||||
"""
|
||||
profile = self.get_profile(game)
|
||||
return profile.context_prompt
|
||||
|
||||
async def check(
|
||||
self,
|
||||
text: str,
|
||||
game: str = "default",
|
||||
scene_type: str = "default",
|
||||
) -> ModerationResult:
|
||||
"""Run the full moderation pipeline on narrator output.
|
||||
|
||||
Args:
|
||||
text: The text to moderate (narrator output).
|
||||
game: Game identifier for profile selection.
|
||||
scene_type: Current scene type for fallback selection.
|
||||
|
||||
Returns:
|
||||
ModerationResult with verdict, confidence, and fallback.
|
||||
"""
|
||||
start = time.monotonic()
|
||||
profile = self.get_profile(game)
|
||||
|
||||
# Layer 1: Vocabulary whitelist pre-processing
|
||||
cleaned_text = self._apply_whitelist(text, profile)
|
||||
|
||||
# Layer 2: LLM guard or regex fallback
|
||||
result = await self._run_guard(cleaned_text, profile)
|
||||
|
||||
# Layer 3: Threshold tuning
|
||||
if result.verdict == ModerationVerdict.FAIL and result.confidence < profile.threshold:
|
||||
logger.info(
|
||||
"Moderation flag below threshold (%.2f < %.2f) — allowing",
|
||||
result.confidence,
|
||||
profile.threshold,
|
||||
)
|
||||
result = ModerationResult(
|
||||
verdict=ModerationVerdict.PASS,
|
||||
blocked=False,
|
||||
confidence=result.confidence,
|
||||
layer="threshold",
|
||||
reason=f"Below threshold ({result.confidence:.2f} < {profile.threshold:.2f})",
|
||||
)
|
||||
|
||||
# Attach fallback narration if blocked
|
||||
if result.blocked:
|
||||
result.fallback = profile.fallbacks.get(
|
||||
scene_type, profile.fallbacks.get("default", "")
|
||||
)
|
||||
|
||||
result.latency_ms = (time.monotonic() - start) * 1000
|
||||
self._metrics.record(result)
|
||||
|
||||
if result.blocked:
|
||||
logger.warning(
|
||||
"Content blocked [%s/%s]: category=%s confidence=%.2f reason=%s",
|
||||
game,
|
||||
scene_type,
|
||||
result.category.value,
|
||||
result.confidence,
|
||||
result.reason,
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
def _apply_whitelist(self, text: str, profile: GameProfile) -> str:
|
||||
"""Layer 1: Replace whitelisted game terms with placeholders.
|
||||
|
||||
This prevents the guard model from flagging in-game terminology
|
||||
(e.g., "Skooma" being flagged as drug reference).
|
||||
"""
|
||||
cleaned = text
|
||||
for term in profile.vocabulary_whitelist:
|
||||
# Case-insensitive replacement with a neutral placeholder
|
||||
pattern = re.compile(re.escape(term), re.IGNORECASE)
|
||||
cleaned = pattern.sub("[GAME_TERM]", cleaned)
|
||||
return cleaned
|
||||
|
||||
async def _run_guard(self, text: str, profile: GameProfile) -> ModerationResult:
|
||||
"""Layer 2: Run LLM guard model or fall back to regex."""
|
||||
if not settings.moderation_enabled:
|
||||
return ModerationResult(
|
||||
verdict=ModerationVerdict.PASS,
|
||||
blocked=False,
|
||||
layer="disabled",
|
||||
reason="Moderation disabled",
|
||||
)
|
||||
|
||||
# Try Llama Guard via Ollama
|
||||
if await self._is_guard_available():
|
||||
try:
|
||||
return await self._check_with_guard(text)
|
||||
except Exception as exc:
|
||||
logger.warning("Guard model failed, using regex fallback: %s", exc)
|
||||
self._guard_available = False
|
||||
|
||||
# Regex fallback
|
||||
return self._check_with_regex(text)
|
||||
|
||||
async def _is_guard_available(self) -> bool:
|
||||
"""Check if the guard model is available via Ollama."""
|
||||
if self._guard_available is not None:
|
||||
return self._guard_available
|
||||
|
||||
try:
|
||||
import aiohttp
|
||||
|
||||
url = f"{settings.normalized_ollama_url}/api/tags"
|
||||
timeout = aiohttp.ClientTimeout(total=5)
|
||||
async with aiohttp.ClientSession(timeout=timeout) as session:
|
||||
async with session.get(url) as resp:
|
||||
if resp.status != 200:
|
||||
self._guard_available = False
|
||||
return False
|
||||
data = await resp.json()
|
||||
models = [m.get("name", "") for m in data.get("models", [])]
|
||||
self._guard_available = any(
|
||||
self._guard_model in m or m.startswith(self._guard_model) for m in models
|
||||
)
|
||||
if not self._guard_available:
|
||||
logger.info(
|
||||
"Guard model '%s' not found in Ollama — using regex fallback",
|
||||
self._guard_model,
|
||||
)
|
||||
return self._guard_available
|
||||
except Exception as exc:
|
||||
logger.debug("Ollama guard check failed: %s", exc)
|
||||
self._guard_available = False
|
||||
return False
|
||||
|
||||
async def _check_with_guard(self, text: str) -> ModerationResult:
|
||||
"""Run moderation check via Llama Guard."""
|
||||
import aiohttp
|
||||
|
||||
url = f"{settings.normalized_ollama_url}/api/chat"
|
||||
payload = {
|
||||
"model": self._guard_model,
|
||||
"messages": [
|
||||
{
|
||||
"role": "user",
|
||||
"content": text,
|
||||
}
|
||||
],
|
||||
"stream": False,
|
||||
"options": {"temperature": 0.0},
|
||||
}
|
||||
|
||||
timeout = aiohttp.ClientTimeout(total=10)
|
||||
async with aiohttp.ClientSession(timeout=timeout) as session:
|
||||
async with session.post(url, json=payload) as resp:
|
||||
if resp.status != 200:
|
||||
raise RuntimeError(f"Guard API error: {resp.status}")
|
||||
data = await resp.json()
|
||||
|
||||
response_text = data.get("message", {}).get("content", "").strip().lower()
|
||||
|
||||
# Llama Guard returns "safe" or "unsafe\n<category>"
|
||||
if response_text.startswith("safe"):
|
||||
return ModerationResult(
|
||||
verdict=ModerationVerdict.PASS,
|
||||
blocked=False,
|
||||
confidence=0.0,
|
||||
layer="llama_guard",
|
||||
reason="Content safe",
|
||||
)
|
||||
|
||||
# Parse unsafe response
|
||||
category = ViolationCategory.NONE
|
||||
confidence = 0.95 # High confidence from LLM guard
|
||||
lines = response_text.split("\n")
|
||||
if len(lines) > 1:
|
||||
cat_str = lines[1].strip()
|
||||
category = _parse_guard_category(cat_str)
|
||||
|
||||
return ModerationResult(
|
||||
verdict=ModerationVerdict.FAIL,
|
||||
blocked=True,
|
||||
category=category,
|
||||
confidence=confidence,
|
||||
layer="llama_guard",
|
||||
reason=f"Guard flagged: {response_text}",
|
||||
)
|
||||
|
||||
def _check_with_regex(self, text: str) -> ModerationResult:
|
||||
"""Regex fallback when guard model is unavailable.
|
||||
|
||||
Intentionally conservative — only catches obvious real-world harm.
|
||||
"""
|
||||
for pattern, category in _FALLBACK_PATTERNS:
|
||||
match = pattern.search(text)
|
||||
if match:
|
||||
return ModerationResult(
|
||||
verdict=ModerationVerdict.FAIL,
|
||||
blocked=True,
|
||||
category=category,
|
||||
confidence=0.95, # Regex patterns are high-signal
|
||||
layer="regex_fallback",
|
||||
reason=f"Regex match: {match.group(0)[:50]}",
|
||||
)
|
||||
|
||||
return ModerationResult(
|
||||
verdict=ModerationVerdict.PASS,
|
||||
blocked=False,
|
||||
layer="regex_fallback",
|
||||
reason="No regex matches",
|
||||
)
|
||||
|
||||
def get_metrics(self) -> dict[str, Any]:
|
||||
"""Get moderation pipeline metrics."""
|
||||
return self._metrics.to_dict()
|
||||
|
||||
def reset_guard_cache(self) -> None:
|
||||
"""Reset the guard availability cache (e.g., after pulling model)."""
|
||||
self._guard_available = None
|
||||
|
||||
|
||||
class _ModerationMetrics:
|
||||
"""Tracks moderation pipeline performance."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.total_checks: int = 0
|
||||
self.passed: int = 0
|
||||
self.blocked: int = 0
|
||||
self.errors: int = 0
|
||||
self.total_latency_ms: float = 0.0
|
||||
self.by_layer: dict[str, int] = {}
|
||||
self.by_category: dict[str, int] = {}
|
||||
|
||||
def record(self, result: ModerationResult) -> None:
|
||||
self.total_checks += 1
|
||||
self.total_latency_ms += result.latency_ms
|
||||
|
||||
if result.verdict == ModerationVerdict.PASS:
|
||||
self.passed += 1
|
||||
elif result.verdict == ModerationVerdict.FAIL:
|
||||
self.blocked += 1
|
||||
else:
|
||||
self.errors += 1
|
||||
|
||||
layer = result.layer or "unknown"
|
||||
self.by_layer[layer] = self.by_layer.get(layer, 0) + 1
|
||||
|
||||
if result.blocked:
|
||||
cat = result.category.value
|
||||
self.by_category[cat] = self.by_category.get(cat, 0) + 1
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return {
|
||||
"total_checks": self.total_checks,
|
||||
"passed": self.passed,
|
||||
"blocked": self.blocked,
|
||||
"errors": self.errors,
|
||||
"avg_latency_ms": (
|
||||
round(self.total_latency_ms / self.total_checks, 2)
|
||||
if self.total_checks > 0
|
||||
else 0.0
|
||||
),
|
||||
"by_layer": dict(self.by_layer),
|
||||
"by_category": dict(self.by_category),
|
||||
}
|
||||
|
||||
|
||||
def _parse_guard_category(cat_str: str) -> ViolationCategory:
|
||||
"""Parse Llama Guard category string to ViolationCategory."""
|
||||
cat_lower = cat_str.lower()
|
||||
if "hate" in cat_lower:
|
||||
return ViolationCategory.HATE_SPEECH
|
||||
if "violence" in cat_lower:
|
||||
return ViolationCategory.VIOLENCE_GLORIFICATION
|
||||
if "sexual" in cat_lower:
|
||||
return ViolationCategory.SEXUAL_CONTENT
|
||||
if "self-harm" in cat_lower or "self_harm" in cat_lower or "suicide" in cat_lower:
|
||||
return ViolationCategory.SELF_HARM
|
||||
if "harm" in cat_lower or "dangerous" in cat_lower:
|
||||
return ViolationCategory.REAL_WORLD_HARM
|
||||
return ViolationCategory.NONE
|
||||
|
||||
|
||||
# ── Module-level singleton ──────────────────────────────────────────────────
|
||||
_moderator: ContentModerator | None = None
|
||||
|
||||
|
||||
def get_moderator() -> ContentModerator:
|
||||
"""Get or create the content moderator singleton."""
|
||||
global _moderator
|
||||
if _moderator is None:
|
||||
_moderator = ContentModerator()
|
||||
return _moderator
|
||||
56
src/infrastructure/guards/profiles.py
Normal file
56
src/infrastructure/guards/profiles.py
Normal file
@@ -0,0 +1,56 @@
|
||||
"""Load game moderation profiles from config/moderation.yaml.
|
||||
|
||||
Falls back to hardcoded defaults if the YAML file is missing or malformed.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from infrastructure.guards.moderation import GameProfile
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def load_profiles(config_path: Path | None = None) -> dict[str, GameProfile]:
|
||||
"""Load game moderation profiles from YAML config.
|
||||
|
||||
Args:
|
||||
config_path: Path to moderation.yaml. Defaults to config/moderation.yaml.
|
||||
|
||||
Returns:
|
||||
Dict mapping game_id to GameProfile.
|
||||
"""
|
||||
path = config_path or Path("config/moderation.yaml")
|
||||
|
||||
if not path.exists():
|
||||
logger.info("Moderation config not found at %s — using defaults", path)
|
||||
return {}
|
||||
|
||||
try:
|
||||
import yaml
|
||||
except ImportError:
|
||||
logger.warning("PyYAML not installed — using default moderation profiles")
|
||||
return {}
|
||||
|
||||
try:
|
||||
data = yaml.safe_load(path.read_text())
|
||||
except Exception as exc:
|
||||
logger.error("Failed to parse moderation config: %s", exc)
|
||||
return {}
|
||||
|
||||
profiles: dict[str, GameProfile] = {}
|
||||
for game_id, profile_data in data.get("profiles", {}).items():
|
||||
try:
|
||||
profiles[game_id] = GameProfile(
|
||||
game_id=game_id,
|
||||
display_name=profile_data.get("display_name", game_id),
|
||||
vocabulary_whitelist=profile_data.get("vocabulary_whitelist", []),
|
||||
context_prompt=profile_data.get("context_prompt", ""),
|
||||
threshold=float(profile_data.get("threshold", 0.8)),
|
||||
fallbacks=profile_data.get("fallbacks", {}),
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Invalid profile '%s': %s", game_id, exc)
|
||||
|
||||
logger.info("Loaded %d moderation profiles from %s", len(profiles), path)
|
||||
return profiles
|
||||
306
src/infrastructure/sovereignty_metrics.py
Normal file
306
src/infrastructure/sovereignty_metrics.py
Normal file
@@ -0,0 +1,306 @@
|
||||
"""Sovereignty metrics collector and store.
|
||||
|
||||
Tracks research sovereignty progress: cache hit rate, API cost,
|
||||
time-to-report, and human involvement. Persists to SQLite for
|
||||
trend analysis and dashboard display.
|
||||
|
||||
Refs: #981
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import sqlite3
|
||||
from contextlib import closing
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DB_PATH = Path(settings.repo_root) / "data" / "sovereignty_metrics.db"
|
||||
|
||||
_SCHEMA = """
|
||||
CREATE TABLE IF NOT EXISTS sovereignty_metrics (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp TEXT NOT NULL,
|
||||
metric_type TEXT NOT NULL,
|
||||
value REAL NOT NULL,
|
||||
metadata TEXT DEFAULT '{}'
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_sm_type ON sovereignty_metrics(metric_type);
|
||||
CREATE INDEX IF NOT EXISTS idx_sm_ts ON sovereignty_metrics(timestamp);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS sovereignty_alerts (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp TEXT NOT NULL,
|
||||
alert_type TEXT NOT NULL,
|
||||
message TEXT NOT NULL,
|
||||
value REAL NOT NULL,
|
||||
threshold REAL NOT NULL,
|
||||
acknowledged INTEGER DEFAULT 0
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_sa_ts ON sovereignty_alerts(timestamp);
|
||||
CREATE INDEX IF NOT EXISTS idx_sa_ack ON sovereignty_alerts(acknowledged);
|
||||
"""
|
||||
|
||||
|
||||
@dataclass
|
||||
class SovereigntyMetric:
|
||||
"""A single sovereignty metric data point."""
|
||||
|
||||
metric_type: str # cache_hit_rate, api_cost, time_to_report, human_involvement
|
||||
value: float
|
||||
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
|
||||
metadata: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SovereigntyAlert:
|
||||
"""An alert triggered when a metric exceeds a threshold."""
|
||||
|
||||
alert_type: str
|
||||
message: str
|
||||
value: float
|
||||
threshold: float
|
||||
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
|
||||
acknowledged: bool = False
|
||||
|
||||
|
||||
# Graduation targets from issue #981
|
||||
GRADUATION_TARGETS = {
|
||||
"cache_hit_rate": {"week1": 0.10, "month1": 0.40, "month3": 0.80, "graduation": 0.90},
|
||||
"api_cost": {"week1": 1.50, "month1": 0.50, "month3": 0.10, "graduation": 0.01},
|
||||
"time_to_report": {"week1": 180.0, "month1": 30.0, "month3": 5.0, "graduation": 1.0},
|
||||
"human_involvement": {"week1": 1.0, "month1": 0.5, "month3": 0.25, "graduation": 0.0},
|
||||
"local_artifacts": {"week1": 6, "month1": 30, "month3": 100, "graduation": 500},
|
||||
}
|
||||
|
||||
|
||||
class SovereigntyMetricsStore:
|
||||
"""SQLite-backed sovereignty metrics store.
|
||||
|
||||
Thread-safe: creates a new connection per operation.
|
||||
"""
|
||||
|
||||
def __init__(self, db_path: Path | None = None) -> None:
|
||||
self._db_path = db_path or DB_PATH
|
||||
self._init_db()
|
||||
|
||||
def _init_db(self) -> None:
|
||||
"""Initialize the database schema."""
|
||||
try:
|
||||
self._db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with closing(sqlite3.connect(str(self._db_path))) as conn:
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
|
||||
conn.executescript(_SCHEMA)
|
||||
conn.commit()
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to initialize sovereignty metrics DB: %s", exc)
|
||||
|
||||
def _connect(self) -> sqlite3.Connection:
|
||||
"""Get a new connection."""
|
||||
conn = sqlite3.connect(str(self._db_path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
|
||||
return conn
|
||||
|
||||
def record(self, metric: SovereigntyMetric) -> None:
|
||||
"""Record a sovereignty metric data point."""
|
||||
try:
|
||||
with closing(self._connect()) as conn:
|
||||
conn.execute(
|
||||
"INSERT INTO sovereignty_metrics (timestamp, metric_type, value, metadata) "
|
||||
"VALUES (?, ?, ?, ?)",
|
||||
(
|
||||
metric.timestamp,
|
||||
metric.metric_type,
|
||||
metric.value,
|
||||
json.dumps(metric.metadata),
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to record sovereignty metric: %s", exc)
|
||||
|
||||
# Check thresholds for alerts
|
||||
self._check_alert(metric)
|
||||
|
||||
def _check_alert(self, metric: SovereigntyMetric) -> None:
|
||||
"""Check if a metric triggers an alert."""
|
||||
threshold = settings.sovereignty_api_cost_alert_threshold
|
||||
if metric.metric_type == "api_cost" and metric.value > threshold:
|
||||
alert = SovereigntyAlert(
|
||||
alert_type="api_cost_exceeded",
|
||||
message=f"API cost ${metric.value:.2f} exceeds threshold ${threshold:.2f}",
|
||||
value=metric.value,
|
||||
threshold=threshold,
|
||||
)
|
||||
self._record_alert(alert)
|
||||
|
||||
def _record_alert(self, alert: SovereigntyAlert) -> None:
|
||||
"""Persist an alert."""
|
||||
try:
|
||||
with closing(self._connect()) as conn:
|
||||
conn.execute(
|
||||
"INSERT INTO sovereignty_alerts "
|
||||
"(timestamp, alert_type, message, value, threshold) "
|
||||
"VALUES (?, ?, ?, ?, ?)",
|
||||
(
|
||||
alert.timestamp,
|
||||
alert.alert_type,
|
||||
alert.message,
|
||||
alert.value,
|
||||
alert.threshold,
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
logger.warning("Sovereignty alert: %s", alert.message)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to record sovereignty alert: %s", exc)
|
||||
|
||||
def get_latest(self, metric_type: str, limit: int = 50) -> list[dict]:
|
||||
"""Get the most recent metric values for a given type."""
|
||||
try:
|
||||
with closing(self._connect()) as conn:
|
||||
rows = conn.execute(
|
||||
"SELECT timestamp, value, metadata FROM sovereignty_metrics "
|
||||
"WHERE metric_type = ? ORDER BY timestamp DESC LIMIT ?",
|
||||
(metric_type, limit),
|
||||
).fetchall()
|
||||
return [
|
||||
{
|
||||
"timestamp": row["timestamp"],
|
||||
"value": row["value"],
|
||||
"metadata": json.loads(row["metadata"]) if row["metadata"] else {},
|
||||
}
|
||||
for row in rows
|
||||
]
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to query sovereignty metrics: %s", exc)
|
||||
return []
|
||||
|
||||
def get_summary(self) -> dict[str, Any]:
|
||||
"""Get a summary of current sovereignty metrics progress."""
|
||||
summary: dict[str, Any] = {}
|
||||
for metric_type in GRADUATION_TARGETS:
|
||||
latest = self.get_latest(metric_type, limit=1)
|
||||
history = self.get_latest(metric_type, limit=30)
|
||||
|
||||
current_value = latest[0]["value"] if latest else None
|
||||
targets = GRADUATION_TARGETS[metric_type]
|
||||
|
||||
# Determine current phase based on value
|
||||
phase = "pre-start"
|
||||
if current_value is not None:
|
||||
if metric_type in ("api_cost", "time_to_report", "human_involvement"):
|
||||
# Lower is better
|
||||
if current_value <= targets["graduation"]:
|
||||
phase = "graduated"
|
||||
elif current_value <= targets["month3"]:
|
||||
phase = "month3"
|
||||
elif current_value <= targets["month1"]:
|
||||
phase = "month1"
|
||||
elif current_value <= targets["week1"]:
|
||||
phase = "week1"
|
||||
else:
|
||||
phase = "pre-start"
|
||||
else:
|
||||
# Higher is better
|
||||
if current_value >= targets["graduation"]:
|
||||
phase = "graduated"
|
||||
elif current_value >= targets["month3"]:
|
||||
phase = "month3"
|
||||
elif current_value >= targets["month1"]:
|
||||
phase = "month1"
|
||||
elif current_value >= targets["week1"]:
|
||||
phase = "week1"
|
||||
else:
|
||||
phase = "pre-start"
|
||||
|
||||
summary[metric_type] = {
|
||||
"current": current_value,
|
||||
"phase": phase,
|
||||
"targets": targets,
|
||||
"trend": [{"t": h["timestamp"], "v": h["value"]} for h in reversed(history)],
|
||||
}
|
||||
|
||||
return summary
|
||||
|
||||
def get_alerts(self, unacknowledged_only: bool = True, limit: int = 20) -> list[dict]:
|
||||
"""Get sovereignty alerts."""
|
||||
try:
|
||||
with closing(self._connect()) as conn:
|
||||
if unacknowledged_only:
|
||||
rows = conn.execute(
|
||||
"SELECT * FROM sovereignty_alerts "
|
||||
"WHERE acknowledged = 0 ORDER BY timestamp DESC LIMIT ?",
|
||||
(limit,),
|
||||
).fetchall()
|
||||
else:
|
||||
rows = conn.execute(
|
||||
"SELECT * FROM sovereignty_alerts ORDER BY timestamp DESC LIMIT ?",
|
||||
(limit,),
|
||||
).fetchall()
|
||||
return [dict(row) for row in rows]
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to query sovereignty alerts: %s", exc)
|
||||
return []
|
||||
|
||||
def acknowledge_alert(self, alert_id: int) -> bool:
|
||||
"""Acknowledge an alert."""
|
||||
try:
|
||||
with closing(self._connect()) as conn:
|
||||
conn.execute(
|
||||
"UPDATE sovereignty_alerts SET acknowledged = 1 WHERE id = ?",
|
||||
(alert_id,),
|
||||
)
|
||||
conn.commit()
|
||||
return True
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to acknowledge alert: %s", exc)
|
||||
return False
|
||||
|
||||
|
||||
# ── Module-level singleton ─────────────────────────────────────────────────
|
||||
_store: SovereigntyMetricsStore | None = None
|
||||
|
||||
|
||||
def get_sovereignty_store() -> SovereigntyMetricsStore:
|
||||
"""Return the module-level store, creating it on first access."""
|
||||
global _store
|
||||
if _store is None:
|
||||
_store = SovereigntyMetricsStore()
|
||||
return _store
|
||||
|
||||
|
||||
async def emit_sovereignty_metric(
|
||||
metric_type: str,
|
||||
value: float,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
) -> None:
|
||||
"""Convenience function to record a sovereignty metric and emit an event.
|
||||
|
||||
Also publishes to the event bus for real-time subscribers.
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
from infrastructure.events.bus import emit
|
||||
|
||||
metric = SovereigntyMetric(
|
||||
metric_type=metric_type,
|
||||
value=value,
|
||||
metadata=metadata or {},
|
||||
)
|
||||
# Record to SQLite in thread to avoid blocking event loop
|
||||
await asyncio.to_thread(get_sovereignty_store().record, metric)
|
||||
|
||||
# Publish to event bus for real-time consumers
|
||||
await emit(
|
||||
f"sovereignty.metric.{metric_type}",
|
||||
source="sovereignty_metrics",
|
||||
data={"metric_type": metric_type, "value": value, **(metadata or {})},
|
||||
)
|
||||
17
src/infrastructure/world/benchmark/__init__.py
Normal file
17
src/infrastructure/world/benchmark/__init__.py
Normal file
@@ -0,0 +1,17 @@
|
||||
"""Performance regression suite for Morrowind agent scenarios.
|
||||
|
||||
Provides standardised benchmark scenarios, a runner that executes them
|
||||
through the heartbeat loop with a mock (or live) world adapter, and
|
||||
metrics collection for CI-integrated regression detection.
|
||||
"""
|
||||
|
||||
from infrastructure.world.benchmark.metrics import BenchmarkMetrics
|
||||
from infrastructure.world.benchmark.runner import BenchmarkRunner
|
||||
from infrastructure.world.benchmark.scenarios import BenchmarkScenario, load_scenarios
|
||||
|
||||
__all__ = [
|
||||
"BenchmarkMetrics",
|
||||
"BenchmarkRunner",
|
||||
"BenchmarkScenario",
|
||||
"load_scenarios",
|
||||
]
|
||||
195
src/infrastructure/world/benchmark/metrics.py
Normal file
195
src/infrastructure/world/benchmark/metrics.py
Normal file
@@ -0,0 +1,195 @@
|
||||
"""Benchmark metrics collection and persistence.
|
||||
|
||||
Tracks per-scenario results: cycles used, wall-clock time, success,
|
||||
LLM call count, and estimated metabolic cost. Results are persisted
|
||||
as JSONL for trend analysis and CI regression gates.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ScenarioResult:
|
||||
"""Outcome of running a single benchmark scenario.
|
||||
|
||||
Attributes:
|
||||
scenario_name: Human-readable scenario name.
|
||||
success: Whether the goal predicate was satisfied.
|
||||
cycles_used: Number of heartbeat cycles executed.
|
||||
max_cycles: The scenario's cycle budget.
|
||||
wall_time_ms: Total wall-clock time in milliseconds.
|
||||
llm_calls: Number of LLM inference calls made.
|
||||
metabolic_cost: Estimated resource cost (arbitrary unit, ≈ tokens).
|
||||
error: Error message if the run crashed.
|
||||
tags: Scenario tags (copied for filtering).
|
||||
"""
|
||||
|
||||
scenario_name: str
|
||||
success: bool = False
|
||||
cycles_used: int = 0
|
||||
max_cycles: int = 0
|
||||
wall_time_ms: int = 0
|
||||
llm_calls: int = 0
|
||||
metabolic_cost: float = 0.0
|
||||
error: str | None = None
|
||||
tags: list[str] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class BenchmarkMetrics:
|
||||
"""Aggregated metrics across all scenarios in a benchmark run.
|
||||
|
||||
Attributes:
|
||||
results: Per-scenario results.
|
||||
total_time_ms: Total wall-clock time for the full suite.
|
||||
timestamp: ISO-8601 timestamp of the run.
|
||||
commit_sha: Git commit SHA (if available).
|
||||
"""
|
||||
|
||||
results: list[ScenarioResult] = field(default_factory=list)
|
||||
total_time_ms: int = 0
|
||||
timestamp: str = ""
|
||||
commit_sha: str = ""
|
||||
|
||||
# -- derived properties ------------------------------------------------
|
||||
|
||||
@property
|
||||
def pass_count(self) -> int:
|
||||
return sum(1 for r in self.results if r.success)
|
||||
|
||||
@property
|
||||
def fail_count(self) -> int:
|
||||
return sum(1 for r in self.results if not r.success)
|
||||
|
||||
@property
|
||||
def success_rate(self) -> float:
|
||||
if not self.results:
|
||||
return 0.0
|
||||
return self.pass_count / len(self.results)
|
||||
|
||||
@property
|
||||
def total_llm_calls(self) -> int:
|
||||
return sum(r.llm_calls for r in self.results)
|
||||
|
||||
@property
|
||||
def total_metabolic_cost(self) -> float:
|
||||
return sum(r.metabolic_cost for r in self.results)
|
||||
|
||||
# -- persistence -------------------------------------------------------
|
||||
|
||||
def save(self, path: Path) -> None:
|
||||
"""Append this run's results to a JSONL file at *path*."""
|
||||
path = Path(path)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
record = {
|
||||
"timestamp": self.timestamp,
|
||||
"commit_sha": self.commit_sha,
|
||||
"total_time_ms": self.total_time_ms,
|
||||
"success_rate": round(self.success_rate, 4),
|
||||
"total_llm_calls": self.total_llm_calls,
|
||||
"total_metabolic_cost": round(self.total_metabolic_cost, 2),
|
||||
"scenarios": [asdict(r) for r in self.results],
|
||||
}
|
||||
with path.open("a") as f:
|
||||
f.write(json.dumps(record) + "\n")
|
||||
logger.info("Benchmark results saved to %s", path)
|
||||
|
||||
# -- summary -----------------------------------------------------------
|
||||
|
||||
def summary(self) -> str:
|
||||
"""Return a human-readable summary of the benchmark run."""
|
||||
lines = [
|
||||
"=== Benchmark Summary ===",
|
||||
f"Scenarios: {len(self.results)} "
|
||||
f"Passed: {self.pass_count} "
|
||||
f"Failed: {self.fail_count} "
|
||||
f"Success rate: {self.success_rate:.0%}",
|
||||
f"Total time: {self.total_time_ms} ms "
|
||||
f"LLM calls: {self.total_llm_calls} "
|
||||
f"Metabolic cost: {self.total_metabolic_cost:.1f}",
|
||||
]
|
||||
if self.commit_sha:
|
||||
lines.append(f"Commit: {self.commit_sha}")
|
||||
lines.append("")
|
||||
for r in self.results:
|
||||
status = "PASS" if r.success else "FAIL"
|
||||
lines.append(
|
||||
f" [{status}] {r.scenario_name} — "
|
||||
f"{r.cycles_used}/{r.max_cycles} cycles, "
|
||||
f"{r.wall_time_ms} ms, "
|
||||
f"{r.llm_calls} LLM calls"
|
||||
)
|
||||
if r.error:
|
||||
lines.append(f" Error: {r.error}")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def load_history(path: Path) -> list[dict]:
|
||||
"""Load benchmark history from a JSONL file.
|
||||
|
||||
Returns:
|
||||
List of run records, most recent first.
|
||||
"""
|
||||
path = Path(path)
|
||||
if not path.exists():
|
||||
return []
|
||||
records: list[dict] = []
|
||||
for line in path.read_text().strip().splitlines():
|
||||
try:
|
||||
records.append(json.loads(line))
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
return list(reversed(records))
|
||||
|
||||
|
||||
def compare_runs(
|
||||
current: BenchmarkMetrics,
|
||||
baseline: BenchmarkMetrics,
|
||||
) -> str:
|
||||
"""Compare two benchmark runs and report regressions.
|
||||
|
||||
Returns:
|
||||
Human-readable comparison report.
|
||||
"""
|
||||
lines = ["=== Regression Report ==="]
|
||||
|
||||
# Overall
|
||||
rate_delta = current.success_rate - baseline.success_rate
|
||||
lines.append(
|
||||
f"Success rate: {baseline.success_rate:.0%} -> {current.success_rate:.0%} "
|
||||
f"({rate_delta:+.0%})"
|
||||
)
|
||||
|
||||
cost_delta = current.total_metabolic_cost - baseline.total_metabolic_cost
|
||||
if baseline.total_metabolic_cost > 0:
|
||||
cost_pct = (cost_delta / baseline.total_metabolic_cost) * 100
|
||||
lines.append(
|
||||
f"Metabolic cost: {baseline.total_metabolic_cost:.1f} -> "
|
||||
f"{current.total_metabolic_cost:.1f} ({cost_pct:+.1f}%)"
|
||||
)
|
||||
|
||||
# Per-scenario
|
||||
baseline_map = {r.scenario_name: r for r in baseline.results}
|
||||
for r in current.results:
|
||||
b = baseline_map.get(r.scenario_name)
|
||||
if b is None:
|
||||
lines.append(f" [NEW] {r.scenario_name}")
|
||||
continue
|
||||
if b.success and not r.success:
|
||||
lines.append(f" [REGRESSION] {r.scenario_name} — was PASS, now FAIL")
|
||||
elif not b.success and r.success:
|
||||
lines.append(f" [IMPROVEMENT] {r.scenario_name} — was FAIL, now PASS")
|
||||
elif r.cycles_used > b.cycles_used * 1.5:
|
||||
lines.append(
|
||||
f" [SLOWER] {r.scenario_name} — "
|
||||
f"{b.cycles_used} -> {r.cycles_used} cycles (+{r.cycles_used - b.cycles_used})"
|
||||
)
|
||||
|
||||
return "\n".join(lines)
|
||||
167
src/infrastructure/world/benchmark/runner.py
Normal file
167
src/infrastructure/world/benchmark/runner.py
Normal file
@@ -0,0 +1,167 @@
|
||||
"""Benchmark runner — executes scenarios through the heartbeat loop.
|
||||
|
||||
Wires each ``BenchmarkScenario`` into a ``MockWorldAdapter`` (or a
|
||||
supplied adapter), runs the heartbeat for up to ``max_cycles``, and
|
||||
collects ``BenchmarkMetrics``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import subprocess
|
||||
import time
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from infrastructure.world.adapters.mock import MockWorldAdapter
|
||||
from infrastructure.world.benchmark.metrics import BenchmarkMetrics, ScenarioResult
|
||||
from infrastructure.world.benchmark.scenarios import BenchmarkScenario
|
||||
from infrastructure.world.interface import WorldInterface
|
||||
from loop.heartbeat import Heartbeat
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Rough estimate: each heartbeat cycle costs ~1 unit of metabolic cost
|
||||
# (gather + reason + act phases each touch the LLM router once).
|
||||
_COST_PER_CYCLE = 3.0 # three phases per cycle
|
||||
|
||||
|
||||
class BenchmarkRunner:
|
||||
"""Run benchmark scenarios and collect metrics.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
adapter_factory:
|
||||
Optional callable that returns a ``WorldInterface`` for a given
|
||||
scenario. Defaults to building a ``MockWorldAdapter`` from the
|
||||
scenario's start state.
|
||||
heartbeat_interval:
|
||||
Seconds between heartbeat ticks (0 for immediate).
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
adapter_factory=None,
|
||||
heartbeat_interval: float = 0.0,
|
||||
) -> None:
|
||||
self._adapter_factory = adapter_factory or self._default_adapter
|
||||
self._interval = heartbeat_interval
|
||||
|
||||
# -- public API --------------------------------------------------------
|
||||
|
||||
async def run(
|
||||
self,
|
||||
scenarios: list[BenchmarkScenario],
|
||||
) -> BenchmarkMetrics:
|
||||
"""Execute all *scenarios* and return aggregated metrics."""
|
||||
metrics = BenchmarkMetrics(
|
||||
timestamp=datetime.now(UTC).isoformat(),
|
||||
commit_sha=self._git_sha(),
|
||||
)
|
||||
suite_start = time.monotonic()
|
||||
|
||||
for scenario in scenarios:
|
||||
logger.info("Benchmark: starting '%s'", scenario.name)
|
||||
result = await self._run_scenario(scenario)
|
||||
metrics.results.append(result)
|
||||
status = "PASS" if result.success else "FAIL"
|
||||
logger.info(
|
||||
"Benchmark: '%s' %s (%d/%d cycles, %d ms)",
|
||||
scenario.name,
|
||||
status,
|
||||
result.cycles_used,
|
||||
result.max_cycles,
|
||||
result.wall_time_ms,
|
||||
)
|
||||
|
||||
metrics.total_time_ms = int((time.monotonic() - suite_start) * 1000)
|
||||
return metrics
|
||||
|
||||
# -- internal ----------------------------------------------------------
|
||||
|
||||
async def _run_scenario(self, scenario: BenchmarkScenario) -> ScenarioResult:
|
||||
"""Run a single scenario through the heartbeat loop."""
|
||||
result = ScenarioResult(
|
||||
scenario_name=scenario.name,
|
||||
max_cycles=scenario.max_cycles,
|
||||
tags=list(scenario.tags),
|
||||
)
|
||||
|
||||
adapter = self._adapter_factory(scenario)
|
||||
adapter.connect()
|
||||
|
||||
hb = Heartbeat(world=adapter, interval=self._interval)
|
||||
actions: list[dict] = []
|
||||
|
||||
start = time.monotonic()
|
||||
try:
|
||||
for cycle in range(1, scenario.max_cycles + 1):
|
||||
record = await hb.run_once()
|
||||
result.cycles_used = cycle
|
||||
|
||||
# Track LLM calls (each cycle has 3 phases that may call LLM)
|
||||
result.llm_calls += 3
|
||||
|
||||
# Accumulate actions for goal predicate
|
||||
if record.action_taken and record.action_taken != "idle":
|
||||
actions.append(
|
||||
{
|
||||
"action": record.action_taken,
|
||||
"target": record.observation.get("location", ""),
|
||||
"status": record.action_status,
|
||||
}
|
||||
)
|
||||
|
||||
# Update adapter location if scenario simulates movement
|
||||
current_location = self._get_current_location(adapter)
|
||||
|
||||
# Check goal predicate
|
||||
if scenario.goal_predicate is not None:
|
||||
if scenario.goal_predicate(actions, current_location):
|
||||
result.success = True
|
||||
break
|
||||
elif cycle == scenario.max_cycles:
|
||||
# No predicate — success if we survived all cycles
|
||||
result.success = True
|
||||
|
||||
except Exception as exc:
|
||||
logger.warning("Benchmark scenario '%s' crashed: %s", scenario.name, exc)
|
||||
result.error = str(exc)
|
||||
finally:
|
||||
adapter.disconnect()
|
||||
|
||||
result.wall_time_ms = int((time.monotonic() - start) * 1000)
|
||||
result.metabolic_cost = result.cycles_used * _COST_PER_CYCLE
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def _default_adapter(scenario: BenchmarkScenario) -> WorldInterface:
|
||||
"""Build a MockWorldAdapter from a scenario's starting state."""
|
||||
return MockWorldAdapter(
|
||||
location=scenario.start_location,
|
||||
entities=list(scenario.entities),
|
||||
events=list(scenario.events),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _get_current_location(adapter: WorldInterface) -> str:
|
||||
"""Read the current location from the adapter."""
|
||||
try:
|
||||
perception = adapter.observe()
|
||||
return perception.location
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
@staticmethod
|
||||
def _git_sha() -> str:
|
||||
"""Best-effort: return the current git commit SHA."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "rev-parse", "--short", "HEAD"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5,
|
||||
)
|
||||
return result.stdout.strip() if result.returncode == 0 else ""
|
||||
except (OSError, subprocess.TimeoutExpired):
|
||||
return ""
|
||||
160
src/infrastructure/world/benchmark/scenarios.py
Normal file
160
src/infrastructure/world/benchmark/scenarios.py
Normal file
@@ -0,0 +1,160 @@
|
||||
"""Benchmark scenario definitions for Morrowind agent regression testing.
|
||||
|
||||
Each scenario specifies a starting location, goal conditions, world state
|
||||
(entities, events), and maximum cycles allowed. The runner feeds these
|
||||
into the heartbeat loop and checks completion against the goal predicate.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class BenchmarkScenario:
|
||||
"""A reproducible agent task used to detect performance regressions.
|
||||
|
||||
Attributes:
|
||||
name: Human-readable scenario name.
|
||||
description: What the scenario tests.
|
||||
start_location: Where the agent begins.
|
||||
goal_location: Target location (if navigation scenario).
|
||||
entities: NPCs / objects present in the world.
|
||||
events: Game events injected each cycle.
|
||||
max_cycles: Hard cap on heartbeat cycles before failure.
|
||||
goal_predicate: Optional callable ``(actions, location) -> bool``
|
||||
evaluated after each cycle to check early success.
|
||||
tags: Freeform tags for filtering (e.g. "navigation", "quest").
|
||||
"""
|
||||
|
||||
name: str
|
||||
description: str
|
||||
start_location: str
|
||||
goal_location: str = ""
|
||||
entities: list[str] = field(default_factory=list)
|
||||
events: list[str] = field(default_factory=list)
|
||||
max_cycles: int = 50
|
||||
goal_predicate: Callable | None = None
|
||||
tags: list[str] = field(default_factory=list)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Goal predicates
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _reached_location(target: str) -> Callable:
|
||||
"""Return a predicate that checks whether the agent reached *target*."""
|
||||
|
||||
def predicate(actions: list[dict], current_location: str) -> bool:
|
||||
return current_location.lower() == target.lower()
|
||||
|
||||
return predicate
|
||||
|
||||
|
||||
def _interacted_with(npc: str) -> Callable:
|
||||
"""Return a predicate that checks for a speak/interact action with *npc*."""
|
||||
|
||||
def predicate(actions: list[dict], current_location: str) -> bool:
|
||||
for act in actions:
|
||||
if act.get("action") in ("speak", "interact", "talk"):
|
||||
if act.get("target", "").lower() == npc.lower():
|
||||
return True
|
||||
return False
|
||||
|
||||
return predicate
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Built-in scenarios
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
BUILTIN_SCENARIOS: list[BenchmarkScenario] = [
|
||||
BenchmarkScenario(
|
||||
name="Walk Seyda Neen to Balmora",
|
||||
description=(
|
||||
"Navigate from the starting village to Balmora via the road. "
|
||||
"Tests basic navigation and pathfinding."
|
||||
),
|
||||
start_location="Seyda Neen",
|
||||
goal_location="Balmora",
|
||||
entities=["Silt Strider", "Road Sign", "Mudcrab"],
|
||||
events=["player_spawned"],
|
||||
max_cycles=30,
|
||||
goal_predicate=_reached_location("Balmora"),
|
||||
tags=["navigation", "basic"],
|
||||
),
|
||||
BenchmarkScenario(
|
||||
name="Fargoth's Ring",
|
||||
description=(
|
||||
"Complete the Fargoth quest: find Fargoth, receive the ring, "
|
||||
"and return it. Tests NPC interaction and quest logic."
|
||||
),
|
||||
start_location="Seyda Neen",
|
||||
goal_location="Seyda Neen",
|
||||
entities=["Fargoth", "Arrille", "Guard"],
|
||||
events=["quest_available:fargoth_ring"],
|
||||
max_cycles=40,
|
||||
goal_predicate=_interacted_with("Fargoth"),
|
||||
tags=["quest", "npc_interaction"],
|
||||
),
|
||||
BenchmarkScenario(
|
||||
name="Balmora Guild Navigation",
|
||||
description=(
|
||||
"Walk from Balmora South Wall Corner Club to the Fighters Guild. "
|
||||
"Tests intra-city navigation with multiple NPCs present."
|
||||
),
|
||||
start_location="Balmora, South Wall Corner Club",
|
||||
goal_location="Balmora, Fighters Guild",
|
||||
entities=["Guard", "Merchant", "Caius Cosades"],
|
||||
events=["player_entered"],
|
||||
max_cycles=20,
|
||||
goal_predicate=_reached_location("Balmora, Fighters Guild"),
|
||||
tags=["navigation", "city"],
|
||||
),
|
||||
BenchmarkScenario(
|
||||
name="Combat Encounter — Mudcrab",
|
||||
description=(
|
||||
"Engage and defeat a single Mudcrab on the road between "
|
||||
"Seyda Neen and Balmora. Tests combat action selection."
|
||||
),
|
||||
start_location="Bitter Coast Road",
|
||||
goal_location="Bitter Coast Road",
|
||||
entities=["Mudcrab"],
|
||||
events=["hostile_entity_nearby"],
|
||||
max_cycles=15,
|
||||
goal_predicate=None, # Success = survived max_cycles without crash
|
||||
tags=["combat", "basic"],
|
||||
),
|
||||
BenchmarkScenario(
|
||||
name="Passive Observation — Balmora Market",
|
||||
description=(
|
||||
"Observe the Balmora market for 10 cycles without acting. "
|
||||
"Tests that the agent can reason without unnecessary actions."
|
||||
),
|
||||
start_location="Balmora, Market Square",
|
||||
goal_location="",
|
||||
entities=["Merchant", "Guard", "Pilgrim", "Trader"],
|
||||
events=["market_day"],
|
||||
max_cycles=10,
|
||||
tags=["observation", "passive"],
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def load_scenarios(
|
||||
tags: list[str] | None = None,
|
||||
) -> list[BenchmarkScenario]:
|
||||
"""Return built-in scenarios, optionally filtered by tags.
|
||||
|
||||
Args:
|
||||
tags: If provided, only return scenarios whose tags overlap.
|
||||
|
||||
Returns:
|
||||
List of matching ``BenchmarkScenario`` instances.
|
||||
"""
|
||||
if tags is None:
|
||||
return list(BUILTIN_SCENARIOS)
|
||||
tag_set = set(tags)
|
||||
return [s for s in BUILTIN_SCENARIOS if tag_set & set(s.tags)]
|
||||
@@ -215,6 +215,119 @@ def _summarize(result: AgenticResult, total_steps: int, was_truncated: bool) ->
|
||||
result.status = "completed"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Execution orchestrator
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def _execute_all_steps(
|
||||
agent,
|
||||
task: str,
|
||||
task_id: str,
|
||||
steps: list[str],
|
||||
total_steps: int,
|
||||
session_id: str,
|
||||
result: AgenticResult,
|
||||
on_progress: Callable | None,
|
||||
) -> list[str]:
|
||||
"""Execute all planned steps, handling failures with adaptation.
|
||||
|
||||
Appends AgenticStep objects to *result.steps* and returns the list
|
||||
of completed-result strings (used as context for later steps).
|
||||
"""
|
||||
completed_results: list[str] = []
|
||||
|
||||
for i, step_desc in enumerate(steps, 1):
|
||||
step_start = time.monotonic()
|
||||
try:
|
||||
step = await _execute_step(
|
||||
agent,
|
||||
task,
|
||||
step_desc,
|
||||
i,
|
||||
total_steps,
|
||||
completed_results,
|
||||
session_id,
|
||||
)
|
||||
result.steps.append(step)
|
||||
completed_results.append(f"Step {i}: {step.result[:200]}")
|
||||
await _broadcast_progress(
|
||||
"agentic.step_complete",
|
||||
{
|
||||
"task_id": task_id,
|
||||
"step": i,
|
||||
"total": total_steps,
|
||||
"description": step_desc,
|
||||
"result": step.result[:200],
|
||||
},
|
||||
)
|
||||
if on_progress:
|
||||
await on_progress(step_desc, i, total_steps)
|
||||
|
||||
except Exception as exc: # broad catch intentional: agent.run can raise any error
|
||||
logger.warning("Agentic loop step %d failed: %s", i, exc)
|
||||
step = await _handle_step_failure(
|
||||
agent,
|
||||
step_desc,
|
||||
i,
|
||||
total_steps,
|
||||
task_id,
|
||||
exc,
|
||||
step_start,
|
||||
session_id,
|
||||
result,
|
||||
completed_results,
|
||||
on_progress,
|
||||
)
|
||||
|
||||
return completed_results
|
||||
|
||||
|
||||
async def _handle_step_failure(
|
||||
agent,
|
||||
step_desc: str,
|
||||
step_num: int,
|
||||
total_steps: int,
|
||||
task_id: str,
|
||||
exc: Exception,
|
||||
step_start: float,
|
||||
session_id: str,
|
||||
result: AgenticResult,
|
||||
completed_results: list[str],
|
||||
on_progress: Callable | None,
|
||||
) -> None:
|
||||
"""Try to adapt a failed step; record a hard failure if adaptation also fails."""
|
||||
try:
|
||||
step = await _adapt_step(agent, step_desc, step_num, exc, step_start, session_id)
|
||||
result.steps.append(step)
|
||||
completed_results.append(f"Step {step_num} (adapted): {step.result[:200]}")
|
||||
await _broadcast_progress(
|
||||
"agentic.step_adapted",
|
||||
{
|
||||
"task_id": task_id,
|
||||
"step": step_num,
|
||||
"total": total_steps,
|
||||
"description": step_desc,
|
||||
"error": str(exc),
|
||||
"adaptation": step.result[:200],
|
||||
},
|
||||
)
|
||||
if on_progress:
|
||||
await on_progress(f"[Adapted] {step_desc}", step_num, total_steps)
|
||||
except Exception as adapt_exc: # broad catch intentional
|
||||
logger.error("Agentic loop adaptation also failed: %s", adapt_exc)
|
||||
result.steps.append(
|
||||
AgenticStep(
|
||||
step_num=step_num,
|
||||
description=step_desc,
|
||||
result=f"Failed: {exc}; Adaptation also failed: {adapt_exc}",
|
||||
status="failed",
|
||||
duration_ms=int((time.monotonic() - step_start) * 1000),
|
||||
)
|
||||
)
|
||||
completed_results.append(f"Step {step_num}: FAILED")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Core loop
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -265,65 +378,9 @@ async def run_agentic_loop(
|
||||
)
|
||||
|
||||
# Phase 2: Execution
|
||||
completed_results: list[str] = []
|
||||
for i, step_desc in enumerate(steps, 1):
|
||||
step_start = time.monotonic()
|
||||
try:
|
||||
step = await _execute_step(
|
||||
agent,
|
||||
task,
|
||||
step_desc,
|
||||
i,
|
||||
total_steps,
|
||||
completed_results,
|
||||
session_id,
|
||||
)
|
||||
result.steps.append(step)
|
||||
completed_results.append(f"Step {i}: {step.result[:200]}")
|
||||
await _broadcast_progress(
|
||||
"agentic.step_complete",
|
||||
{
|
||||
"task_id": task_id,
|
||||
"step": i,
|
||||
"total": total_steps,
|
||||
"description": step_desc,
|
||||
"result": step.result[:200],
|
||||
},
|
||||
)
|
||||
if on_progress:
|
||||
await on_progress(step_desc, i, total_steps)
|
||||
|
||||
except Exception as exc: # broad catch intentional: agent.run can raise any error
|
||||
logger.warning("Agentic loop step %d failed: %s", i, exc)
|
||||
try:
|
||||
step = await _adapt_step(agent, step_desc, i, exc, step_start, session_id)
|
||||
result.steps.append(step)
|
||||
completed_results.append(f"Step {i} (adapted): {step.result[:200]}")
|
||||
await _broadcast_progress(
|
||||
"agentic.step_adapted",
|
||||
{
|
||||
"task_id": task_id,
|
||||
"step": i,
|
||||
"total": total_steps,
|
||||
"description": step_desc,
|
||||
"error": str(exc),
|
||||
"adaptation": step.result[:200],
|
||||
},
|
||||
)
|
||||
if on_progress:
|
||||
await on_progress(f"[Adapted] {step_desc}", i, total_steps)
|
||||
except Exception as adapt_exc: # broad catch intentional
|
||||
logger.error("Agentic loop adaptation also failed: %s", adapt_exc)
|
||||
result.steps.append(
|
||||
AgenticStep(
|
||||
step_num=i,
|
||||
description=step_desc,
|
||||
result=f"Failed: {exc}; Adaptation also failed: {adapt_exc}",
|
||||
status="failed",
|
||||
duration_ms=int((time.monotonic() - step_start) * 1000),
|
||||
)
|
||||
)
|
||||
completed_results.append(f"Step {i}: FAILED")
|
||||
await _execute_all_steps(
|
||||
agent, task, task_id, steps, total_steps, session_id, result, on_progress
|
||||
)
|
||||
|
||||
# Phase 3: Summary
|
||||
_summarize(result, total_steps, was_truncated)
|
||||
|
||||
540
src/timmy/mcp_bridge.py
Normal file
540
src/timmy/mcp_bridge.py
Normal file
@@ -0,0 +1,540 @@
|
||||
"""MCP Bridge for Qwen3 via Ollama.
|
||||
|
||||
Provides a lightweight bridge between Ollama's native tool-calling API
|
||||
and MCP tool servers (Gitea, Filesystem, Shell). Unlike the Agno-based
|
||||
agent loop, this bridge talks directly to the Ollama ``/api/chat``
|
||||
endpoint, translating MCP tool schemas into Ollama tool definitions and
|
||||
executing tool calls in a loop until the model produces a final response.
|
||||
|
||||
Designed for Qwen3 models which have first-class tool-calling support.
|
||||
|
||||
Usage::
|
||||
|
||||
from timmy.mcp_bridge import MCPBridge
|
||||
|
||||
bridge = MCPBridge()
|
||||
async with bridge:
|
||||
result = await bridge.run("List open issues in Timmy-time-dashboard")
|
||||
print(result.content)
|
||||
|
||||
The bridge evaluates available options in order of preference:
|
||||
1. Direct Ollama /api/chat with native tool_calls (selected — best fit)
|
||||
2. qwen-agent MCP (requires separate qwen-agent install)
|
||||
3. ollmcp / mcphost / ollama-mcp-bridge (external binaries)
|
||||
|
||||
Option 1 was selected because:
|
||||
- Zero additional dependencies (uses httpx already in the project)
|
||||
- Native Qwen3 tool-calling support via Ollama's OpenAI-compatible API
|
||||
- Full control over the tool-call loop and error handling
|
||||
- Consistent with the project's graceful-degradation pattern
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Maximum tool-call round-trips before aborting (safety valve).
|
||||
_MAX_TOOL_ROUNDS = 10
|
||||
|
||||
|
||||
@dataclass
|
||||
class BridgeResult:
|
||||
"""Result from an MCP bridge run."""
|
||||
|
||||
content: str
|
||||
tool_calls_made: list[dict] = field(default_factory=list)
|
||||
rounds: int = 0
|
||||
latency_ms: float = 0.0
|
||||
model: str = ""
|
||||
error: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class MCPToolDef:
|
||||
"""An MCP tool definition translated for Ollama."""
|
||||
|
||||
name: str
|
||||
description: str
|
||||
parameters: dict[str, Any]
|
||||
handler: Any # async callable(**kwargs) -> str
|
||||
|
||||
|
||||
def _mcp_schema_to_ollama_tool(tool: MCPToolDef) -> dict:
|
||||
"""Convert an MCPToolDef into Ollama's tool format.
|
||||
|
||||
Ollama uses OpenAI-compatible tool definitions::
|
||||
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "...",
|
||||
"description": "...",
|
||||
"parameters": { "type": "object", "properties": {...}, "required": [...] }
|
||||
}
|
||||
}
|
||||
"""
|
||||
# Normalise parameters — ensure it has "type": "object" wrapper.
|
||||
params = tool.parameters
|
||||
if params.get("type") != "object":
|
||||
params = {
|
||||
"type": "object",
|
||||
"properties": params,
|
||||
"required": list(params.keys()),
|
||||
}
|
||||
|
||||
return {
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": tool.name,
|
||||
"description": tool.description,
|
||||
"parameters": params,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _build_shell_tool() -> MCPToolDef | None:
|
||||
"""Build the shell execution tool using the local ShellHand."""
|
||||
try:
|
||||
from infrastructure.hands.shell import shell_hand
|
||||
|
||||
async def _handle_shell(**kwargs: Any) -> str:
|
||||
command = kwargs.get("command", "")
|
||||
timeout = kwargs.get("timeout")
|
||||
result = await shell_hand.run(command, timeout=timeout)
|
||||
if result.success:
|
||||
return result.stdout or "(no output)"
|
||||
return f"[error] exit={result.exit_code} {result.error or result.stderr}"
|
||||
|
||||
return MCPToolDef(
|
||||
name="shell_exec",
|
||||
description=(
|
||||
"Execute a shell command in a sandboxed environment. "
|
||||
"Commands are validated against an allow-list. "
|
||||
"Returns stdout, stderr, and exit code."
|
||||
),
|
||||
parameters={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"command": {
|
||||
"type": "string",
|
||||
"description": "Shell command to execute (must match allow-list)",
|
||||
},
|
||||
"timeout": {
|
||||
"type": "integer",
|
||||
"description": "Timeout in seconds (default 60)",
|
||||
},
|
||||
},
|
||||
"required": ["command"],
|
||||
},
|
||||
handler=_handle_shell,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.debug("Shell tool unavailable: %s", exc)
|
||||
return None
|
||||
|
||||
|
||||
def _build_gitea_tools() -> list[MCPToolDef]:
|
||||
"""Build Gitea MCP tool definitions for direct Ollama bridge use.
|
||||
|
||||
These tools call the Gitea REST API directly via httpx rather than
|
||||
spawning an MCP server subprocess, keeping the bridge lightweight.
|
||||
"""
|
||||
if not settings.gitea_enabled or not settings.gitea_token:
|
||||
return []
|
||||
|
||||
base_url = settings.gitea_url
|
||||
token = settings.gitea_token
|
||||
owner, repo = settings.gitea_repo.split("/", 1)
|
||||
|
||||
async def _list_issues(**kwargs: Any) -> str:
|
||||
state = kwargs.get("state", "open")
|
||||
limit = kwargs.get("limit", 10)
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
resp = await client.get(
|
||||
f"{base_url}/api/v1/repos/{owner}/{repo}/issues",
|
||||
headers={"Authorization": f"token {token}"},
|
||||
params={"state": state, "limit": limit, "type": "issues"},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
issues = resp.json()
|
||||
if not issues:
|
||||
return f"No {state} issues found."
|
||||
lines = []
|
||||
for issue in issues:
|
||||
labels = ", ".join(lb["name"] for lb in issue.get("labels", []))
|
||||
label_str = f" [{labels}]" if labels else ""
|
||||
lines.append(f"#{issue['number']}: {issue['title']}{label_str}")
|
||||
return "\n".join(lines)
|
||||
except Exception as exc:
|
||||
return f"Error listing issues: {exc}"
|
||||
|
||||
async def _create_issue(**kwargs: Any) -> str:
|
||||
title = kwargs.get("title", "")
|
||||
body = kwargs.get("body", "")
|
||||
if not title:
|
||||
return "Error: title is required"
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
resp = await client.post(
|
||||
f"{base_url}/api/v1/repos/{owner}/{repo}/issues",
|
||||
headers={
|
||||
"Authorization": f"token {token}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
json={"title": title, "body": body},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
return f"Created issue #{data['number']}: {data['title']}"
|
||||
except Exception as exc:
|
||||
return f"Error creating issue: {exc}"
|
||||
|
||||
async def _read_issue(**kwargs: Any) -> str:
|
||||
number = kwargs.get("number")
|
||||
if not number:
|
||||
return "Error: issue number is required"
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
resp = await client.get(
|
||||
f"{base_url}/api/v1/repos/{owner}/{repo}/issues/{number}",
|
||||
headers={"Authorization": f"token {token}"},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
issue = resp.json()
|
||||
labels = ", ".join(lb["name"] for lb in issue.get("labels", []))
|
||||
parts = [
|
||||
f"#{issue['number']}: {issue['title']}",
|
||||
f"State: {issue['state']}",
|
||||
]
|
||||
if labels:
|
||||
parts.append(f"Labels: {labels}")
|
||||
if issue.get("body"):
|
||||
parts.append(f"\n{issue['body']}")
|
||||
return "\n".join(parts)
|
||||
except Exception as exc:
|
||||
return f"Error reading issue: {exc}"
|
||||
|
||||
return [
|
||||
MCPToolDef(
|
||||
name="list_issues",
|
||||
description="List issues in the Gitea repository. Returns issue numbers and titles.",
|
||||
parameters={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"state": {
|
||||
"type": "string",
|
||||
"description": "Filter by state: open, closed, or all (default: open)",
|
||||
},
|
||||
"limit": {
|
||||
"type": "integer",
|
||||
"description": "Maximum number of issues to return (default: 10)",
|
||||
},
|
||||
},
|
||||
"required": [],
|
||||
},
|
||||
handler=_list_issues,
|
||||
),
|
||||
MCPToolDef(
|
||||
name="create_issue",
|
||||
description="Create a new issue in the Gitea repository.",
|
||||
parameters={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"title": {
|
||||
"type": "string",
|
||||
"description": "Issue title (required)",
|
||||
},
|
||||
"body": {
|
||||
"type": "string",
|
||||
"description": "Issue body in markdown (optional)",
|
||||
},
|
||||
},
|
||||
"required": ["title"],
|
||||
},
|
||||
handler=_create_issue,
|
||||
),
|
||||
MCPToolDef(
|
||||
name="read_issue",
|
||||
description="Read details of a specific issue by number.",
|
||||
parameters={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"number": {
|
||||
"type": "integer",
|
||||
"description": "Issue number to read",
|
||||
},
|
||||
},
|
||||
"required": ["number"],
|
||||
},
|
||||
handler=_read_issue,
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
class MCPBridge:
|
||||
"""Bridge between Ollama's tool-calling API and MCP tools.
|
||||
|
||||
Manages a set of tool definitions and executes a chat loop with
|
||||
tool calling against a Qwen3 model via Ollama.
|
||||
|
||||
The bridge:
|
||||
1. Registers available tools (Gitea, shell, custom)
|
||||
2. Sends prompts to Ollama with tool definitions
|
||||
3. Executes tool calls when the model requests them
|
||||
4. Returns tool results to the model for the next round
|
||||
5. Repeats until the model produces a final text response
|
||||
|
||||
Attributes:
|
||||
model: Ollama model name (default from settings).
|
||||
ollama_url: Ollama API base URL (default from settings).
|
||||
tools: Registered tool definitions.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
model: str | None = None,
|
||||
ollama_url: str | None = None,
|
||||
*,
|
||||
include_gitea: bool = True,
|
||||
include_shell: bool = True,
|
||||
extra_tools: list[MCPToolDef] | None = None,
|
||||
max_rounds: int = _MAX_TOOL_ROUNDS,
|
||||
) -> None:
|
||||
self.model = model or settings.ollama_model
|
||||
self.ollama_url = ollama_url or settings.normalized_ollama_url
|
||||
self.max_rounds = max_rounds
|
||||
self._tools: dict[str, MCPToolDef] = {}
|
||||
self._client: httpx.AsyncClient | None = None
|
||||
|
||||
# Register built-in tools
|
||||
if include_gitea:
|
||||
for tool in _build_gitea_tools():
|
||||
self._tools[tool.name] = tool
|
||||
|
||||
if include_shell:
|
||||
shell = _build_shell_tool()
|
||||
if shell:
|
||||
self._tools[shell.name] = shell
|
||||
|
||||
# Register extra tools
|
||||
if extra_tools:
|
||||
for tool in extra_tools:
|
||||
self._tools[tool.name] = tool
|
||||
|
||||
logger.info(
|
||||
"MCPBridge initialised: model=%s, tools=%s",
|
||||
self.model,
|
||||
list(self._tools.keys()),
|
||||
)
|
||||
|
||||
async def __aenter__(self) -> MCPBridge:
|
||||
self._client = httpx.AsyncClient(timeout=settings.mcp_bridge_timeout)
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *exc: Any) -> None:
|
||||
if self._client:
|
||||
await self._client.aclose()
|
||||
self._client = None
|
||||
|
||||
@property
|
||||
def tool_names(self) -> list[str]:
|
||||
"""Return names of all registered tools."""
|
||||
return list(self._tools.keys())
|
||||
|
||||
def _build_ollama_tools(self) -> list[dict]:
|
||||
"""Convert registered tools to Ollama tool format."""
|
||||
return [_mcp_schema_to_ollama_tool(t) for t in self._tools.values()]
|
||||
|
||||
async def _chat(self, messages: list[dict], tools: list[dict]) -> dict:
|
||||
"""Send a chat request to Ollama and return the response.
|
||||
|
||||
Uses the ``/api/chat`` endpoint with tool definitions.
|
||||
"""
|
||||
if not self._client:
|
||||
raise RuntimeError("MCPBridge must be used as async context manager")
|
||||
|
||||
payload: dict[str, Any] = {
|
||||
"model": self.model,
|
||||
"messages": messages,
|
||||
"stream": False,
|
||||
}
|
||||
if tools:
|
||||
payload["tools"] = tools
|
||||
|
||||
# Set num_ctx if configured
|
||||
if settings.ollama_num_ctx > 0:
|
||||
payload["options"] = {"num_ctx": settings.ollama_num_ctx}
|
||||
|
||||
resp = await self._client.post(
|
||||
f"{self.ollama_url}/api/chat",
|
||||
json=payload,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
|
||||
async def _execute_tool_call(self, tool_call: dict) -> str:
|
||||
"""Execute a single tool call and return the result string."""
|
||||
func = tool_call.get("function", {})
|
||||
name = func.get("name", "")
|
||||
arguments = func.get("arguments", {})
|
||||
|
||||
tool = self._tools.get(name)
|
||||
if not tool:
|
||||
return f"Error: unknown tool '{name}'"
|
||||
|
||||
try:
|
||||
result = await tool.handler(**arguments)
|
||||
return str(result)
|
||||
except Exception as exc:
|
||||
logger.warning("Tool '%s' execution failed: %s", name, exc)
|
||||
return f"Error executing {name}: {exc}"
|
||||
|
||||
async def run(
|
||||
self,
|
||||
prompt: str,
|
||||
*,
|
||||
system_prompt: str | None = None,
|
||||
) -> BridgeResult:
|
||||
"""Run a prompt through the MCP bridge with tool calling.
|
||||
|
||||
Sends the prompt to the Ollama model with tool definitions.
|
||||
If the model requests tool calls, executes them and feeds
|
||||
results back until the model produces a final text response.
|
||||
|
||||
Args:
|
||||
prompt: User message to send.
|
||||
system_prompt: Optional system prompt override.
|
||||
|
||||
Returns:
|
||||
BridgeResult with the final response and tool call history.
|
||||
"""
|
||||
start = time.time()
|
||||
messages: list[dict] = []
|
||||
|
||||
if system_prompt:
|
||||
messages.append({"role": "system", "content": system_prompt})
|
||||
|
||||
messages.append({"role": "user", "content": prompt})
|
||||
|
||||
tools = self._build_ollama_tools()
|
||||
tool_calls_made: list[dict] = []
|
||||
rounds = 0
|
||||
|
||||
try:
|
||||
for round_num in range(self.max_rounds):
|
||||
rounds = round_num + 1
|
||||
response = await self._chat(messages, tools)
|
||||
msg = response.get("message", {})
|
||||
|
||||
# Check if model made tool calls
|
||||
model_tool_calls = msg.get("tool_calls", [])
|
||||
if not model_tool_calls:
|
||||
# Final text response — done.
|
||||
content = msg.get("content", "")
|
||||
latency = (time.time() - start) * 1000
|
||||
return BridgeResult(
|
||||
content=content,
|
||||
tool_calls_made=tool_calls_made,
|
||||
rounds=rounds,
|
||||
latency_ms=latency,
|
||||
model=self.model,
|
||||
)
|
||||
|
||||
# Append the assistant message (with tool_calls) to history
|
||||
messages.append(msg)
|
||||
|
||||
# Execute each tool call and add results
|
||||
for tc in model_tool_calls:
|
||||
func = tc.get("function", {})
|
||||
tool_name = func.get("name", "unknown")
|
||||
tool_args = func.get("arguments", {})
|
||||
|
||||
logger.info(
|
||||
"Bridge tool call [round %d]: %s(%s)",
|
||||
rounds,
|
||||
tool_name,
|
||||
tool_args,
|
||||
)
|
||||
|
||||
result = await self._execute_tool_call(tc)
|
||||
tool_calls_made.append(
|
||||
{
|
||||
"round": rounds,
|
||||
"tool": tool_name,
|
||||
"arguments": tool_args,
|
||||
"result": result[:500], # Truncate for logging
|
||||
}
|
||||
)
|
||||
|
||||
# Add tool result to message history
|
||||
messages.append(
|
||||
{
|
||||
"role": "tool",
|
||||
"content": result,
|
||||
}
|
||||
)
|
||||
|
||||
# Hit max rounds
|
||||
latency = (time.time() - start) * 1000
|
||||
return BridgeResult(
|
||||
content="(max tool-call rounds reached)",
|
||||
tool_calls_made=tool_calls_made,
|
||||
rounds=rounds,
|
||||
latency_ms=latency,
|
||||
model=self.model,
|
||||
error=f"Exceeded maximum of {self.max_rounds} tool-call rounds",
|
||||
)
|
||||
|
||||
except httpx.ConnectError as exc:
|
||||
latency = (time.time() - start) * 1000
|
||||
logger.warning("Ollama connection failed: %s", exc)
|
||||
return BridgeResult(
|
||||
content="",
|
||||
tool_calls_made=tool_calls_made,
|
||||
rounds=rounds,
|
||||
latency_ms=latency,
|
||||
model=self.model,
|
||||
error=f"Ollama connection failed: {exc}",
|
||||
)
|
||||
except httpx.HTTPStatusError as exc:
|
||||
latency = (time.time() - start) * 1000
|
||||
logger.warning("Ollama HTTP error: %s", exc)
|
||||
return BridgeResult(
|
||||
content="",
|
||||
tool_calls_made=tool_calls_made,
|
||||
rounds=rounds,
|
||||
latency_ms=latency,
|
||||
model=self.model,
|
||||
error=f"Ollama HTTP error: {exc.response.status_code}",
|
||||
)
|
||||
except Exception as exc:
|
||||
latency = (time.time() - start) * 1000
|
||||
logger.error("MCPBridge run failed: %s", exc)
|
||||
return BridgeResult(
|
||||
content="",
|
||||
tool_calls_made=tool_calls_made,
|
||||
rounds=rounds,
|
||||
latency_ms=latency,
|
||||
model=self.model,
|
||||
error=str(exc),
|
||||
)
|
||||
|
||||
def status(self) -> dict:
|
||||
"""Return bridge status for the dashboard."""
|
||||
return {
|
||||
"model": self.model,
|
||||
"ollama_url": self.ollama_url,
|
||||
"tools": self.tool_names,
|
||||
"max_rounds": self.max_rounds,
|
||||
"connected": self._client is not None,
|
||||
}
|
||||
369
src/timmy/research_triage.py
Normal file
369
src/timmy/research_triage.py
Normal file
@@ -0,0 +1,369 @@
|
||||
"""Research triage — extract action items from research reports and file Gitea issues.
|
||||
|
||||
Closes the loop: research → knowledge → actionable engineering work.
|
||||
|
||||
The LLM extracts action items during synthesis (not post-processed), then
|
||||
each item is filed as a Gitea issue with appropriate labels, source links,
|
||||
and evidence from the original research.
|
||||
|
||||
Usage::
|
||||
|
||||
from timmy.research_triage import triage_research_report
|
||||
|
||||
results = await triage_research_report(
|
||||
report="## Findings\\n...",
|
||||
source_issue=946,
|
||||
)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Regex to strip markdown code fences from LLM output
|
||||
_FENCE_RE = re.compile(r"^```(?:json)?\s*\n?", re.MULTILINE)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ActionItem:
|
||||
"""A single actionable item extracted from a research report."""
|
||||
|
||||
title: str
|
||||
body: str
|
||||
labels: list[str] = field(default_factory=list)
|
||||
priority: str = "medium"
|
||||
source_urls: list[str] = field(default_factory=list)
|
||||
|
||||
def to_issue_body(self, source_issue: int | None = None) -> str:
|
||||
"""Format for a Gitea issue body with source attribution."""
|
||||
parts = [self.body]
|
||||
|
||||
if self.source_urls:
|
||||
parts.append("\n### Source Evidence")
|
||||
for url in self.source_urls:
|
||||
parts.append(f"- {url}")
|
||||
|
||||
if source_issue:
|
||||
parts.append(
|
||||
f"\n### Origin\nExtracted from research in #{source_issue}"
|
||||
)
|
||||
|
||||
parts.append("\n---\n*Auto-triaged from research findings by Timmy*")
|
||||
return "\n".join(parts)
|
||||
|
||||
|
||||
def _build_extraction_prompt(report: str) -> str:
|
||||
"""Build the LLM prompt for extracting action items from a research report."""
|
||||
return (
|
||||
"You are triaging a research report for actionable engineering work.\n"
|
||||
"Extract 0-5 CONCRETE action items — bugs to fix, features to build,\n"
|
||||
"infrastructure to set up, or investigations to run.\n\n"
|
||||
"Rules:\n"
|
||||
"- Only include items that map to real engineering tasks\n"
|
||||
"- Skip vague recommendations or philosophical observations\n"
|
||||
"- Each item should be specific enough to become a Gitea issue\n"
|
||||
"- Include evidence/URLs from the report in source_urls\n"
|
||||
"- Priority: high (blocking or critical), medium (important), low (nice-to-have)\n"
|
||||
"- Labels: pick from [actionable, research, bug, feature, infrastructure, "
|
||||
"performance, security, kimi-ready]\n"
|
||||
" - 'kimi-ready' means a well-scoped task suitable for an AI agent\n"
|
||||
" - 'actionable' should be on every item (these are all actionable)\n\n"
|
||||
"For each item return:\n"
|
||||
'- "title": Clear, specific title with area prefix '
|
||||
'(e.g. "[MCP] Restore tool server with FastMCP")\n'
|
||||
'- "body": Detailed markdown body with:\n'
|
||||
" **What:** What needs to be done\n"
|
||||
" **Why:** Why this matters (link to research finding)\n"
|
||||
" **Suggested approach:** How to implement\n"
|
||||
" **Acceptance criteria:** How to verify\n"
|
||||
'- "labels": Array of label strings\n'
|
||||
'- "priority": One of high, medium, low\n'
|
||||
'- "source_urls": Array of URLs referenced in the research\n\n'
|
||||
"Return ONLY a JSON array of objects. Return [] if nothing is actionable.\n\n"
|
||||
f"Research report:\n{report}\n\nJSON array:"
|
||||
)
|
||||
|
||||
|
||||
def _parse_llm_response(raw: str) -> list[dict[str, Any]]:
|
||||
"""Parse LLM JSON response, stripping code fences if present."""
|
||||
cleaned = raw.strip()
|
||||
|
||||
# Strip markdown code fences
|
||||
if cleaned.startswith("```"):
|
||||
cleaned = cleaned.split("\n", 1)[-1].rsplit("```", 1)[0].strip()
|
||||
|
||||
items = json.loads(cleaned)
|
||||
if not isinstance(items, list):
|
||||
return []
|
||||
return items
|
||||
|
||||
|
||||
def _validate_action_item(raw_item: dict[str, Any]) -> ActionItem | None:
|
||||
"""Validate and convert a raw dict to an ActionItem, or None if invalid."""
|
||||
if not isinstance(raw_item, dict):
|
||||
return None
|
||||
|
||||
title = raw_item.get("title", "").strip()
|
||||
body = raw_item.get("body", "").strip()
|
||||
|
||||
if not title or len(title) < 10:
|
||||
return None
|
||||
if not body or len(body) < 20:
|
||||
return None
|
||||
|
||||
labels = raw_item.get("labels", [])
|
||||
if isinstance(labels, str):
|
||||
labels = [l.strip() for l in labels.split(",") if l.strip()]
|
||||
if not isinstance(labels, list):
|
||||
labels = []
|
||||
|
||||
# Ensure 'actionable' label is always present
|
||||
if "actionable" not in labels:
|
||||
labels.insert(0, "actionable")
|
||||
|
||||
priority = raw_item.get("priority", "medium").strip().lower()
|
||||
if priority not in ("high", "medium", "low"):
|
||||
priority = "medium"
|
||||
|
||||
source_urls = raw_item.get("source_urls", [])
|
||||
if not isinstance(source_urls, list):
|
||||
source_urls = []
|
||||
|
||||
return ActionItem(
|
||||
title=title,
|
||||
body=body,
|
||||
labels=labels,
|
||||
priority=priority,
|
||||
source_urls=source_urls,
|
||||
)
|
||||
|
||||
|
||||
async def extract_action_items(
|
||||
report: str,
|
||||
llm_caller: Any | None = None,
|
||||
) -> list[ActionItem]:
|
||||
"""Extract actionable engineering items from a research report.
|
||||
|
||||
Uses the LLM to identify concrete tasks, bugs, features, and
|
||||
infrastructure work from structured research output.
|
||||
|
||||
Args:
|
||||
report: The research report text (markdown).
|
||||
llm_caller: Optional async callable(prompt) -> str for LLM.
|
||||
Falls back to the cascade router.
|
||||
|
||||
Returns:
|
||||
List of validated ActionItem objects (0-5 items).
|
||||
"""
|
||||
if not report or not report.strip():
|
||||
return []
|
||||
|
||||
prompt = _build_extraction_prompt(report)
|
||||
|
||||
try:
|
||||
if llm_caller is not None:
|
||||
raw = await llm_caller(prompt)
|
||||
else:
|
||||
raw = await _call_llm(prompt)
|
||||
except Exception as exc:
|
||||
logger.warning("LLM extraction failed: %s", exc)
|
||||
return []
|
||||
|
||||
if not raw or not raw.strip():
|
||||
return []
|
||||
|
||||
try:
|
||||
raw_items = _parse_llm_response(raw)
|
||||
except (json.JSONDecodeError, ValueError) as exc:
|
||||
logger.warning("Failed to parse LLM action items: %s", exc)
|
||||
return []
|
||||
|
||||
items = []
|
||||
for raw_item in raw_items[:5]: # Safety cap
|
||||
item = _validate_action_item(raw_item)
|
||||
if item is not None:
|
||||
items.append(item)
|
||||
|
||||
logger.info("Extracted %d action items from research report", len(items))
|
||||
return items
|
||||
|
||||
|
||||
async def _call_llm(prompt: str) -> str:
|
||||
"""Call the cascade router for LLM completion.
|
||||
|
||||
Falls back gracefully if the router is unavailable.
|
||||
"""
|
||||
from infrastructure.router import get_router
|
||||
|
||||
router = get_router()
|
||||
messages = [{"role": "user", "content": prompt}]
|
||||
result = await router.complete(messages=messages, temperature=0.1)
|
||||
return result.get("content", "") if isinstance(result, dict) else str(result)
|
||||
|
||||
|
||||
async def create_gitea_issue(
|
||||
item: ActionItem,
|
||||
source_issue: int | None = None,
|
||||
) -> dict[str, Any] | None:
|
||||
"""Create a Gitea issue from an ActionItem via the REST API.
|
||||
|
||||
Args:
|
||||
item: The action item to file.
|
||||
source_issue: Parent research issue number to link back to.
|
||||
|
||||
Returns:
|
||||
The created issue dict from Gitea API, or None on failure.
|
||||
"""
|
||||
if not settings.gitea_enabled or not settings.gitea_token:
|
||||
logger.debug("Gitea not configured — skipping issue creation")
|
||||
return None
|
||||
|
||||
owner, repo = settings.gitea_repo.split("/", 1)
|
||||
api_url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/issues"
|
||||
|
||||
body = item.to_issue_body(source_issue=source_issue)
|
||||
|
||||
payload: dict[str, Any] = {
|
||||
"title": item.title,
|
||||
"body": body,
|
||||
}
|
||||
|
||||
# Resolve label names to IDs
|
||||
label_ids = await _resolve_label_ids(item.labels, owner, repo)
|
||||
if label_ids:
|
||||
payload["labels"] = label_ids
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
resp = await client.post(
|
||||
api_url,
|
||||
headers={
|
||||
"Authorization": f"token {settings.gitea_token}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
json=payload,
|
||||
)
|
||||
|
||||
if resp.status_code in (200, 201):
|
||||
issue_data = resp.json()
|
||||
logger.info(
|
||||
"Created Gitea issue #%s: %s",
|
||||
issue_data.get("number", "?"),
|
||||
item.title[:60],
|
||||
)
|
||||
return issue_data
|
||||
|
||||
logger.warning(
|
||||
"Gitea issue creation failed (HTTP %s): %s",
|
||||
resp.status_code,
|
||||
resp.text[:200],
|
||||
)
|
||||
return None
|
||||
|
||||
except (httpx.ConnectError, httpx.ReadError, ConnectionError) as exc:
|
||||
logger.warning("Gitea connection failed: %s", exc)
|
||||
return None
|
||||
except Exception as exc:
|
||||
logger.error("Unexpected error creating Gitea issue: %s", exc)
|
||||
return None
|
||||
|
||||
|
||||
async def _resolve_label_ids(
|
||||
label_names: list[str],
|
||||
owner: str,
|
||||
repo: str,
|
||||
) -> list[int]:
|
||||
"""Resolve label names to Gitea label IDs, creating missing labels.
|
||||
|
||||
Returns a list of integer label IDs for the issue payload.
|
||||
"""
|
||||
if not label_names:
|
||||
return []
|
||||
|
||||
labels_url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/labels"
|
||||
headers = {
|
||||
"Authorization": f"token {settings.gitea_token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
# Fetch existing labels
|
||||
resp = await client.get(labels_url, headers=headers)
|
||||
if resp.status_code != 200:
|
||||
return []
|
||||
|
||||
existing = {l["name"]: l["id"] for l in resp.json()}
|
||||
label_ids = []
|
||||
|
||||
for name in label_names:
|
||||
if name in existing:
|
||||
label_ids.append(existing[name])
|
||||
else:
|
||||
# Auto-create missing labels with a default color
|
||||
create_resp = await client.post(
|
||||
labels_url,
|
||||
headers=headers,
|
||||
json={"name": name, "color": "#0075ca"},
|
||||
)
|
||||
if create_resp.status_code in (200, 201):
|
||||
label_ids.append(create_resp.json()["id"])
|
||||
|
||||
return label_ids
|
||||
|
||||
except Exception as exc:
|
||||
logger.debug("Label resolution failed: %s", exc)
|
||||
return []
|
||||
|
||||
|
||||
async def triage_research_report(
|
||||
report: str,
|
||||
source_issue: int | None = None,
|
||||
llm_caller: Any | None = None,
|
||||
dry_run: bool = False,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""End-to-end: extract action items from research and file Gitea issues.
|
||||
|
||||
This is the main entry point that closes the research → backlog loop.
|
||||
|
||||
Args:
|
||||
report: Research report text (markdown).
|
||||
source_issue: The Gitea issue number that produced this research.
|
||||
llm_caller: Optional async callable(prompt) -> str for LLM calls.
|
||||
dry_run: If True, extract items but don't create issues.
|
||||
|
||||
Returns:
|
||||
List of dicts with 'action_item' and 'gitea_issue' (or None) keys.
|
||||
"""
|
||||
items = await extract_action_items(report, llm_caller=llm_caller)
|
||||
|
||||
if not items:
|
||||
logger.info("No action items extracted from research report")
|
||||
return []
|
||||
|
||||
results = []
|
||||
for item in items:
|
||||
if dry_run:
|
||||
results.append({"action_item": item, "gitea_issue": None})
|
||||
continue
|
||||
|
||||
issue_data = await create_gitea_issue(item, source_issue=source_issue)
|
||||
results.append({"action_item": item, "gitea_issue": issue_data})
|
||||
|
||||
created_count = sum(1 for r in results if r["gitea_issue"] is not None)
|
||||
logger.info(
|
||||
"Research triage complete: %d items extracted, %d issues created",
|
||||
len(results),
|
||||
created_count,
|
||||
)
|
||||
return results
|
||||
@@ -473,6 +473,69 @@ def consult_grok(query: str) -> str:
|
||||
return response
|
||||
|
||||
|
||||
def web_fetch(url: str, max_tokens: int = 4000) -> str:
|
||||
"""Fetch a web page and return its main text content.
|
||||
|
||||
Downloads the URL, extracts readable text using trafilatura, and
|
||||
truncates to a token budget. Use this to read full articles, docs,
|
||||
or blog posts that web_search only returns snippets for.
|
||||
|
||||
Args:
|
||||
url: The URL to fetch (must start with http:// or https://).
|
||||
max_tokens: Maximum approximate token budget (default 4000).
|
||||
Text is truncated to max_tokens * 4 characters.
|
||||
|
||||
Returns:
|
||||
Extracted text content, or an error message on failure.
|
||||
"""
|
||||
if not url or not url.startswith(("http://", "https://")):
|
||||
return f"Error: invalid URL — must start with http:// or https://: {url!r}"
|
||||
|
||||
try:
|
||||
import requests as _requests
|
||||
except ImportError:
|
||||
return "Error: 'requests' package is not installed. Install with: pip install requests"
|
||||
|
||||
try:
|
||||
import trafilatura
|
||||
except ImportError:
|
||||
return (
|
||||
"Error: 'trafilatura' package is not installed. Install with: pip install trafilatura"
|
||||
)
|
||||
|
||||
try:
|
||||
resp = _requests.get(
|
||||
url,
|
||||
timeout=15,
|
||||
headers={"User-Agent": "TimmyResearchBot/1.0"},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
except _requests.exceptions.Timeout:
|
||||
return f"Error: request timed out after 15 seconds for {url}"
|
||||
except _requests.exceptions.HTTPError as exc:
|
||||
return f"Error: HTTP {exc.response.status_code} for {url}"
|
||||
except _requests.exceptions.RequestException as exc:
|
||||
return f"Error: failed to fetch {url} — {exc}"
|
||||
|
||||
text = trafilatura.extract(resp.text, include_tables=True, include_links=True)
|
||||
if not text:
|
||||
return f"Error: could not extract readable content from {url}"
|
||||
|
||||
char_budget = max_tokens * 4
|
||||
if len(text) > char_budget:
|
||||
text = text[:char_budget] + f"\n\n[…truncated to ~{max_tokens} tokens]"
|
||||
|
||||
return text
|
||||
|
||||
|
||||
def _register_web_fetch_tool(toolkit: Toolkit) -> None:
|
||||
"""Register the web_fetch tool for full-page content extraction."""
|
||||
try:
|
||||
toolkit.register(web_fetch, name="web_fetch")
|
||||
except Exception as exc:
|
||||
logger.warning("Tool execution failed (web_fetch registration): %s", exc)
|
||||
|
||||
|
||||
def _register_core_tools(toolkit: Toolkit, base_path: Path) -> None:
|
||||
"""Register core execution and file tools."""
|
||||
# Python execution
|
||||
@@ -672,6 +735,7 @@ def create_full_toolkit(base_dir: str | Path | None = None):
|
||||
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
|
||||
|
||||
_register_core_tools(toolkit, base_path)
|
||||
_register_web_fetch_tool(toolkit)
|
||||
_register_grok_tool(toolkit)
|
||||
_register_memory_tools(toolkit)
|
||||
_register_agentic_loop_tool(toolkit)
|
||||
@@ -829,6 +893,11 @@ def _analysis_tool_catalog() -> dict:
|
||||
"description": "Evaluate mathematical expressions with exact results",
|
||||
"available_in": ["orchestrator"],
|
||||
},
|
||||
"web_fetch": {
|
||||
"name": "Web Fetch",
|
||||
"description": "Fetch a web page and extract clean readable text (trafilatura)",
|
||||
"available_in": ["orchestrator"],
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -14,10 +14,15 @@ app = typer.Typer(help="Timmy Serve — sovereign AI agent API")
|
||||
def start(
|
||||
port: int = typer.Option(8402, "--port", "-p", help="Port for the serve API"),
|
||||
host: str = typer.Option("0.0.0.0", "--host", "-h", help="Host to bind to"),
|
||||
price: int = typer.Option(100, "--price", help="Price per request in sats"),
|
||||
price: int = typer.Option(None, "--price", help="Price per request in sats (default: from config)"),
|
||||
dry_run: bool = typer.Option(False, "--dry-run", help="Print config and exit (for testing)"),
|
||||
):
|
||||
"""Start Timmy in serve mode."""
|
||||
from config import settings
|
||||
|
||||
if price is None:
|
||||
price = settings.grok_sats_hard_cap
|
||||
|
||||
typer.echo(f"Starting Timmy Serve on {host}:{port}")
|
||||
typer.echo(f"L402 payment proxy active — {price} sats per request")
|
||||
typer.echo("Press Ctrl-C to stop")
|
||||
|
||||
@@ -147,10 +147,12 @@ def clean_database(tmp_path):
|
||||
# IMPORTANT: swarm.task_queue.models also has a DB_PATH that writes to
|
||||
# tasks.db — it MUST be patched too, or error_capture.capture_error()
|
||||
# will write test data to the production database.
|
||||
tmp_sovereignty_db = tmp_path / "sovereignty_metrics.db"
|
||||
for mod_name, tmp_db in [
|
||||
("dashboard.routes.tasks", tmp_tasks_db),
|
||||
("dashboard.routes.work_orders", tmp_work_orders_db),
|
||||
("swarm.task_queue.models", tmp_tasks_db),
|
||||
("infrastructure.sovereignty_metrics", tmp_sovereignty_db),
|
||||
]:
|
||||
try:
|
||||
mod = __import__(mod_name, fromlist=["DB_PATH"])
|
||||
|
||||
496
tests/dashboard/test_health.py
Normal file
496
tests/dashboard/test_health.py
Normal file
@@ -0,0 +1,496 @@
|
||||
"""Unit tests for dashboard/routes/health.py.
|
||||
|
||||
Covers helper functions, caching, endpoint responses, and graceful
|
||||
degradation when subsystems (Ollama, SQLite) are unavailable.
|
||||
|
||||
Fixes #945
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from dashboard.routes.health import (
|
||||
DependencyStatus,
|
||||
HealthStatus,
|
||||
SovereigntyReport,
|
||||
_calculate_overall_score,
|
||||
_check_lightning,
|
||||
_check_ollama_sync,
|
||||
_check_sqlite,
|
||||
_generate_recommendations,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pydantic models
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDependencyStatusModel:
|
||||
"""Validate DependencyStatus model."""
|
||||
|
||||
def test_fields(self):
|
||||
dep = DependencyStatus(
|
||||
name="Test", status="healthy", sovereignty_score=8, details={"key": "val"}
|
||||
)
|
||||
assert dep.name == "Test"
|
||||
assert dep.status == "healthy"
|
||||
assert dep.sovereignty_score == 8
|
||||
assert dep.details == {"key": "val"}
|
||||
|
||||
def test_empty_details(self):
|
||||
dep = DependencyStatus(name="X", status="unavailable", sovereignty_score=0, details={})
|
||||
assert dep.details == {}
|
||||
|
||||
|
||||
class TestSovereigntyReportModel:
|
||||
"""Validate SovereigntyReport model."""
|
||||
|
||||
def test_fields(self):
|
||||
report = SovereigntyReport(
|
||||
overall_score=9.3,
|
||||
dependencies=[],
|
||||
timestamp="2026-01-01T00:00:00+00:00",
|
||||
recommendations=["All good"],
|
||||
)
|
||||
assert report.overall_score == 9.3
|
||||
assert report.dependencies == []
|
||||
assert report.recommendations == ["All good"]
|
||||
|
||||
|
||||
class TestHealthStatusModel:
|
||||
"""Validate HealthStatus model."""
|
||||
|
||||
def test_fields(self):
|
||||
hs = HealthStatus(
|
||||
status="ok",
|
||||
timestamp="2026-01-01T00:00:00+00:00",
|
||||
version="2.0.0",
|
||||
uptime_seconds=42.5,
|
||||
)
|
||||
assert hs.status == "ok"
|
||||
assert hs.uptime_seconds == 42.5
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helper functions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCalculateOverallScore:
|
||||
"""Test _calculate_overall_score."""
|
||||
|
||||
def test_empty_deps(self):
|
||||
assert _calculate_overall_score([]) == 0.0
|
||||
|
||||
def test_single_dep(self):
|
||||
deps = [DependencyStatus(name="A", status="healthy", sovereignty_score=7, details={})]
|
||||
assert _calculate_overall_score(deps) == 7.0
|
||||
|
||||
def test_averages_multiple(self):
|
||||
deps = [
|
||||
DependencyStatus(name="A", status="healthy", sovereignty_score=10, details={}),
|
||||
DependencyStatus(name="B", status="healthy", sovereignty_score=8, details={}),
|
||||
DependencyStatus(name="C", status="unavailable", sovereignty_score=6, details={}),
|
||||
]
|
||||
assert _calculate_overall_score(deps) == 8.0
|
||||
|
||||
def test_rounding(self):
|
||||
deps = [
|
||||
DependencyStatus(name="A", status="healthy", sovereignty_score=10, details={}),
|
||||
DependencyStatus(name="B", status="healthy", sovereignty_score=9, details={}),
|
||||
DependencyStatus(name="C", status="healthy", sovereignty_score=10, details={}),
|
||||
]
|
||||
assert _calculate_overall_score(deps) == 9.7
|
||||
|
||||
|
||||
class TestGenerateRecommendations:
|
||||
"""Test _generate_recommendations."""
|
||||
|
||||
def test_all_healthy(self):
|
||||
deps = [DependencyStatus(name="X", status="healthy", sovereignty_score=10, details={})]
|
||||
recs = _generate_recommendations(deps)
|
||||
assert recs == ["System operating optimally - all dependencies healthy"]
|
||||
|
||||
def test_unavailable_service(self):
|
||||
deps = [
|
||||
DependencyStatus(name="Ollama AI", status="unavailable", sovereignty_score=10, details={})
|
||||
]
|
||||
recs = _generate_recommendations(deps)
|
||||
assert any("Ollama AI is unavailable" in r for r in recs)
|
||||
|
||||
def test_degraded_lightning_mock(self):
|
||||
deps = [
|
||||
DependencyStatus(
|
||||
name="Lightning Payments",
|
||||
status="degraded",
|
||||
sovereignty_score=8,
|
||||
details={"backend": "mock"},
|
||||
)
|
||||
]
|
||||
recs = _generate_recommendations(deps)
|
||||
assert any("Switch to real Lightning" in r for r in recs)
|
||||
|
||||
def test_degraded_non_lightning(self):
|
||||
"""Degraded non-Lightning dep produces no specific recommendation."""
|
||||
deps = [
|
||||
DependencyStatus(name="Redis", status="degraded", sovereignty_score=5, details={})
|
||||
]
|
||||
recs = _generate_recommendations(deps)
|
||||
assert recs == ["System operating optimally - all dependencies healthy"]
|
||||
|
||||
def test_multiple_unavailable(self):
|
||||
deps = [
|
||||
DependencyStatus(name="A", status="unavailable", sovereignty_score=5, details={}),
|
||||
DependencyStatus(name="B", status="unavailable", sovereignty_score=5, details={}),
|
||||
]
|
||||
recs = _generate_recommendations(deps)
|
||||
assert len(recs) == 2
|
||||
assert "A is unavailable" in recs[0]
|
||||
assert "B is unavailable" in recs[1]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _check_lightning (static)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCheckLightning:
|
||||
"""Test _check_lightning — always returns unavailable for now."""
|
||||
|
||||
def test_returns_unavailable(self):
|
||||
dep = _check_lightning()
|
||||
assert dep.name == "Lightning Payments"
|
||||
assert dep.status == "unavailable"
|
||||
assert dep.sovereignty_score == 8
|
||||
assert "removed" in dep.details.get("note", "").lower()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _check_ollama_sync
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCheckOllamaSync:
|
||||
"""Test synchronous Ollama health probe."""
|
||||
|
||||
def test_healthy_when_reachable(self):
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.status = 200
|
||||
mock_resp.__enter__ = MagicMock(return_value=mock_resp)
|
||||
mock_resp.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
with patch("urllib.request.urlopen", return_value=mock_resp):
|
||||
dep = _check_ollama_sync()
|
||||
|
||||
assert dep.status == "healthy"
|
||||
assert dep.name == "Ollama AI"
|
||||
assert dep.sovereignty_score == 10
|
||||
|
||||
def test_unavailable_on_connection_error(self):
|
||||
with patch(
|
||||
"urllib.request.urlopen",
|
||||
side_effect=ConnectionError("refused"),
|
||||
):
|
||||
dep = _check_ollama_sync()
|
||||
|
||||
assert dep.status == "unavailable"
|
||||
assert "Cannot connect" in dep.details.get("error", "")
|
||||
|
||||
def test_unavailable_on_timeout(self):
|
||||
from urllib.error import URLError
|
||||
|
||||
with patch(
|
||||
"urllib.request.urlopen",
|
||||
side_effect=URLError("timeout"),
|
||||
):
|
||||
dep = _check_ollama_sync()
|
||||
|
||||
assert dep.status == "unavailable"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _check_sqlite
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCheckSQLite:
|
||||
"""Test SQLite health probe."""
|
||||
|
||||
def test_healthy_when_db_reachable(self, tmp_path):
|
||||
import sqlite3
|
||||
|
||||
db_path = tmp_path / "data" / "timmy.db"
|
||||
db_path.parent.mkdir(parents=True)
|
||||
sqlite3.connect(str(db_path)).close()
|
||||
|
||||
with patch("dashboard.routes.health.settings") as mock_settings:
|
||||
mock_settings.repo_root = str(tmp_path)
|
||||
dep = _check_sqlite()
|
||||
|
||||
assert dep.status == "healthy"
|
||||
assert dep.name == "SQLite Database"
|
||||
|
||||
def test_unavailable_on_missing_db(self, tmp_path):
|
||||
with patch("dashboard.routes.health.settings") as mock_settings:
|
||||
mock_settings.repo_root = str(tmp_path / "nonexistent")
|
||||
dep = _check_sqlite()
|
||||
|
||||
assert dep.status == "unavailable"
|
||||
assert "error" in dep.details
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _check_ollama (async, with caching)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCheckOllamaAsync:
|
||||
"""Test async Ollama check with TTL cache."""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_cache(self):
|
||||
"""Clear the module-level Ollama cache before each test."""
|
||||
import dashboard.routes.health as mod
|
||||
|
||||
mod._ollama_cache = None
|
||||
mod._ollama_cache_ts = 0.0
|
||||
yield
|
||||
mod._ollama_cache = None
|
||||
mod._ollama_cache_ts = 0.0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_dependency_status(self):
|
||||
healthy = DependencyStatus(
|
||||
name="Ollama AI", status="healthy", sovereignty_score=10, details={}
|
||||
)
|
||||
with patch(
|
||||
"dashboard.routes.health._check_ollama_sync",
|
||||
return_value=healthy,
|
||||
):
|
||||
from dashboard.routes.health import _check_ollama
|
||||
|
||||
result = await _check_ollama()
|
||||
|
||||
assert result.status == "healthy"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_caches_result(self):
|
||||
healthy = DependencyStatus(
|
||||
name="Ollama AI", status="healthy", sovereignty_score=10, details={}
|
||||
)
|
||||
with patch(
|
||||
"dashboard.routes.health._check_ollama_sync",
|
||||
return_value=healthy,
|
||||
) as mock_sync:
|
||||
from dashboard.routes.health import _check_ollama
|
||||
|
||||
await _check_ollama()
|
||||
await _check_ollama()
|
||||
|
||||
# Should only call the sync function once due to cache
|
||||
assert mock_sync.call_count == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cache_expires(self):
|
||||
healthy = DependencyStatus(
|
||||
name="Ollama AI", status="healthy", sovereignty_score=10, details={}
|
||||
)
|
||||
import dashboard.routes.health as mod
|
||||
|
||||
with patch(
|
||||
"dashboard.routes.health._check_ollama_sync",
|
||||
return_value=healthy,
|
||||
) as mock_sync:
|
||||
from dashboard.routes.health import _check_ollama
|
||||
|
||||
await _check_ollama()
|
||||
# Expire the cache
|
||||
mod._ollama_cache_ts = time.monotonic() - 60
|
||||
await _check_ollama()
|
||||
|
||||
assert mock_sync.call_count == 2
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fallback_on_thread_exception(self):
|
||||
"""If to_thread raises, return unavailable status."""
|
||||
import asyncio
|
||||
|
||||
with patch.object(
|
||||
asyncio,
|
||||
"to_thread",
|
||||
side_effect=RuntimeError("thread pool exhausted"),
|
||||
):
|
||||
from dashboard.routes.health import _check_ollama
|
||||
|
||||
result = await _check_ollama()
|
||||
|
||||
assert result.status == "unavailable"
|
||||
|
||||
|
||||
class TestCheckOllamaBool:
|
||||
"""Test the legacy bool wrapper."""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_cache(self):
|
||||
import dashboard.routes.health as mod
|
||||
|
||||
mod._ollama_cache = None
|
||||
mod._ollama_cache_ts = 0.0
|
||||
yield
|
||||
mod._ollama_cache = None
|
||||
mod._ollama_cache_ts = 0.0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_true_when_healthy(self):
|
||||
healthy = DependencyStatus(
|
||||
name="Ollama AI", status="healthy", sovereignty_score=10, details={}
|
||||
)
|
||||
with patch("dashboard.routes.health._check_ollama_sync", return_value=healthy):
|
||||
from dashboard.routes.health import check_ollama
|
||||
|
||||
assert await check_ollama() is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_false_when_unavailable(self):
|
||||
down = DependencyStatus(
|
||||
name="Ollama AI", status="unavailable", sovereignty_score=10, details={}
|
||||
)
|
||||
with patch("dashboard.routes.health._check_ollama_sync", return_value=down):
|
||||
from dashboard.routes.health import check_ollama
|
||||
|
||||
assert await check_ollama() is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Endpoint tests via FastAPI TestClient
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestHealthEndpoint:
|
||||
"""Tests for GET /health."""
|
||||
|
||||
def test_returns_200(self, client):
|
||||
response = client.get("/health")
|
||||
assert response.status_code == 200
|
||||
|
||||
def test_ok_when_ollama_up(self, client):
|
||||
with patch("dashboard.routes.health.check_ollama", new_callable=AsyncMock, return_value=True):
|
||||
data = client.get("/health").json()
|
||||
|
||||
assert data["status"] == "ok"
|
||||
assert data["services"]["ollama"] == "up"
|
||||
assert data["agents"]["agent"]["status"] == "idle"
|
||||
|
||||
def test_degraded_when_ollama_down(self, client):
|
||||
with patch(
|
||||
"dashboard.routes.health.check_ollama", new_callable=AsyncMock, return_value=False
|
||||
):
|
||||
data = client.get("/health").json()
|
||||
|
||||
assert data["status"] == "degraded"
|
||||
assert data["services"]["ollama"] == "down"
|
||||
assert data["agents"]["agent"]["status"] == "offline"
|
||||
|
||||
def test_extended_fields(self, client):
|
||||
data = client.get("/health").json()
|
||||
assert "timestamp" in data
|
||||
assert "version" in data
|
||||
assert "uptime_seconds" in data
|
||||
assert isinstance(data["uptime_seconds"], (int, float))
|
||||
assert "llm_backend" in data
|
||||
assert "llm_model" in data
|
||||
|
||||
|
||||
class TestHealthStatusPanel:
|
||||
"""Tests for GET /health/status (HTML response)."""
|
||||
|
||||
def test_returns_html(self, client):
|
||||
response = client.get("/health/status")
|
||||
assert response.status_code == 200
|
||||
assert "text/html" in response.headers["content-type"]
|
||||
|
||||
def test_shows_up_when_ollama_healthy(self, client):
|
||||
with patch("dashboard.routes.health.check_ollama", new_callable=AsyncMock, return_value=True):
|
||||
text = client.get("/health/status").text
|
||||
|
||||
assert "UP" in text
|
||||
|
||||
def test_shows_down_when_ollama_unhealthy(self, client):
|
||||
with patch(
|
||||
"dashboard.routes.health.check_ollama", new_callable=AsyncMock, return_value=False
|
||||
):
|
||||
text = client.get("/health/status").text
|
||||
|
||||
assert "DOWN" in text
|
||||
|
||||
def test_includes_model_name(self, client):
|
||||
text = client.get("/health/status").text
|
||||
assert "Model:" in text
|
||||
|
||||
|
||||
class TestSovereigntyEndpoint:
|
||||
"""Tests for GET /health/sovereignty."""
|
||||
|
||||
def test_aggregates_three_subsystems(self, client):
|
||||
data = client.get("/health/sovereignty").json()
|
||||
names = [d["name"] for d in data["dependencies"]]
|
||||
assert "Ollama AI" in names
|
||||
assert "Lightning Payments" in names
|
||||
assert "SQLite Database" in names
|
||||
|
||||
def test_score_range(self, client):
|
||||
data = client.get("/health/sovereignty").json()
|
||||
assert 0 <= data["overall_score"] <= 10
|
||||
|
||||
|
||||
class TestComponentsEndpoint:
|
||||
"""Tests for GET /health/components."""
|
||||
|
||||
def test_returns_timestamp(self, client):
|
||||
data = client.get("/health/components").json()
|
||||
assert "timestamp" in data
|
||||
|
||||
def test_config_keys(self, client):
|
||||
data = client.get("/health/components").json()
|
||||
cfg = data["config"]
|
||||
assert "debug" in cfg
|
||||
assert "model_backend" in cfg
|
||||
assert "ollama_model" in cfg
|
||||
|
||||
|
||||
class TestSnapshotEndpoint:
|
||||
"""Tests for GET /health/snapshot."""
|
||||
|
||||
def test_returns_200(self, client):
|
||||
response = client.get("/health/snapshot")
|
||||
assert response.status_code == 200
|
||||
|
||||
def test_overall_status_valid(self, client):
|
||||
data = client.get("/health/snapshot").json()
|
||||
assert data["overall_status"] in ["green", "yellow", "red", "unknown"]
|
||||
|
||||
def test_graceful_fallback_on_import_error(self, client):
|
||||
"""Snapshot degrades gracefully when automation module fails."""
|
||||
with patch(
|
||||
"dashboard.routes.health.asyncio.to_thread",
|
||||
side_effect=ImportError("no module"),
|
||||
):
|
||||
data = client.get("/health/snapshot").json()
|
||||
|
||||
assert data["overall_status"] == "unknown"
|
||||
assert "error" in data
|
||||
assert data["ci"]["status"] == "unknown"
|
||||
|
||||
def test_graceful_fallback_on_runtime_error(self, client):
|
||||
with patch(
|
||||
"dashboard.routes.health.asyncio.to_thread",
|
||||
side_effect=RuntimeError("boom"),
|
||||
):
|
||||
data = client.get("/health/snapshot").json()
|
||||
|
||||
assert data["overall_status"] == "unknown"
|
||||
@@ -3,99 +3,6 @@
|
||||
Verifies task CRUD operations and the dashboard page rendering.
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DB error handling tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_DB_ERROR = sqlite3.OperationalError("database is locked")
|
||||
|
||||
|
||||
def test_tasks_page_degrades_on_db_error(client):
|
||||
"""GET /tasks renders empty columns when DB is unavailable."""
|
||||
with patch(
|
||||
"dashboard.routes.tasks._get_db",
|
||||
side_effect=_DB_ERROR,
|
||||
):
|
||||
response = client.get("/tasks")
|
||||
assert response.status_code == 200
|
||||
assert "TASK QUEUE" in response.text
|
||||
|
||||
|
||||
def test_pending_partial_degrades_on_db_error(client):
|
||||
"""GET /tasks/pending returns fallback HTML when DB is unavailable."""
|
||||
with patch(
|
||||
"dashboard.routes.tasks._get_db",
|
||||
side_effect=_DB_ERROR,
|
||||
):
|
||||
response = client.get("/tasks/pending")
|
||||
assert response.status_code == 200
|
||||
assert "Database unavailable" in response.text
|
||||
|
||||
|
||||
def test_active_partial_degrades_on_db_error(client):
|
||||
"""GET /tasks/active returns fallback HTML when DB is unavailable."""
|
||||
with patch(
|
||||
"dashboard.routes.tasks._get_db",
|
||||
side_effect=_DB_ERROR,
|
||||
):
|
||||
response = client.get("/tasks/active")
|
||||
assert response.status_code == 200
|
||||
assert "Database unavailable" in response.text
|
||||
|
||||
|
||||
def test_completed_partial_degrades_on_db_error(client):
|
||||
"""GET /tasks/completed returns fallback HTML when DB is unavailable."""
|
||||
with patch(
|
||||
"dashboard.routes.tasks._get_db",
|
||||
side_effect=_DB_ERROR,
|
||||
):
|
||||
response = client.get("/tasks/completed")
|
||||
assert response.status_code == 200
|
||||
assert "Database unavailable" in response.text
|
||||
|
||||
|
||||
def test_api_create_task_503_on_db_error(client):
|
||||
"""POST /api/tasks returns 503 when DB is unavailable."""
|
||||
with patch(
|
||||
"dashboard.routes.tasks._get_db",
|
||||
side_effect=_DB_ERROR,
|
||||
):
|
||||
response = client.post("/api/tasks", json={"title": "Test"})
|
||||
assert response.status_code == 503
|
||||
|
||||
|
||||
def test_api_list_tasks_empty_on_db_error(client):
|
||||
"""GET /api/tasks returns empty list when DB is unavailable."""
|
||||
with patch(
|
||||
"dashboard.routes.tasks._get_db",
|
||||
side_effect=_DB_ERROR,
|
||||
):
|
||||
response = client.get("/api/tasks")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == []
|
||||
|
||||
|
||||
def test_queue_status_degrades_on_db_error(client):
|
||||
"""GET /api/queue/status returns idle status when DB is unavailable."""
|
||||
with patch(
|
||||
"dashboard.routes.tasks._get_db",
|
||||
side_effect=_DB_ERROR,
|
||||
):
|
||||
response = client.get("/api/queue/status")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["is_working"] is False
|
||||
assert data["current_task"] is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Existing tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_tasks_page_returns_200(client):
|
||||
response = client.get("/tasks")
|
||||
|
||||
139
tests/infrastructure/test_claude_quota.py
Normal file
139
tests/infrastructure/test_claude_quota.py
Normal file
@@ -0,0 +1,139 @@
|
||||
"""Tests for the Claude quota tracker and metabolic mode advisor.
|
||||
|
||||
Refs: #1074
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
from infrastructure.claude_quota import (
|
||||
ACTIVE_THRESHOLD,
|
||||
BURST_THRESHOLD,
|
||||
ClaudeCall,
|
||||
ClaudeQuotaStore,
|
||||
MetabolicMode,
|
||||
_mode_for_cost,
|
||||
current_mode,
|
||||
quota_report,
|
||||
record_usage,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def store(tmp_path):
|
||||
"""Fresh quota store backed by a temp DB."""
|
||||
return ClaudeQuotaStore(db_path=tmp_path / "test_quota.db")
|
||||
|
||||
|
||||
# ── Unit: cost calculation ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestClaudeCallCost:
|
||||
def test_haiku_cost(self):
|
||||
call = ClaudeCall(model="haiku", input_tokens=1_000_000, output_tokens=0)
|
||||
assert call.cost_usd == pytest.approx(0.25)
|
||||
|
||||
def test_sonnet_output_cost(self):
|
||||
call = ClaudeCall(model="sonnet", input_tokens=0, output_tokens=1_000_000)
|
||||
assert call.cost_usd == pytest.approx(15.00)
|
||||
|
||||
def test_opus_combined_cost(self):
|
||||
call = ClaudeCall(model="opus", input_tokens=100_000, output_tokens=50_000)
|
||||
# input: 100k * 15/1M = 1.50, output: 50k * 75/1M = 3.75 → 5.25
|
||||
assert call.cost_usd == pytest.approx(5.25)
|
||||
|
||||
def test_unknown_model_uses_default(self):
|
||||
call = ClaudeCall(model="unknown-model-xyz", input_tokens=1_000_000, output_tokens=0)
|
||||
assert call.cost_usd == pytest.approx(3.00) # default input cost
|
||||
|
||||
def test_zero_tokens_zero_cost(self):
|
||||
call = ClaudeCall(model="haiku", input_tokens=0, output_tokens=0)
|
||||
assert call.cost_usd == 0.0
|
||||
|
||||
|
||||
# ── Unit: metabolic mode thresholds ──────────────────────────────────────────
|
||||
|
||||
|
||||
class TestMetabolicMode:
|
||||
def test_under_burst_threshold(self):
|
||||
assert _mode_for_cost(0.0) == "BURST"
|
||||
assert _mode_for_cost(BURST_THRESHOLD - 0.01) == "BURST"
|
||||
|
||||
def test_at_burst_threshold_is_active(self):
|
||||
assert _mode_for_cost(BURST_THRESHOLD) == "ACTIVE"
|
||||
|
||||
def test_between_thresholds(self):
|
||||
mid = (BURST_THRESHOLD + ACTIVE_THRESHOLD) / 2
|
||||
assert _mode_for_cost(mid) == "ACTIVE"
|
||||
|
||||
def test_at_active_threshold_is_resting(self):
|
||||
assert _mode_for_cost(ACTIVE_THRESHOLD) == "RESTING"
|
||||
|
||||
def test_over_active_threshold(self):
|
||||
assert _mode_for_cost(ACTIVE_THRESHOLD + 10) == "RESTING"
|
||||
|
||||
|
||||
# ── Store: record and query ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestClaudeQuotaStore:
|
||||
def test_record_call(self, store):
|
||||
call = ClaudeCall(model="haiku", input_tokens=1000, output_tokens=500)
|
||||
store.record_call(call)
|
||||
summary = store.today_summary()
|
||||
assert summary.calls == 1
|
||||
assert summary.input_tokens == 1000
|
||||
assert summary.output_tokens == 500
|
||||
assert summary.cost_usd > 0
|
||||
|
||||
def test_today_summary_empty_db(self, store):
|
||||
summary = store.today_summary()
|
||||
assert summary.calls == 0
|
||||
assert summary.cost_usd == 0.0
|
||||
assert summary.mode == "BURST"
|
||||
|
||||
def test_month_summary_aggregates_multiple_calls(self, store):
|
||||
for _ in range(5):
|
||||
store.record_call(ClaudeCall(model="haiku", input_tokens=100, output_tokens=50))
|
||||
month = store.month_summary()
|
||||
assert month.calls == 5
|
||||
assert month.input_tokens == 500
|
||||
assert month.output_tokens == 250
|
||||
|
||||
def test_current_mode_burst_when_empty(self, store):
|
||||
assert store.current_mode() == "BURST"
|
||||
|
||||
def test_current_mode_resting_when_expensive(self, store):
|
||||
# Record enough usage to push past ACTIVE_THRESHOLD
|
||||
# ACTIVE_THRESHOLD = 5.00, opus input = 15/1M
|
||||
# Need >5.00: 5.00/15 * 1M ≈ 333_334 input tokens
|
||||
store.record_call(
|
||||
ClaudeCall(model="opus", input_tokens=400_000, output_tokens=0)
|
||||
)
|
||||
mode = store.current_mode()
|
||||
assert mode == "RESTING"
|
||||
|
||||
def test_summary_as_dict(self, store):
|
||||
summary = store.today_summary()
|
||||
d = summary.as_dict()
|
||||
assert "period" in d
|
||||
assert "calls" in d
|
||||
assert "cost_usd" in d
|
||||
assert "mode" in d
|
||||
|
||||
|
||||
# ── Convenience functions ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestConvenienceFunctions:
|
||||
def test_record_usage_does_not_raise(self):
|
||||
# Uses module-level store; should not raise even if DB path issues
|
||||
record_usage(model="haiku", input_tokens=10, output_tokens=5, task_label="test")
|
||||
|
||||
def test_current_mode_returns_valid_mode(self):
|
||||
mode = current_mode()
|
||||
assert mode in ("BURST", "ACTIVE", "RESTING")
|
||||
|
||||
def test_quota_report_returns_string(self):
|
||||
report = quota_report()
|
||||
assert isinstance(report, str)
|
||||
assert "BURST" in report or "ACTIVE" in report or "RESTING" in report
|
||||
332
tests/infrastructure/test_moderation.py
Normal file
332
tests/infrastructure/test_moderation.py
Normal file
@@ -0,0 +1,332 @@
|
||||
"""Tests for the content moderation pipeline."""
|
||||
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from infrastructure.guards.moderation import (
|
||||
ContentModerator,
|
||||
GameProfile,
|
||||
ModerationResult,
|
||||
ModerationVerdict,
|
||||
ViolationCategory,
|
||||
_parse_guard_category,
|
||||
get_moderator,
|
||||
)
|
||||
|
||||
# ── Unit tests for data types ────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestModerationResult:
|
||||
"""Test ModerationResult dataclass."""
|
||||
|
||||
def test_passed_property_true(self):
|
||||
result = ModerationResult(verdict=ModerationVerdict.PASS, blocked=False)
|
||||
assert result.passed is True
|
||||
|
||||
def test_passed_property_false(self):
|
||||
result = ModerationResult(verdict=ModerationVerdict.FAIL, blocked=True)
|
||||
assert result.passed is False
|
||||
|
||||
def test_default_values(self):
|
||||
result = ModerationResult(verdict=ModerationVerdict.PASS, blocked=False)
|
||||
assert result.category == ViolationCategory.NONE
|
||||
assert result.confidence == 0.0
|
||||
assert result.fallback == ""
|
||||
assert result.reason == ""
|
||||
|
||||
|
||||
class TestGameProfile:
|
||||
"""Test GameProfile dataclass."""
|
||||
|
||||
def test_default_values(self):
|
||||
profile = GameProfile(game_id="test", display_name="Test Game")
|
||||
assert profile.vocabulary_whitelist == []
|
||||
assert profile.threshold == 0.8
|
||||
assert profile.fallbacks == {}
|
||||
|
||||
def test_morrowind_profile(self):
|
||||
profile = GameProfile(
|
||||
game_id="morrowind",
|
||||
display_name="Morrowind",
|
||||
vocabulary_whitelist=["Skooma", "slave"],
|
||||
threshold=0.85,
|
||||
)
|
||||
assert "Skooma" in profile.vocabulary_whitelist
|
||||
assert profile.threshold == 0.85
|
||||
|
||||
|
||||
class TestParseGuardCategory:
|
||||
"""Test Llama Guard category parsing."""
|
||||
|
||||
def test_hate_speech(self):
|
||||
assert _parse_guard_category("S1: Hate speech") == ViolationCategory.HATE_SPEECH
|
||||
|
||||
def test_violence(self):
|
||||
assert _parse_guard_category("S2: Violence") == ViolationCategory.VIOLENCE_GLORIFICATION
|
||||
|
||||
def test_sexual_content(self):
|
||||
assert _parse_guard_category("S3: Sexual content") == ViolationCategory.SEXUAL_CONTENT
|
||||
|
||||
def test_self_harm(self):
|
||||
assert _parse_guard_category("S4: Self-harm") == ViolationCategory.SELF_HARM
|
||||
|
||||
def test_dangerous(self):
|
||||
assert _parse_guard_category("S5: Dangerous activity") == ViolationCategory.REAL_WORLD_HARM
|
||||
|
||||
def test_unknown_category(self):
|
||||
assert _parse_guard_category("S99: Unknown") == ViolationCategory.NONE
|
||||
|
||||
|
||||
# ── ContentModerator tests ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestContentModerator:
|
||||
"""Test the content moderation pipeline."""
|
||||
|
||||
def _make_moderator(self, **kwargs) -> ContentModerator:
|
||||
"""Create a moderator with test defaults."""
|
||||
profiles = {
|
||||
"morrowind": GameProfile(
|
||||
game_id="morrowind",
|
||||
display_name="Morrowind",
|
||||
vocabulary_whitelist=["Skooma", "Moon Sugar", "slave", "Morag Tong"],
|
||||
context_prompt="Narrate Morrowind gameplay.",
|
||||
threshold=0.85,
|
||||
fallbacks={
|
||||
"combat": "The battle continues.",
|
||||
"default": "The adventure continues.",
|
||||
},
|
||||
),
|
||||
"default": GameProfile(
|
||||
game_id="default",
|
||||
display_name="Generic",
|
||||
vocabulary_whitelist=[],
|
||||
context_prompt="Narrate gameplay.",
|
||||
threshold=0.8,
|
||||
fallbacks={"default": "Gameplay continues."},
|
||||
),
|
||||
}
|
||||
return ContentModerator(profiles=profiles, **kwargs)
|
||||
|
||||
def test_get_profile_known_game(self):
|
||||
mod = self._make_moderator()
|
||||
profile = mod.get_profile("morrowind")
|
||||
assert profile.game_id == "morrowind"
|
||||
|
||||
def test_get_profile_unknown_game_falls_back(self):
|
||||
mod = self._make_moderator()
|
||||
profile = mod.get_profile("unknown_game")
|
||||
assert profile.game_id == "default"
|
||||
|
||||
def test_get_context_prompt(self):
|
||||
mod = self._make_moderator()
|
||||
prompt = mod.get_context_prompt("morrowind")
|
||||
assert "Morrowind" in prompt
|
||||
|
||||
def test_register_profile(self):
|
||||
mod = self._make_moderator()
|
||||
new_profile = GameProfile(game_id="skyrim", display_name="Skyrim")
|
||||
mod.register_profile(new_profile)
|
||||
assert mod.get_profile("skyrim").game_id == "skyrim"
|
||||
|
||||
def test_whitelist_replaces_game_terms(self):
|
||||
mod = self._make_moderator()
|
||||
profile = mod.get_profile("morrowind")
|
||||
cleaned = mod._apply_whitelist(
|
||||
"The merchant sells Skooma and Moon Sugar in the slave market.",
|
||||
profile,
|
||||
)
|
||||
assert "Skooma" not in cleaned
|
||||
assert "Moon Sugar" not in cleaned
|
||||
assert "slave" not in cleaned
|
||||
assert "[GAME_TERM]" in cleaned
|
||||
|
||||
def test_whitelist_case_insensitive(self):
|
||||
mod = self._make_moderator()
|
||||
profile = mod.get_profile("morrowind")
|
||||
cleaned = mod._apply_whitelist("skooma and SKOOMA", profile)
|
||||
assert "skooma" not in cleaned
|
||||
assert "SKOOMA" not in cleaned
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_safe_content_passes(self):
|
||||
"""Safe content should pass moderation."""
|
||||
mod = self._make_moderator()
|
||||
with patch.object(mod, "_is_guard_available", new_callable=AsyncMock, return_value=False):
|
||||
result = await mod.check("The player walks through the town.", game="morrowind")
|
||||
assert result.passed
|
||||
assert not result.blocked
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_blocked_content_has_fallback(self):
|
||||
"""Blocked content should include scene-appropriate fallback."""
|
||||
mod = self._make_moderator()
|
||||
# Force a block via regex by using real-world harm language
|
||||
text = "In real life you should attack and hurt people"
|
||||
with patch.object(mod, "_is_guard_available", new_callable=AsyncMock, return_value=False):
|
||||
result = await mod.check(text, game="morrowind", scene_type="combat")
|
||||
assert result.blocked
|
||||
assert result.fallback == "The battle continues."
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_with_moderation_disabled(self):
|
||||
"""When moderation is disabled, everything passes."""
|
||||
mod = self._make_moderator()
|
||||
with patch("infrastructure.guards.moderation.settings") as mock_settings:
|
||||
mock_settings.moderation_enabled = False
|
||||
mock_settings.moderation_guard_model = "llama-guard3:1b"
|
||||
mock_settings.normalized_ollama_url = "http://127.0.0.1:11434"
|
||||
result = await mod.check("anything goes here")
|
||||
assert result.passed
|
||||
assert result.layer == "disabled"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_threshold_below_allows_content(self):
|
||||
"""Content flagged below threshold should pass through (Layer 3)."""
|
||||
mod = self._make_moderator()
|
||||
# Mock the guard to return a low-confidence flag
|
||||
low_conf_result = ModerationResult(
|
||||
verdict=ModerationVerdict.FAIL,
|
||||
blocked=True,
|
||||
confidence=0.5, # Below morrowind threshold of 0.85
|
||||
layer="llama_guard",
|
||||
category=ViolationCategory.VIOLENCE_GLORIFICATION,
|
||||
)
|
||||
with patch.object(mod, "_run_guard", new_callable=AsyncMock, return_value=low_conf_result):
|
||||
result = await mod.check("sword fight scene", game="morrowind")
|
||||
assert result.passed
|
||||
assert not result.blocked
|
||||
assert result.layer == "threshold"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_threshold_above_blocks_content(self):
|
||||
"""Content flagged above threshold should remain blocked."""
|
||||
mod = self._make_moderator()
|
||||
high_conf_result = ModerationResult(
|
||||
verdict=ModerationVerdict.FAIL,
|
||||
blocked=True,
|
||||
confidence=0.95, # Above morrowind threshold of 0.85
|
||||
layer="llama_guard",
|
||||
category=ViolationCategory.REAL_WORLD_HARM,
|
||||
)
|
||||
with patch.object(mod, "_run_guard", new_callable=AsyncMock, return_value=high_conf_result):
|
||||
result = await mod.check("harmful content", game="morrowind")
|
||||
assert result.blocked
|
||||
|
||||
def test_regex_catches_real_world_harm(self):
|
||||
"""Regex fallback should catch obvious real-world harm patterns."""
|
||||
mod = self._make_moderator()
|
||||
result = mod._check_with_regex("you should actually harm real people")
|
||||
assert result.blocked
|
||||
assert result.category == ViolationCategory.REAL_WORLD_HARM
|
||||
assert result.layer == "regex_fallback"
|
||||
|
||||
def test_regex_passes_game_violence(self):
|
||||
"""Regex should not flag in-game violence narration."""
|
||||
mod = self._make_moderator()
|
||||
result = mod._check_with_regex("The warrior slays the dragon with a mighty blow.")
|
||||
assert result.passed
|
||||
|
||||
def test_regex_passes_normal_narration(self):
|
||||
"""Normal narration should pass regex checks."""
|
||||
mod = self._make_moderator()
|
||||
result = mod._check_with_regex(
|
||||
"The Nerevarine enters the city of Balmora and speaks with Caius Cosades."
|
||||
)
|
||||
assert result.passed
|
||||
|
||||
def test_metrics_tracking(self):
|
||||
"""Metrics should track checks accurately."""
|
||||
mod = self._make_moderator()
|
||||
assert mod.get_metrics()["total_checks"] == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_metrics_increment_after_check(self):
|
||||
"""Metrics should increment after moderation checks."""
|
||||
mod = self._make_moderator()
|
||||
with patch.object(mod, "_is_guard_available", new_callable=AsyncMock, return_value=False):
|
||||
await mod.check("safe text", game="default")
|
||||
metrics = mod.get_metrics()
|
||||
assert metrics["total_checks"] == 1
|
||||
assert metrics["passed"] == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_guard_fallback_on_error(self):
|
||||
"""Should fall back to regex when guard model errors."""
|
||||
mod = self._make_moderator()
|
||||
with (
|
||||
patch.object(mod, "_is_guard_available", new_callable=AsyncMock, return_value=True),
|
||||
patch.object(
|
||||
mod,
|
||||
"_check_with_guard",
|
||||
new_callable=AsyncMock,
|
||||
side_effect=RuntimeError("timeout"),
|
||||
),
|
||||
):
|
||||
result = await mod.check("safe text", game="default")
|
||||
# Should fall back to regex and pass
|
||||
assert result.passed
|
||||
assert result.layer == "regex_fallback"
|
||||
|
||||
|
||||
class TestGetModerator:
|
||||
"""Test the singleton accessor."""
|
||||
|
||||
def test_returns_same_instance(self):
|
||||
"""get_moderator should return the same instance."""
|
||||
# Reset the global to test fresh
|
||||
import infrastructure.guards.moderation as mod_module
|
||||
|
||||
mod_module._moderator = None
|
||||
m1 = get_moderator()
|
||||
m2 = get_moderator()
|
||||
assert m1 is m2
|
||||
# Clean up
|
||||
mod_module._moderator = None
|
||||
|
||||
|
||||
# ── Profile loader tests ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestProfileLoader:
|
||||
"""Test YAML profile loading."""
|
||||
|
||||
def test_load_missing_file_returns_empty(self, tmp_path):
|
||||
from infrastructure.guards.profiles import load_profiles
|
||||
|
||||
result = load_profiles(tmp_path / "nonexistent.yaml")
|
||||
assert result == {}
|
||||
|
||||
def test_load_valid_config(self, tmp_path):
|
||||
import yaml
|
||||
|
||||
from infrastructure.guards.profiles import load_profiles
|
||||
|
||||
config = {
|
||||
"profiles": {
|
||||
"testgame": {
|
||||
"display_name": "Test Game",
|
||||
"threshold": 0.9,
|
||||
"vocabulary_whitelist": ["sword", "potion"],
|
||||
"context_prompt": "Narrate test game.",
|
||||
"fallbacks": {"default": "Game continues."},
|
||||
}
|
||||
}
|
||||
}
|
||||
config_file = tmp_path / "moderation.yaml"
|
||||
config_file.write_text(yaml.dump(config))
|
||||
|
||||
profiles = load_profiles(config_file)
|
||||
assert "testgame" in profiles
|
||||
assert profiles["testgame"].threshold == 0.9
|
||||
assert "sword" in profiles["testgame"].vocabulary_whitelist
|
||||
|
||||
def test_load_malformed_yaml_returns_empty(self, tmp_path):
|
||||
from infrastructure.guards.profiles import load_profiles
|
||||
|
||||
config_file = tmp_path / "moderation.yaml"
|
||||
config_file.write_text("{{{{invalid yaml")
|
||||
|
||||
result = load_profiles(config_file)
|
||||
assert result == {}
|
||||
183
tests/infrastructure/test_sovereignty_metrics.py
Normal file
183
tests/infrastructure/test_sovereignty_metrics.py
Normal file
@@ -0,0 +1,183 @@
|
||||
"""Tests for the sovereignty metrics store and API routes.
|
||||
|
||||
Refs: #981
|
||||
"""
|
||||
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from infrastructure.sovereignty_metrics import (
|
||||
GRADUATION_TARGETS,
|
||||
SovereigntyMetric,
|
||||
SovereigntyMetricsStore,
|
||||
emit_sovereignty_metric,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def store(tmp_path):
|
||||
"""Create a fresh sovereignty metrics store with a temp DB."""
|
||||
return SovereigntyMetricsStore(db_path=tmp_path / "test_sov.db")
|
||||
|
||||
|
||||
class TestSovereigntyMetricsStore:
|
||||
def test_record_and_get_latest(self, store):
|
||||
metric = SovereigntyMetric(metric_type="cache_hit_rate", value=0.42)
|
||||
store.record(metric)
|
||||
|
||||
results = store.get_latest("cache_hit_rate", limit=10)
|
||||
assert len(results) == 1
|
||||
assert results[0]["value"] == 0.42
|
||||
|
||||
def test_get_latest_returns_most_recent_first(self, store):
|
||||
for val in [0.1, 0.2, 0.3]:
|
||||
store.record(SovereigntyMetric(metric_type="cache_hit_rate", value=val))
|
||||
|
||||
results = store.get_latest("cache_hit_rate", limit=10)
|
||||
assert len(results) == 3
|
||||
assert results[0]["value"] == 0.3 # most recent first
|
||||
|
||||
def test_get_latest_respects_limit(self, store):
|
||||
for i in range(10):
|
||||
store.record(SovereigntyMetric(metric_type="api_cost", value=float(i)))
|
||||
|
||||
results = store.get_latest("api_cost", limit=3)
|
||||
assert len(results) == 3
|
||||
|
||||
def test_get_latest_filters_by_type(self, store):
|
||||
store.record(SovereigntyMetric(metric_type="cache_hit_rate", value=0.5))
|
||||
store.record(SovereigntyMetric(metric_type="api_cost", value=1.20))
|
||||
|
||||
results = store.get_latest("cache_hit_rate")
|
||||
assert len(results) == 1
|
||||
assert results[0]["value"] == 0.5
|
||||
|
||||
def test_get_summary_empty(self, store):
|
||||
summary = store.get_summary()
|
||||
assert "cache_hit_rate" in summary
|
||||
assert summary["cache_hit_rate"]["current"] is None
|
||||
assert summary["cache_hit_rate"]["phase"] == "pre-start"
|
||||
|
||||
def test_get_summary_with_data(self, store):
|
||||
store.record(SovereigntyMetric(metric_type="cache_hit_rate", value=0.85))
|
||||
store.record(SovereigntyMetric(metric_type="api_cost", value=0.08))
|
||||
|
||||
summary = store.get_summary()
|
||||
assert summary["cache_hit_rate"]["current"] == 0.85
|
||||
assert summary["cache_hit_rate"]["phase"] == "month3"
|
||||
assert summary["api_cost"]["current"] == 0.08
|
||||
assert summary["api_cost"]["phase"] == "month3"
|
||||
|
||||
def test_get_summary_graduation(self, store):
|
||||
store.record(SovereigntyMetric(metric_type="cache_hit_rate", value=0.95))
|
||||
summary = store.get_summary()
|
||||
assert summary["cache_hit_rate"]["phase"] == "graduated"
|
||||
|
||||
def test_alert_on_high_api_cost(self, store):
|
||||
"""API cost above threshold triggers an alert."""
|
||||
with patch("infrastructure.sovereignty_metrics.settings") as mock_settings:
|
||||
mock_settings.sovereignty_api_cost_alert_threshold = 1.00
|
||||
mock_settings.db_busy_timeout_ms = 5000
|
||||
store.record(SovereigntyMetric(metric_type="api_cost", value=2.50))
|
||||
|
||||
alerts = store.get_alerts(unacknowledged_only=True)
|
||||
assert len(alerts) == 1
|
||||
assert alerts[0]["alert_type"] == "api_cost_exceeded"
|
||||
assert alerts[0]["value"] == 2.50
|
||||
|
||||
def test_no_alert_below_threshold(self, store):
|
||||
"""API cost below threshold does not trigger an alert."""
|
||||
with patch("infrastructure.sovereignty_metrics.settings") as mock_settings:
|
||||
mock_settings.sovereignty_api_cost_alert_threshold = 1.00
|
||||
mock_settings.db_busy_timeout_ms = 5000
|
||||
store.record(SovereigntyMetric(metric_type="api_cost", value=0.50))
|
||||
|
||||
alerts = store.get_alerts(unacknowledged_only=True)
|
||||
assert len(alerts) == 0
|
||||
|
||||
def test_acknowledge_alert(self, store):
|
||||
with patch("infrastructure.sovereignty_metrics.settings") as mock_settings:
|
||||
mock_settings.sovereignty_api_cost_alert_threshold = 0.50
|
||||
mock_settings.db_busy_timeout_ms = 5000
|
||||
store.record(SovereigntyMetric(metric_type="api_cost", value=1.00))
|
||||
|
||||
alerts = store.get_alerts(unacknowledged_only=True)
|
||||
assert len(alerts) == 1
|
||||
|
||||
store.acknowledge_alert(alerts[0]["id"])
|
||||
assert len(store.get_alerts(unacknowledged_only=True)) == 0
|
||||
assert len(store.get_alerts(unacknowledged_only=False)) == 1
|
||||
|
||||
def test_metadata_preserved(self, store):
|
||||
store.record(
|
||||
SovereigntyMetric(
|
||||
metric_type="cache_hit_rate",
|
||||
value=0.5,
|
||||
metadata={"source": "research_orchestrator"},
|
||||
)
|
||||
)
|
||||
results = store.get_latest("cache_hit_rate")
|
||||
assert results[0]["metadata"]["source"] == "research_orchestrator"
|
||||
|
||||
def test_summary_trend_data(self, store):
|
||||
for v in [0.1, 0.2, 0.3]:
|
||||
store.record(SovereigntyMetric(metric_type="cache_hit_rate", value=v))
|
||||
|
||||
summary = store.get_summary()
|
||||
trend = summary["cache_hit_rate"]["trend"]
|
||||
assert len(trend) == 3
|
||||
assert trend[0]["v"] == 0.1 # oldest first (reversed)
|
||||
assert trend[-1]["v"] == 0.3
|
||||
|
||||
def test_graduation_targets_complete(self):
|
||||
"""All expected metric types have graduation targets."""
|
||||
expected = {
|
||||
"cache_hit_rate",
|
||||
"api_cost",
|
||||
"time_to_report",
|
||||
"human_involvement",
|
||||
"local_artifacts",
|
||||
}
|
||||
assert set(GRADUATION_TARGETS.keys()) == expected
|
||||
|
||||
|
||||
class TestEmitSovereigntyMetric:
|
||||
@pytest.mark.asyncio
|
||||
async def test_emit_records_and_publishes(self, tmp_path):
|
||||
"""emit_sovereignty_metric records to store and publishes event."""
|
||||
with (
|
||||
patch("infrastructure.sovereignty_metrics._store", None),
|
||||
patch(
|
||||
"infrastructure.sovereignty_metrics.DB_PATH",
|
||||
tmp_path / "emit_test.db",
|
||||
),
|
||||
patch("infrastructure.events.bus.emit", new_callable=AsyncMock) as mock_emit,
|
||||
):
|
||||
await emit_sovereignty_metric("cache_hit_rate", 0.75, {"source": "test"})
|
||||
|
||||
mock_emit.assert_called_once()
|
||||
call_args = mock_emit.call_args
|
||||
assert call_args[0][0] == "sovereignty.metric.cache_hit_rate"
|
||||
|
||||
|
||||
class TestSovereigntyMetricsRoutes:
|
||||
def test_metrics_api_returns_200(self, client):
|
||||
response = client.get("/sovereignty/metrics")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert "metrics" in data
|
||||
assert "alerts" in data
|
||||
assert "targets" in data
|
||||
|
||||
def test_metrics_panel_returns_html(self, client):
|
||||
response = client.get("/sovereignty/metrics/panel")
|
||||
assert response.status_code == 200
|
||||
assert "text/html" in response.headers["content-type"]
|
||||
|
||||
def test_alerts_api_returns_200(self, client):
|
||||
response = client.get("/sovereignty/alerts")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert "alerts" in data
|
||||
assert "unacknowledged" in data
|
||||
394
tests/infrastructure/world/test_benchmark.py
Normal file
394
tests/infrastructure/world/test_benchmark.py
Normal file
@@ -0,0 +1,394 @@
|
||||
"""Tests for the agent performance regression benchmark suite.
|
||||
|
||||
Covers: scenario loading, metrics collection, runner execution,
|
||||
goal predicates, and result persistence.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from infrastructure.world.benchmark.metrics import (
|
||||
BenchmarkMetrics,
|
||||
ScenarioResult,
|
||||
compare_runs,
|
||||
load_history,
|
||||
)
|
||||
from infrastructure.world.benchmark.runner import BenchmarkRunner
|
||||
from infrastructure.world.benchmark.scenarios import (
|
||||
BUILTIN_SCENARIOS,
|
||||
BenchmarkScenario,
|
||||
load_scenarios,
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Scenario definitions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestBenchmarkScenario:
|
||||
def test_builtin_scenarios_exist(self):
|
||||
assert len(BUILTIN_SCENARIOS) >= 5
|
||||
|
||||
def test_scenario_fields(self):
|
||||
s = BUILTIN_SCENARIOS[0]
|
||||
assert s.name
|
||||
assert s.description
|
||||
assert s.start_location
|
||||
assert s.max_cycles > 0
|
||||
|
||||
def test_load_all_scenarios(self):
|
||||
scenarios = load_scenarios()
|
||||
assert len(scenarios) == len(BUILTIN_SCENARIOS)
|
||||
|
||||
def test_load_scenarios_by_tag(self):
|
||||
nav = load_scenarios(tags=["navigation"])
|
||||
assert len(nav) >= 2
|
||||
for s in nav:
|
||||
assert "navigation" in s.tags
|
||||
|
||||
def test_load_scenarios_no_match(self):
|
||||
result = load_scenarios(tags=["nonexistent_tag"])
|
||||
assert result == []
|
||||
|
||||
def test_scenario_is_frozen(self):
|
||||
s = BUILTIN_SCENARIOS[0]
|
||||
with pytest.raises(AttributeError):
|
||||
s.name = "modified"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Goal predicates
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGoalPredicates:
|
||||
def test_reached_location_predicate(self):
|
||||
s = BUILTIN_SCENARIOS[0] # Walk to Balmora
|
||||
assert s.goal_predicate is not None
|
||||
assert s.goal_predicate([], "Balmora") is True
|
||||
assert s.goal_predicate([], "Seyda Neen") is False
|
||||
|
||||
def test_reached_location_case_insensitive(self):
|
||||
s = BUILTIN_SCENARIOS[0]
|
||||
assert s.goal_predicate([], "balmora") is True
|
||||
assert s.goal_predicate([], "BALMORA") is True
|
||||
|
||||
def test_interacted_with_predicate(self):
|
||||
s = BUILTIN_SCENARIOS[1] # Fargoth quest
|
||||
assert s.goal_predicate is not None
|
||||
actions = [{"action": "speak", "target": "Fargoth"}]
|
||||
assert s.goal_predicate(actions, "Seyda Neen") is True
|
||||
|
||||
def test_interacted_with_no_match(self):
|
||||
s = BUILTIN_SCENARIOS[1]
|
||||
actions = [{"action": "speak", "target": "Guard"}]
|
||||
assert s.goal_predicate(actions, "Seyda Neen") is False
|
||||
|
||||
def test_interacted_with_interact_action(self):
|
||||
s = BUILTIN_SCENARIOS[1]
|
||||
actions = [{"action": "interact", "target": "Fargoth"}]
|
||||
assert s.goal_predicate(actions, "Seyda Neen") is True
|
||||
|
||||
def test_no_predicate_scenario(self):
|
||||
combat = [s for s in BUILTIN_SCENARIOS if "combat" in s.tags][0]
|
||||
assert combat.goal_predicate is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Metrics
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestScenarioResult:
|
||||
def test_default_values(self):
|
||||
r = ScenarioResult(scenario_name="test")
|
||||
assert r.success is False
|
||||
assert r.cycles_used == 0
|
||||
assert r.llm_calls == 0
|
||||
assert r.metabolic_cost == 0.0
|
||||
assert r.error is None
|
||||
|
||||
|
||||
class TestBenchmarkMetrics:
|
||||
def test_empty_metrics(self):
|
||||
m = BenchmarkMetrics()
|
||||
assert m.pass_count == 0
|
||||
assert m.fail_count == 0
|
||||
assert m.success_rate == 0.0
|
||||
assert m.total_llm_calls == 0
|
||||
assert m.total_metabolic_cost == 0.0
|
||||
|
||||
def test_success_rate(self):
|
||||
m = BenchmarkMetrics(
|
||||
results=[
|
||||
ScenarioResult(scenario_name="a", success=True),
|
||||
ScenarioResult(scenario_name="b", success=False),
|
||||
ScenarioResult(scenario_name="c", success=True),
|
||||
]
|
||||
)
|
||||
assert m.pass_count == 2
|
||||
assert m.fail_count == 1
|
||||
assert abs(m.success_rate - 2 / 3) < 0.01
|
||||
|
||||
def test_totals(self):
|
||||
m = BenchmarkMetrics(
|
||||
results=[
|
||||
ScenarioResult(scenario_name="a", llm_calls=10, metabolic_cost=30.0),
|
||||
ScenarioResult(scenario_name="b", llm_calls=5, metabolic_cost=15.0),
|
||||
]
|
||||
)
|
||||
assert m.total_llm_calls == 15
|
||||
assert m.total_metabolic_cost == 45.0
|
||||
|
||||
def test_save_and_load(self, tmp_path):
|
||||
path = tmp_path / "bench.jsonl"
|
||||
m = BenchmarkMetrics(
|
||||
timestamp="2026-01-01T00:00:00",
|
||||
commit_sha="abc123",
|
||||
total_time_ms=1000,
|
||||
results=[
|
||||
ScenarioResult(
|
||||
scenario_name="a",
|
||||
success=True,
|
||||
cycles_used=5,
|
||||
max_cycles=10,
|
||||
),
|
||||
],
|
||||
)
|
||||
m.save(path)
|
||||
|
||||
history = load_history(path)
|
||||
assert len(history) == 1
|
||||
assert history[0]["commit_sha"] == "abc123"
|
||||
assert history[0]["scenarios"][0]["scenario_name"] == "a"
|
||||
|
||||
def test_save_appends(self, tmp_path):
|
||||
path = tmp_path / "bench.jsonl"
|
||||
for i in range(3):
|
||||
m = BenchmarkMetrics(
|
||||
timestamp=f"2026-01-0{i + 1}T00:00:00",
|
||||
results=[ScenarioResult(scenario_name=f"s{i}")],
|
||||
)
|
||||
m.save(path)
|
||||
|
||||
history = load_history(path)
|
||||
assert len(history) == 3
|
||||
# Most recent first
|
||||
assert history[0]["timestamp"] == "2026-01-03T00:00:00"
|
||||
|
||||
def test_summary_output(self):
|
||||
m = BenchmarkMetrics(
|
||||
timestamp="2026-01-01T00:00:00",
|
||||
commit_sha="abc123",
|
||||
total_time_ms=500,
|
||||
results=[
|
||||
ScenarioResult(
|
||||
scenario_name="Walk Test",
|
||||
success=True,
|
||||
cycles_used=5,
|
||||
max_cycles=10,
|
||||
wall_time_ms=200,
|
||||
llm_calls=15,
|
||||
),
|
||||
],
|
||||
)
|
||||
summary = m.summary()
|
||||
assert "Walk Test" in summary
|
||||
assert "PASS" in summary
|
||||
assert "abc123" in summary
|
||||
|
||||
def test_load_history_missing_file(self, tmp_path):
|
||||
assert load_history(tmp_path / "nope.jsonl") == []
|
||||
|
||||
def test_load_history_corrupt_lines(self, tmp_path):
|
||||
path = tmp_path / "bench.jsonl"
|
||||
path.write_text('{"valid": true}\nnot json\n{"also": "valid"}\n')
|
||||
history = load_history(path)
|
||||
assert len(history) == 2
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Comparison
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCompareRuns:
|
||||
def test_regression_detected(self):
|
||||
baseline = BenchmarkMetrics(
|
||||
results=[
|
||||
ScenarioResult(scenario_name="walk", success=True, cycles_used=10),
|
||||
]
|
||||
)
|
||||
current = BenchmarkMetrics(
|
||||
results=[
|
||||
ScenarioResult(scenario_name="walk", success=False, cycles_used=10),
|
||||
]
|
||||
)
|
||||
report = compare_runs(current, baseline)
|
||||
assert "REGRESSION" in report
|
||||
|
||||
def test_improvement_detected(self):
|
||||
baseline = BenchmarkMetrics(
|
||||
results=[
|
||||
ScenarioResult(scenario_name="walk", success=False, cycles_used=10),
|
||||
]
|
||||
)
|
||||
current = BenchmarkMetrics(
|
||||
results=[
|
||||
ScenarioResult(scenario_name="walk", success=True, cycles_used=10),
|
||||
]
|
||||
)
|
||||
report = compare_runs(current, baseline)
|
||||
assert "IMPROVEMENT" in report
|
||||
|
||||
def test_slower_detected(self):
|
||||
baseline = BenchmarkMetrics(
|
||||
results=[
|
||||
ScenarioResult(scenario_name="walk", success=True, cycles_used=10),
|
||||
]
|
||||
)
|
||||
current = BenchmarkMetrics(
|
||||
results=[
|
||||
ScenarioResult(scenario_name="walk", success=True, cycles_used=20),
|
||||
]
|
||||
)
|
||||
report = compare_runs(current, baseline)
|
||||
assert "SLOWER" in report
|
||||
|
||||
def test_new_scenario_noted(self):
|
||||
baseline = BenchmarkMetrics(results=[])
|
||||
current = BenchmarkMetrics(results=[ScenarioResult(scenario_name="new_one", success=True)])
|
||||
report = compare_runs(current, baseline)
|
||||
assert "NEW" in report
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Runner
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestBenchmarkRunner:
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_single_scenario(self):
|
||||
"""Runner executes a scenario and returns a result."""
|
||||
scenario = BenchmarkScenario(
|
||||
name="Test Walk",
|
||||
description="Simple test",
|
||||
start_location="A",
|
||||
goal_location="A",
|
||||
max_cycles=3,
|
||||
tags=["test"],
|
||||
)
|
||||
runner = BenchmarkRunner()
|
||||
metrics = await runner.run([scenario])
|
||||
assert len(metrics.results) == 1
|
||||
r = metrics.results[0]
|
||||
assert r.scenario_name == "Test Walk"
|
||||
assert r.cycles_used == 3 # no predicate, runs all cycles
|
||||
assert r.success is True # no predicate = success if survived
|
||||
assert r.wall_time_ms >= 0
|
||||
assert r.llm_calls == 9 # 3 cycles * 3 calls
|
||||
assert r.metabolic_cost > 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_with_goal_predicate(self):
|
||||
"""Runner stops early when goal predicate is satisfied."""
|
||||
|
||||
def always_true(actions, location):
|
||||
return True
|
||||
|
||||
scenario = BenchmarkScenario(
|
||||
name="Instant Win",
|
||||
description="Predicate satisfied immediately",
|
||||
start_location="A",
|
||||
max_cycles=100,
|
||||
goal_predicate=always_true,
|
||||
tags=["test"],
|
||||
)
|
||||
runner = BenchmarkRunner()
|
||||
metrics = await runner.run([scenario])
|
||||
r = metrics.results[0]
|
||||
assert r.success is True
|
||||
assert r.cycles_used == 1 # Stopped at first cycle
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_with_failing_predicate(self):
|
||||
"""Scenario fails when predicate never satisfied."""
|
||||
|
||||
def never_true(actions, location):
|
||||
return False
|
||||
|
||||
scenario = BenchmarkScenario(
|
||||
name="Impossible",
|
||||
description="Predicate never satisfied",
|
||||
start_location="A",
|
||||
max_cycles=5,
|
||||
goal_predicate=never_true,
|
||||
tags=["test"],
|
||||
)
|
||||
runner = BenchmarkRunner()
|
||||
metrics = await runner.run([scenario])
|
||||
r = metrics.results[0]
|
||||
assert r.success is False
|
||||
assert r.cycles_used == 5
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_multiple_scenarios(self):
|
||||
"""Runner handles multiple scenarios in sequence."""
|
||||
scenarios = [
|
||||
BenchmarkScenario(
|
||||
name=f"Scenario {i}",
|
||||
description=f"Test {i}",
|
||||
start_location="A",
|
||||
max_cycles=2,
|
||||
tags=["test"],
|
||||
)
|
||||
for i in range(3)
|
||||
]
|
||||
runner = BenchmarkRunner()
|
||||
metrics = await runner.run(scenarios)
|
||||
assert len(metrics.results) == 3
|
||||
assert metrics.total_time_ms >= 0
|
||||
assert metrics.timestamp
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_metrics_commit_sha(self):
|
||||
"""Runner captures git SHA in metrics."""
|
||||
scenario = BenchmarkScenario(
|
||||
name="SHA Test",
|
||||
description="Check SHA capture",
|
||||
start_location="A",
|
||||
max_cycles=1,
|
||||
tags=["test"],
|
||||
)
|
||||
runner = BenchmarkRunner()
|
||||
metrics = await runner.run([scenario])
|
||||
# SHA may or may not be available in test env; just ensure no crash
|
||||
assert isinstance(metrics.commit_sha, str)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_builtin_scenarios_run(self):
|
||||
"""All built-in scenarios run without crashing."""
|
||||
# Use just 2 cycles each to keep tests fast
|
||||
scenarios = [
|
||||
BenchmarkScenario(
|
||||
name=s.name,
|
||||
description=s.description,
|
||||
start_location=s.start_location,
|
||||
goal_location=s.goal_location,
|
||||
entities=list(s.entities),
|
||||
events=list(s.events),
|
||||
max_cycles=2, # Override for speed
|
||||
goal_predicate=None, # Skip predicate for smoke test
|
||||
tags=list(s.tags),
|
||||
)
|
||||
for s in BUILTIN_SCENARIOS
|
||||
]
|
||||
runner = BenchmarkRunner()
|
||||
metrics = await runner.run(scenarios)
|
||||
assert len(metrics.results) == len(BUILTIN_SCENARIOS)
|
||||
# All should succeed (no predicate + survived = pass)
|
||||
for r in metrics.results:
|
||||
assert r.success is True
|
||||
assert r.error is None
|
||||
619
tests/timmy/test_mcp_bridge.py
Normal file
619
tests/timmy/test_mcp_bridge.py
Normal file
@@ -0,0 +1,619 @@
|
||||
"""Tests for the MCP bridge module (Qwen3 via Ollama)."""
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from timmy.mcp_bridge import (
|
||||
BridgeResult,
|
||||
MCPBridge,
|
||||
MCPToolDef,
|
||||
_build_gitea_tools,
|
||||
_build_shell_tool,
|
||||
_mcp_schema_to_ollama_tool,
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _mcp_schema_to_ollama_tool
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_schema_to_ollama_tool_basic():
|
||||
"""Converts an MCPToolDef to Ollama tool format."""
|
||||
tool = MCPToolDef(
|
||||
name="test_tool",
|
||||
description="A test tool",
|
||||
parameters={
|
||||
"type": "object",
|
||||
"properties": {"arg1": {"type": "string"}},
|
||||
"required": ["arg1"],
|
||||
},
|
||||
handler=AsyncMock(),
|
||||
)
|
||||
result = _mcp_schema_to_ollama_tool(tool)
|
||||
assert result["type"] == "function"
|
||||
assert result["function"]["name"] == "test_tool"
|
||||
assert result["function"]["description"] == "A test tool"
|
||||
assert result["function"]["parameters"]["type"] == "object"
|
||||
assert "arg1" in result["function"]["parameters"]["properties"]
|
||||
|
||||
|
||||
def test_schema_to_ollama_tool_wraps_bare_params():
|
||||
"""Wraps bare parameter dicts in an object type."""
|
||||
tool = MCPToolDef(
|
||||
name="bare",
|
||||
description="Bare params",
|
||||
parameters={"x": {"type": "integer"}},
|
||||
handler=AsyncMock(),
|
||||
)
|
||||
result = _mcp_schema_to_ollama_tool(tool)
|
||||
params = result["function"]["parameters"]
|
||||
assert params["type"] == "object"
|
||||
assert "x" in params["properties"]
|
||||
assert "x" in params["required"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _build_shell_tool
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_build_shell_tool_returns_def():
|
||||
"""Shell tool builder returns an MCPToolDef."""
|
||||
tool = _build_shell_tool()
|
||||
assert tool is not None
|
||||
assert tool.name == "shell_exec"
|
||||
assert "command" in tool.parameters["properties"]
|
||||
|
||||
|
||||
def test_build_shell_tool_graceful_on_import_error():
|
||||
"""Shell tool returns None when infrastructure is unavailable."""
|
||||
with patch.dict("sys.modules", {"infrastructure.hands.shell": None}):
|
||||
# Force re-import failure — but _build_shell_tool catches it
|
||||
with patch(
|
||||
"timmy.mcp_bridge._build_shell_tool",
|
||||
wraps=_build_shell_tool,
|
||||
):
|
||||
# The real function should handle import errors
|
||||
tool = _build_shell_tool()
|
||||
# May return tool if import cache succeeds, or None if not
|
||||
# Just verify it doesn't raise
|
||||
assert tool is None or isinstance(tool, MCPToolDef)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _build_gitea_tools
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_gitea_tools_empty_when_disabled():
|
||||
"""Gitea tools returns empty list when disabled."""
|
||||
with patch("timmy.mcp_bridge.settings") as mock_settings:
|
||||
mock_settings.gitea_enabled = False
|
||||
mock_settings.gitea_token = ""
|
||||
result = _build_gitea_tools()
|
||||
assert result == []
|
||||
|
||||
|
||||
def test_gitea_tools_empty_when_no_token():
|
||||
"""Gitea tools returns empty list when no token."""
|
||||
with patch("timmy.mcp_bridge.settings") as mock_settings:
|
||||
mock_settings.gitea_enabled = True
|
||||
mock_settings.gitea_token = ""
|
||||
result = _build_gitea_tools()
|
||||
assert result == []
|
||||
|
||||
|
||||
def test_gitea_tools_returns_three_tools():
|
||||
"""Gitea tools returns list_issues, create_issue, read_issue."""
|
||||
with patch("timmy.mcp_bridge.settings") as mock_settings:
|
||||
mock_settings.gitea_enabled = True
|
||||
mock_settings.gitea_token = "tok123"
|
||||
mock_settings.gitea_url = "http://localhost:3000"
|
||||
mock_settings.gitea_repo = "owner/repo"
|
||||
result = _build_gitea_tools()
|
||||
assert len(result) == 3
|
||||
names = {t.name for t in result}
|
||||
assert names == {"list_issues", "create_issue", "read_issue"}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MCPBridge.__init__
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_bridge_init_default():
|
||||
"""MCPBridge initialises with default settings."""
|
||||
with patch("timmy.mcp_bridge.settings") as mock_settings:
|
||||
mock_settings.ollama_model = "qwen3:14b"
|
||||
mock_settings.normalized_ollama_url = "http://localhost:11434"
|
||||
mock_settings.gitea_enabled = False
|
||||
mock_settings.gitea_token = ""
|
||||
bridge = MCPBridge(include_gitea=False, include_shell=False)
|
||||
assert bridge.model == "qwen3:14b"
|
||||
assert bridge.tool_names == []
|
||||
|
||||
|
||||
def test_bridge_init_with_extra_tools():
|
||||
"""MCPBridge accepts extra tool definitions."""
|
||||
custom = MCPToolDef(
|
||||
name="custom_tool",
|
||||
description="Custom",
|
||||
parameters={"type": "object", "properties": {}, "required": []},
|
||||
handler=AsyncMock(),
|
||||
)
|
||||
with patch("timmy.mcp_bridge.settings") as mock_settings:
|
||||
mock_settings.ollama_model = "qwen3:14b"
|
||||
mock_settings.normalized_ollama_url = "http://localhost:11434"
|
||||
mock_settings.gitea_enabled = False
|
||||
mock_settings.gitea_token = ""
|
||||
bridge = MCPBridge(
|
||||
include_gitea=False,
|
||||
include_shell=False,
|
||||
extra_tools=[custom],
|
||||
)
|
||||
assert "custom_tool" in bridge.tool_names
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MCPBridge.run — tool-call loop
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_bridge_run_simple_response():
|
||||
"""Bridge returns model content when no tool calls are made."""
|
||||
with patch("timmy.mcp_bridge.settings") as mock_settings:
|
||||
mock_settings.ollama_model = "qwen3:14b"
|
||||
mock_settings.normalized_ollama_url = "http://localhost:11434"
|
||||
mock_settings.ollama_num_ctx = 4096
|
||||
mock_settings.mcp_bridge_timeout = 60
|
||||
mock_settings.gitea_enabled = False
|
||||
mock_settings.gitea_token = ""
|
||||
|
||||
bridge = MCPBridge(include_gitea=False, include_shell=False)
|
||||
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.json.return_value = {
|
||||
"message": {"role": "assistant", "content": "Hello!"}
|
||||
}
|
||||
mock_resp.raise_for_status = MagicMock()
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post = AsyncMock(return_value=mock_resp)
|
||||
mock_client.aclose = AsyncMock()
|
||||
|
||||
bridge._client = mock_client
|
||||
result = await bridge.run("Hi")
|
||||
|
||||
assert result.content == "Hello!"
|
||||
assert result.rounds == 1
|
||||
assert result.tool_calls_made == []
|
||||
assert result.error == ""
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_bridge_run_with_tool_call():
|
||||
"""Bridge executes tool calls and returns final response."""
|
||||
handler = AsyncMock(return_value="tool result data")
|
||||
tool = MCPToolDef(
|
||||
name="my_tool",
|
||||
description="Test",
|
||||
parameters={"type": "object", "properties": {}, "required": []},
|
||||
handler=handler,
|
||||
)
|
||||
|
||||
with patch("timmy.mcp_bridge.settings") as mock_settings:
|
||||
mock_settings.ollama_model = "qwen3:14b"
|
||||
mock_settings.normalized_ollama_url = "http://localhost:11434"
|
||||
mock_settings.ollama_num_ctx = 0
|
||||
mock_settings.mcp_bridge_timeout = 60
|
||||
mock_settings.gitea_enabled = False
|
||||
mock_settings.gitea_token = ""
|
||||
|
||||
bridge = MCPBridge(
|
||||
include_gitea=False,
|
||||
include_shell=False,
|
||||
extra_tools=[tool],
|
||||
)
|
||||
|
||||
# Round 1: model requests tool call
|
||||
tool_call_resp = MagicMock()
|
||||
tool_call_resp.json.return_value = {
|
||||
"message": {
|
||||
"role": "assistant",
|
||||
"content": "",
|
||||
"tool_calls": [
|
||||
{
|
||||
"function": {
|
||||
"name": "my_tool",
|
||||
"arguments": {},
|
||||
}
|
||||
}
|
||||
],
|
||||
}
|
||||
}
|
||||
tool_call_resp.raise_for_status = MagicMock()
|
||||
|
||||
# Round 2: model returns final text
|
||||
final_resp = MagicMock()
|
||||
final_resp.json.return_value = {
|
||||
"message": {"role": "assistant", "content": "Done with tools!"}
|
||||
}
|
||||
final_resp.raise_for_status = MagicMock()
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post = AsyncMock(side_effect=[tool_call_resp, final_resp])
|
||||
mock_client.aclose = AsyncMock()
|
||||
|
||||
bridge._client = mock_client
|
||||
result = await bridge.run("Do something")
|
||||
|
||||
assert result.content == "Done with tools!"
|
||||
assert result.rounds == 2
|
||||
assert len(result.tool_calls_made) == 1
|
||||
assert result.tool_calls_made[0]["tool"] == "my_tool"
|
||||
handler.assert_awaited_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_bridge_run_unknown_tool():
|
||||
"""Bridge handles calls to unknown tools gracefully."""
|
||||
with patch("timmy.mcp_bridge.settings") as mock_settings:
|
||||
mock_settings.ollama_model = "qwen3:14b"
|
||||
mock_settings.normalized_ollama_url = "http://localhost:11434"
|
||||
mock_settings.ollama_num_ctx = 0
|
||||
mock_settings.mcp_bridge_timeout = 60
|
||||
mock_settings.gitea_enabled = False
|
||||
mock_settings.gitea_token = ""
|
||||
|
||||
bridge = MCPBridge(include_gitea=False, include_shell=False)
|
||||
|
||||
# Model calls a tool that doesn't exist
|
||||
tool_call_resp = MagicMock()
|
||||
tool_call_resp.json.return_value = {
|
||||
"message": {
|
||||
"role": "assistant",
|
||||
"content": "",
|
||||
"tool_calls": [
|
||||
{"function": {"name": "nonexistent", "arguments": {}}}
|
||||
],
|
||||
}
|
||||
}
|
||||
tool_call_resp.raise_for_status = MagicMock()
|
||||
|
||||
final_resp = MagicMock()
|
||||
final_resp.json.return_value = {
|
||||
"message": {"role": "assistant", "content": "OK"}
|
||||
}
|
||||
final_resp.raise_for_status = MagicMock()
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post = AsyncMock(side_effect=[tool_call_resp, final_resp])
|
||||
mock_client.aclose = AsyncMock()
|
||||
|
||||
bridge._client = mock_client
|
||||
result = await bridge.run("test")
|
||||
|
||||
assert len(result.tool_calls_made) == 1
|
||||
assert "unknown tool" in result.tool_calls_made[0]["result"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_bridge_run_max_rounds():
|
||||
"""Bridge stops after max_rounds and returns error."""
|
||||
handler = AsyncMock(return_value="result")
|
||||
tool = MCPToolDef(
|
||||
name="loop_tool",
|
||||
description="Loops forever",
|
||||
parameters={"type": "object", "properties": {}, "required": []},
|
||||
handler=handler,
|
||||
)
|
||||
|
||||
with patch("timmy.mcp_bridge.settings") as mock_settings:
|
||||
mock_settings.ollama_model = "qwen3:14b"
|
||||
mock_settings.normalized_ollama_url = "http://localhost:11434"
|
||||
mock_settings.ollama_num_ctx = 0
|
||||
mock_settings.mcp_bridge_timeout = 60
|
||||
mock_settings.gitea_enabled = False
|
||||
mock_settings.gitea_token = ""
|
||||
|
||||
bridge = MCPBridge(
|
||||
include_gitea=False,
|
||||
include_shell=False,
|
||||
extra_tools=[tool],
|
||||
max_rounds=2,
|
||||
)
|
||||
|
||||
# Always return tool calls (never a final response)
|
||||
tool_call_resp = MagicMock()
|
||||
tool_call_resp.json.return_value = {
|
||||
"message": {
|
||||
"role": "assistant",
|
||||
"content": "",
|
||||
"tool_calls": [
|
||||
{"function": {"name": "loop_tool", "arguments": {}}}
|
||||
],
|
||||
}
|
||||
}
|
||||
tool_call_resp.raise_for_status = MagicMock()
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post = AsyncMock(return_value=tool_call_resp)
|
||||
mock_client.aclose = AsyncMock()
|
||||
|
||||
bridge._client = mock_client
|
||||
result = await bridge.run("loop")
|
||||
|
||||
assert "max tool-call rounds" in result.content
|
||||
assert "Exceeded" in result.error
|
||||
assert result.rounds == 2
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_bridge_run_connection_error():
|
||||
"""Bridge handles Ollama connection errors gracefully."""
|
||||
with patch("timmy.mcp_bridge.settings") as mock_settings:
|
||||
mock_settings.ollama_model = "qwen3:14b"
|
||||
mock_settings.normalized_ollama_url = "http://localhost:11434"
|
||||
mock_settings.ollama_num_ctx = 0
|
||||
mock_settings.mcp_bridge_timeout = 60
|
||||
mock_settings.gitea_enabled = False
|
||||
mock_settings.gitea_token = ""
|
||||
|
||||
bridge = MCPBridge(include_gitea=False, include_shell=False)
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post = AsyncMock(
|
||||
side_effect=httpx.ConnectError("Connection refused")
|
||||
)
|
||||
mock_client.aclose = AsyncMock()
|
||||
|
||||
bridge._client = mock_client
|
||||
result = await bridge.run("test")
|
||||
|
||||
assert result.error
|
||||
assert "connection" in result.error.lower()
|
||||
assert result.content == ""
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_bridge_run_http_error():
|
||||
"""Bridge handles Ollama HTTP errors gracefully."""
|
||||
with patch("timmy.mcp_bridge.settings") as mock_settings:
|
||||
mock_settings.ollama_model = "qwen3:14b"
|
||||
mock_settings.normalized_ollama_url = "http://localhost:11434"
|
||||
mock_settings.ollama_num_ctx = 0
|
||||
mock_settings.mcp_bridge_timeout = 60
|
||||
mock_settings.gitea_enabled = False
|
||||
mock_settings.gitea_token = ""
|
||||
|
||||
bridge = MCPBridge(include_gitea=False, include_shell=False)
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 500
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post = AsyncMock(
|
||||
side_effect=httpx.HTTPStatusError(
|
||||
"Server Error",
|
||||
request=MagicMock(),
|
||||
response=mock_response,
|
||||
)
|
||||
)
|
||||
mock_client.aclose = AsyncMock()
|
||||
|
||||
bridge._client = mock_client
|
||||
result = await bridge.run("test")
|
||||
|
||||
assert result.error
|
||||
assert "500" in result.error
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_bridge_run_without_context_manager():
|
||||
"""Bridge returns error when used without async context manager."""
|
||||
with patch("timmy.mcp_bridge.settings") as mock_settings:
|
||||
mock_settings.ollama_model = "qwen3:14b"
|
||||
mock_settings.normalized_ollama_url = "http://localhost:11434"
|
||||
mock_settings.gitea_enabled = False
|
||||
mock_settings.gitea_token = ""
|
||||
|
||||
bridge = MCPBridge(include_gitea=False, include_shell=False)
|
||||
|
||||
result = await bridge.run("test")
|
||||
assert result.error
|
||||
assert "context manager" in result.error.lower()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MCPBridge.status
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_bridge_status():
|
||||
"""Bridge status returns model and tool info."""
|
||||
with patch("timmy.mcp_bridge.settings") as mock_settings:
|
||||
mock_settings.ollama_model = "qwen3:14b"
|
||||
mock_settings.normalized_ollama_url = "http://localhost:11434"
|
||||
mock_settings.gitea_enabled = False
|
||||
mock_settings.gitea_token = ""
|
||||
|
||||
bridge = MCPBridge(include_gitea=False, include_shell=False)
|
||||
|
||||
status = bridge.status()
|
||||
assert status["model"] == "qwen3:14b"
|
||||
assert status["connected"] is False
|
||||
assert isinstance(status["tools"], list)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MCPBridge context manager
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_bridge_context_manager():
|
||||
"""Bridge opens and closes httpx client via async context manager."""
|
||||
with patch("timmy.mcp_bridge.settings") as mock_settings:
|
||||
mock_settings.ollama_model = "qwen3:14b"
|
||||
mock_settings.normalized_ollama_url = "http://localhost:11434"
|
||||
mock_settings.mcp_bridge_timeout = 60
|
||||
mock_settings.gitea_enabled = False
|
||||
mock_settings.gitea_token = ""
|
||||
|
||||
bridge = MCPBridge(include_gitea=False, include_shell=False)
|
||||
|
||||
assert bridge._client is None
|
||||
|
||||
async with bridge:
|
||||
assert bridge._client is not None
|
||||
|
||||
assert bridge._client is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gitea tool handlers (integration-style, mocked HTTP)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_gitea_list_issues_handler():
|
||||
"""list_issues handler calls Gitea API and formats results."""
|
||||
with patch("timmy.mcp_bridge.settings") as mock_settings:
|
||||
mock_settings.gitea_enabled = True
|
||||
mock_settings.gitea_token = "tok123"
|
||||
mock_settings.gitea_url = "http://localhost:3000"
|
||||
mock_settings.gitea_repo = "owner/repo"
|
||||
tools = _build_gitea_tools()
|
||||
|
||||
list_tool = next(t for t in tools if t.name == "list_issues")
|
||||
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.json.return_value = [
|
||||
{"number": 1, "title": "Bug one", "labels": [{"name": "bug"}]},
|
||||
{"number": 2, "title": "Feature two", "labels": []},
|
||||
]
|
||||
mock_resp.raise_for_status = MagicMock()
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.get = AsyncMock(return_value=mock_resp)
|
||||
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
||||
mock_client.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
with patch("timmy.mcp_bridge.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await list_tool.handler(state="open", limit=10)
|
||||
|
||||
assert "#1: Bug one [bug]" in result
|
||||
assert "#2: Feature two" in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_gitea_create_issue_handler():
|
||||
"""create_issue handler calls Gitea API and returns confirmation."""
|
||||
with patch("timmy.mcp_bridge.settings") as mock_settings:
|
||||
mock_settings.gitea_enabled = True
|
||||
mock_settings.gitea_token = "tok123"
|
||||
mock_settings.gitea_url = "http://localhost:3000"
|
||||
mock_settings.gitea_repo = "owner/repo"
|
||||
tools = _build_gitea_tools()
|
||||
|
||||
create_tool = next(t for t in tools if t.name == "create_issue")
|
||||
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.json.return_value = {"number": 42, "title": "New bug"}
|
||||
mock_resp.raise_for_status = MagicMock()
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post = AsyncMock(return_value=mock_resp)
|
||||
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
||||
mock_client.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
with patch("timmy.mcp_bridge.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await create_tool.handler(title="New bug", body="Description")
|
||||
|
||||
assert "#42" in result
|
||||
assert "New bug" in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_gitea_create_issue_requires_title():
|
||||
"""create_issue handler returns error when title is missing."""
|
||||
with patch("timmy.mcp_bridge.settings") as mock_settings:
|
||||
mock_settings.gitea_enabled = True
|
||||
mock_settings.gitea_token = "tok123"
|
||||
mock_settings.gitea_url = "http://localhost:3000"
|
||||
mock_settings.gitea_repo = "owner/repo"
|
||||
tools = _build_gitea_tools()
|
||||
|
||||
create_tool = next(t for t in tools if t.name == "create_issue")
|
||||
result = await create_tool.handler()
|
||||
assert "required" in result.lower()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_gitea_read_issue_handler():
|
||||
"""read_issue handler calls Gitea API and formats result."""
|
||||
with patch("timmy.mcp_bridge.settings") as mock_settings:
|
||||
mock_settings.gitea_enabled = True
|
||||
mock_settings.gitea_token = "tok123"
|
||||
mock_settings.gitea_url = "http://localhost:3000"
|
||||
mock_settings.gitea_repo = "owner/repo"
|
||||
tools = _build_gitea_tools()
|
||||
|
||||
read_tool = next(t for t in tools if t.name == "read_issue")
|
||||
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.json.return_value = {
|
||||
"number": 5,
|
||||
"title": "Test issue",
|
||||
"state": "open",
|
||||
"body": "Issue body text",
|
||||
"labels": [{"name": "enhancement"}],
|
||||
}
|
||||
mock_resp.raise_for_status = MagicMock()
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.get = AsyncMock(return_value=mock_resp)
|
||||
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
||||
mock_client.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
with patch("timmy.mcp_bridge.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await read_tool.handler(number=5)
|
||||
|
||||
assert "#5" in result
|
||||
assert "Test issue" in result
|
||||
assert "open" in result
|
||||
assert "enhancement" in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_gitea_read_issue_requires_number():
|
||||
"""read_issue handler returns error when number is missing."""
|
||||
with patch("timmy.mcp_bridge.settings") as mock_settings:
|
||||
mock_settings.gitea_enabled = True
|
||||
mock_settings.gitea_token = "tok123"
|
||||
mock_settings.gitea_url = "http://localhost:3000"
|
||||
mock_settings.gitea_repo = "owner/repo"
|
||||
tools = _build_gitea_tools()
|
||||
|
||||
read_tool = next(t for t in tools if t.name == "read_issue")
|
||||
result = await read_tool.handler()
|
||||
assert "required" in result.lower()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# BridgeResult dataclass
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_bridge_result_defaults():
|
||||
"""BridgeResult has sensible defaults."""
|
||||
r = BridgeResult(content="hello")
|
||||
assert r.content == "hello"
|
||||
assert r.tool_calls_made == []
|
||||
assert r.rounds == 0
|
||||
assert r.latency_ms == 0.0
|
||||
assert r.model == ""
|
||||
assert r.error == ""
|
||||
348
tests/timmy/test_research_triage.py
Normal file
348
tests/timmy/test_research_triage.py
Normal file
@@ -0,0 +1,348 @@
|
||||
"""Tests for research triage — action item extraction and Gitea issue filing."""
|
||||
|
||||
import json
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from timmy.research_triage import (
|
||||
ActionItem,
|
||||
_parse_llm_response,
|
||||
_resolve_label_ids,
|
||||
_validate_action_item,
|
||||
create_gitea_issue,
|
||||
extract_action_items,
|
||||
triage_research_report,
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ActionItem
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
SAMPLE_REPORT = """
|
||||
## Research: MCP Abstraction Layer
|
||||
|
||||
### Finding 1: FastMCP overhead is negligible
|
||||
FastMCP averages 26.45ms per tool call. Total overhead <3% of budget.
|
||||
|
||||
### Finding 2: Agno tool calling is broken
|
||||
Agno issues #2231, #2625 document persistent breakage with Ollama.
|
||||
Fix: Use Ollama's `format` parameter with Pydantic JSON schemas.
|
||||
|
||||
### Recommendation
|
||||
Implement three-tier router for structured output.
|
||||
"""
|
||||
|
||||
SAMPLE_LLM_RESPONSE = json.dumps(
|
||||
[
|
||||
{
|
||||
"title": "[Router] Implement three-tier structured output router",
|
||||
"body": (
|
||||
"**What:** Build a three-tier router that uses Ollama's "
|
||||
"`format` parameter for structured output.\n"
|
||||
"**Why:** Agno's native tool calling is broken (#2231, #2625). "
|
||||
"Pydantic JSON schemas with `format` bypass the issue.\n"
|
||||
"**Suggested approach:** Add format parameter support to "
|
||||
"CascadeRouter.\n"
|
||||
"**Acceptance criteria:** Tool calls return valid JSON matching "
|
||||
"the Pydantic schema."
|
||||
),
|
||||
"labels": ["actionable", "feature", "kimi-ready"],
|
||||
"priority": "high",
|
||||
"source_urls": ["https://github.com/agno-agi/agno/issues/2231"],
|
||||
},
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
class TestActionItem:
|
||||
def test_to_issue_body_basic(self):
|
||||
item = ActionItem(title="Test", body="Test body")
|
||||
body = item.to_issue_body()
|
||||
assert "Test body" in body
|
||||
assert "Auto-triaged" in body
|
||||
|
||||
def test_to_issue_body_with_source_issue(self):
|
||||
item = ActionItem(title="Test", body="Test body")
|
||||
body = item.to_issue_body(source_issue=946)
|
||||
assert "#946" in body
|
||||
assert "Origin" in body
|
||||
|
||||
def test_to_issue_body_with_source_urls(self):
|
||||
item = ActionItem(
|
||||
title="Test",
|
||||
body="Body",
|
||||
source_urls=["https://example.com/finding"],
|
||||
)
|
||||
body = item.to_issue_body()
|
||||
assert "https://example.com/finding" in body
|
||||
assert "Source Evidence" in body
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _parse_llm_response
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestParseLlmResponse:
|
||||
def test_plain_json(self):
|
||||
items = _parse_llm_response('[{"title": "foo"}]')
|
||||
assert len(items) == 1
|
||||
assert items[0]["title"] == "foo"
|
||||
|
||||
def test_fenced_json(self):
|
||||
raw = '```json\n[{"title": "bar"}]\n```'
|
||||
items = _parse_llm_response(raw)
|
||||
assert len(items) == 1
|
||||
assert items[0]["title"] == "bar"
|
||||
|
||||
def test_empty_array(self):
|
||||
assert _parse_llm_response("[]") == []
|
||||
|
||||
def test_non_array_returns_empty(self):
|
||||
assert _parse_llm_response('{"title": "not an array"}') == []
|
||||
|
||||
def test_invalid_json_raises(self):
|
||||
with pytest.raises(json.JSONDecodeError):
|
||||
_parse_llm_response("not json at all")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _validate_action_item
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestValidateActionItem:
|
||||
def test_valid_item(self):
|
||||
raw = {
|
||||
"title": "[Area] A specific clear title",
|
||||
"body": "Detailed body with enough content to be useful.",
|
||||
"labels": ["actionable", "bug"],
|
||||
"priority": "high",
|
||||
}
|
||||
item = _validate_action_item(raw)
|
||||
assert item is not None
|
||||
assert item.title == "[Area] A specific clear title"
|
||||
assert item.priority == "high"
|
||||
assert "actionable" in item.labels
|
||||
|
||||
def test_short_title_rejected(self):
|
||||
raw = {"title": "Short", "body": "Detailed body with enough content here."}
|
||||
assert _validate_action_item(raw) is None
|
||||
|
||||
def test_short_body_rejected(self):
|
||||
raw = {"title": "A perfectly fine title here", "body": "Too short"}
|
||||
assert _validate_action_item(raw) is None
|
||||
|
||||
def test_missing_title_rejected(self):
|
||||
raw = {"body": "Detailed body with enough content to be useful."}
|
||||
assert _validate_action_item(raw) is None
|
||||
|
||||
def test_non_dict_rejected(self):
|
||||
assert _validate_action_item("not a dict") is None
|
||||
|
||||
def test_actionable_label_auto_added(self):
|
||||
raw = {
|
||||
"title": "A perfectly fine title here",
|
||||
"body": "Detailed body with enough content to be useful.",
|
||||
"labels": ["bug"],
|
||||
}
|
||||
item = _validate_action_item(raw)
|
||||
assert item is not None
|
||||
assert "actionable" in item.labels
|
||||
|
||||
def test_labels_as_csv_string(self):
|
||||
raw = {
|
||||
"title": "A perfectly fine title here",
|
||||
"body": "Detailed body with enough content to be useful.",
|
||||
"labels": "bug, feature",
|
||||
}
|
||||
item = _validate_action_item(raw)
|
||||
assert item is not None
|
||||
assert "bug" in item.labels
|
||||
assert "feature" in item.labels
|
||||
|
||||
def test_invalid_priority_defaults_medium(self):
|
||||
raw = {
|
||||
"title": "A perfectly fine title here",
|
||||
"body": "Detailed body with enough content to be useful.",
|
||||
"priority": "urgent",
|
||||
}
|
||||
item = _validate_action_item(raw)
|
||||
assert item is not None
|
||||
assert item.priority == "medium"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# extract_action_items
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestExtractActionItems:
|
||||
@pytest.mark.asyncio
|
||||
async def test_extracts_items_from_report(self):
|
||||
mock_llm = AsyncMock(return_value=SAMPLE_LLM_RESPONSE)
|
||||
items = await extract_action_items(SAMPLE_REPORT, llm_caller=mock_llm)
|
||||
assert len(items) == 1
|
||||
assert "three-tier" in items[0].title.lower()
|
||||
assert items[0].priority == "high"
|
||||
mock_llm.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_empty_report_returns_empty(self):
|
||||
items = await extract_action_items("")
|
||||
assert items == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_llm_failure_returns_empty(self):
|
||||
mock_llm = AsyncMock(side_effect=RuntimeError("LLM down"))
|
||||
items = await extract_action_items(SAMPLE_REPORT, llm_caller=mock_llm)
|
||||
assert items == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_llm_returns_empty_string(self):
|
||||
mock_llm = AsyncMock(return_value="")
|
||||
items = await extract_action_items(SAMPLE_REPORT, llm_caller=mock_llm)
|
||||
assert items == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_llm_returns_invalid_json(self):
|
||||
mock_llm = AsyncMock(return_value="not valid json")
|
||||
items = await extract_action_items(SAMPLE_REPORT, llm_caller=mock_llm)
|
||||
assert items == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_caps_at_five_items(self):
|
||||
many_items = [
|
||||
{
|
||||
"title": f"[Area] Action item number {i} is specific",
|
||||
"body": f"Detailed body for action item {i} with enough words.",
|
||||
"labels": ["actionable"],
|
||||
"priority": "medium",
|
||||
}
|
||||
for i in range(10)
|
||||
]
|
||||
mock_llm = AsyncMock(return_value=json.dumps(many_items))
|
||||
items = await extract_action_items(SAMPLE_REPORT, llm_caller=mock_llm)
|
||||
assert len(items) <= 5
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# create_gitea_issue
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCreateGiteaIssue:
|
||||
@pytest.mark.asyncio
|
||||
async def test_creates_issue_via_api(self):
|
||||
item = ActionItem(
|
||||
title="[Test] Create a test issue",
|
||||
body="This is a test issue body with details.",
|
||||
labels=["actionable"],
|
||||
)
|
||||
issue_resp = MagicMock()
|
||||
issue_resp.status_code = 201
|
||||
issue_resp.json.return_value = {"number": 42, "title": item.title}
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post.return_value = issue_resp
|
||||
|
||||
with (
|
||||
patch("timmy.research_triage.settings") as mock_settings,
|
||||
patch("timmy.research_triage._resolve_label_ids", new_callable=AsyncMock, return_value=[1]),
|
||||
patch("timmy.research_triage.httpx.AsyncClient") as mock_cls,
|
||||
):
|
||||
mock_settings.gitea_enabled = True
|
||||
mock_settings.gitea_token = "test-token"
|
||||
mock_settings.gitea_repo = "owner/repo"
|
||||
mock_settings.gitea_url = "http://localhost:3000"
|
||||
mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
|
||||
mock_cls.return_value.__aexit__ = AsyncMock(return_value=False)
|
||||
result = await create_gitea_issue(item, source_issue=946)
|
||||
|
||||
assert result is not None
|
||||
assert result["number"] == 42
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_none_when_disabled(self):
|
||||
item = ActionItem(title="[Test] Disabled test", body="Body content here.")
|
||||
with patch("timmy.research_triage.settings") as mock_settings:
|
||||
mock_settings.gitea_enabled = False
|
||||
mock_settings.gitea_token = ""
|
||||
result = await create_gitea_issue(item)
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handles_connection_error(self):
|
||||
item = ActionItem(
|
||||
title="[Test] Connection fail",
|
||||
body="Body content for connection test.",
|
||||
)
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post.side_effect = httpx.ConnectError("refused")
|
||||
|
||||
with (
|
||||
patch("timmy.research_triage.settings") as mock_settings,
|
||||
patch("timmy.research_triage._resolve_label_ids", new_callable=AsyncMock, return_value=[]),
|
||||
patch("timmy.research_triage.httpx.AsyncClient") as mock_cls,
|
||||
):
|
||||
mock_settings.gitea_enabled = True
|
||||
mock_settings.gitea_token = "test-token"
|
||||
mock_settings.gitea_repo = "owner/repo"
|
||||
mock_settings.gitea_url = "http://localhost:3000"
|
||||
mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
|
||||
mock_cls.return_value.__aexit__ = AsyncMock(return_value=False)
|
||||
result = await create_gitea_issue(item)
|
||||
assert result is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# triage_research_report (integration)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestTriageResearchReport:
|
||||
@pytest.mark.asyncio
|
||||
async def test_dry_run_extracts_without_filing(self):
|
||||
mock_llm = AsyncMock(return_value=SAMPLE_LLM_RESPONSE)
|
||||
results = await triage_research_report(
|
||||
SAMPLE_REPORT, source_issue=946, llm_caller=mock_llm, dry_run=True
|
||||
)
|
||||
assert len(results) == 1
|
||||
assert results[0]["action_item"] is not None
|
||||
assert results[0]["gitea_issue"] is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_empty_report_returns_empty(self):
|
||||
results = await triage_research_report("", llm_caller=AsyncMock(return_value="[]"))
|
||||
assert results == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_end_to_end_with_mock_gitea(self):
|
||||
mock_llm = AsyncMock(return_value=SAMPLE_LLM_RESPONSE)
|
||||
|
||||
issue_resp = MagicMock()
|
||||
issue_resp.status_code = 201
|
||||
issue_resp.json.return_value = {"number": 99, "title": "test"}
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post.return_value = issue_resp
|
||||
|
||||
with (
|
||||
patch("timmy.research_triage.settings") as mock_settings,
|
||||
patch("timmy.research_triage._resolve_label_ids", new_callable=AsyncMock, return_value=[]),
|
||||
patch("timmy.research_triage.httpx.AsyncClient") as mock_cls,
|
||||
):
|
||||
mock_settings.gitea_enabled = True
|
||||
mock_settings.gitea_token = "test-token"
|
||||
mock_settings.gitea_repo = "owner/repo"
|
||||
mock_settings.gitea_url = "http://localhost:3000"
|
||||
mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
|
||||
mock_cls.return_value.__aexit__ = AsyncMock(return_value=False)
|
||||
results = await triage_research_report(
|
||||
SAMPLE_REPORT, source_issue=946, llm_caller=mock_llm
|
||||
)
|
||||
|
||||
assert len(results) == 1
|
||||
assert results[0]["gitea_issue"]["number"] == 99
|
||||
158
tests/timmy/test_tools_web_fetch.py
Normal file
158
tests/timmy/test_tools_web_fetch.py
Normal file
@@ -0,0 +1,158 @@
|
||||
"""Unit tests for the web_fetch tool in timmy.tools."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from timmy.tools import web_fetch
|
||||
|
||||
|
||||
class TestWebFetch:
|
||||
"""Tests for web_fetch function."""
|
||||
|
||||
def test_invalid_url_no_scheme(self):
|
||||
"""URLs without http(s) scheme are rejected."""
|
||||
result = web_fetch("example.com")
|
||||
assert "Error: invalid URL" in result
|
||||
|
||||
def test_invalid_url_empty(self):
|
||||
"""Empty URL is rejected."""
|
||||
result = web_fetch("")
|
||||
assert "Error: invalid URL" in result
|
||||
|
||||
def test_invalid_url_ftp(self):
|
||||
"""Non-HTTP schemes are rejected."""
|
||||
result = web_fetch("ftp://example.com")
|
||||
assert "Error: invalid URL" in result
|
||||
|
||||
@patch("timmy.tools.trafilatura", create=True)
|
||||
@patch("timmy.tools._requests", create=True)
|
||||
def test_successful_fetch(self, mock_requests, mock_trafilatura):
|
||||
"""Happy path: fetch + extract returns text."""
|
||||
# We need to patch at import level inside the function
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.text = "<html><body><p>Hello world</p></body></html>"
|
||||
|
||||
with patch.dict(
|
||||
"sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}
|
||||
):
|
||||
mock_requests.get.return_value = mock_resp
|
||||
mock_requests.exceptions = _make_exceptions()
|
||||
mock_trafilatura.extract.return_value = "Hello world"
|
||||
|
||||
result = web_fetch("https://example.com")
|
||||
|
||||
assert result == "Hello world"
|
||||
|
||||
@patch.dict("sys.modules", {"requests": MagicMock(), "trafilatura": MagicMock()})
|
||||
def test_truncation(self):
|
||||
"""Long text is truncated to max_tokens * 4 chars."""
|
||||
import sys
|
||||
|
||||
mock_trafilatura = sys.modules["trafilatura"]
|
||||
mock_requests = sys.modules["requests"]
|
||||
|
||||
long_text = "a" * 20000
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.text = "<html><body>" + long_text + "</body></html>"
|
||||
mock_requests.get.return_value = mock_resp
|
||||
mock_requests.exceptions = _make_exceptions()
|
||||
mock_trafilatura.extract.return_value = long_text
|
||||
|
||||
result = web_fetch("https://example.com", max_tokens=100)
|
||||
|
||||
# 100 tokens * 4 chars = 400 chars max
|
||||
assert len(result) < 500
|
||||
assert "[…truncated" in result
|
||||
|
||||
@patch.dict("sys.modules", {"requests": MagicMock(), "trafilatura": MagicMock()})
|
||||
def test_extraction_failure(self):
|
||||
"""Returns error when trafilatura can't extract text."""
|
||||
import sys
|
||||
|
||||
mock_trafilatura = sys.modules["trafilatura"]
|
||||
mock_requests = sys.modules["requests"]
|
||||
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.text = "<html></html>"
|
||||
mock_requests.get.return_value = mock_resp
|
||||
mock_requests.exceptions = _make_exceptions()
|
||||
mock_trafilatura.extract.return_value = None
|
||||
|
||||
result = web_fetch("https://example.com")
|
||||
assert "Error: could not extract" in result
|
||||
|
||||
@patch.dict("sys.modules", {"trafilatura": MagicMock()})
|
||||
def test_timeout(self):
|
||||
"""Timeout errors are handled gracefully."""
|
||||
|
||||
mock_requests = MagicMock()
|
||||
exc_mod = _make_exceptions()
|
||||
mock_requests.exceptions = exc_mod
|
||||
mock_requests.get.side_effect = exc_mod.Timeout("timed out")
|
||||
|
||||
with patch.dict("sys.modules", {"requests": mock_requests}):
|
||||
result = web_fetch("https://example.com")
|
||||
|
||||
assert "timed out" in result
|
||||
|
||||
@patch.dict("sys.modules", {"trafilatura": MagicMock()})
|
||||
def test_http_error(self):
|
||||
"""HTTP errors (404, 500, etc.) are handled gracefully."""
|
||||
|
||||
mock_requests = MagicMock()
|
||||
exc_mod = _make_exceptions()
|
||||
mock_requests.exceptions = exc_mod
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 404
|
||||
mock_requests.get.return_value.raise_for_status.side_effect = exc_mod.HTTPError(
|
||||
response=mock_response
|
||||
)
|
||||
|
||||
with patch.dict("sys.modules", {"requests": mock_requests}):
|
||||
result = web_fetch("https://example.com/nope")
|
||||
|
||||
assert "404" in result
|
||||
|
||||
def test_missing_requests(self):
|
||||
"""Graceful error when requests not installed."""
|
||||
with patch.dict("sys.modules", {"requests": None}):
|
||||
result = web_fetch("https://example.com")
|
||||
assert "requests" in result and "not installed" in result
|
||||
|
||||
def test_missing_trafilatura(self):
|
||||
"""Graceful error when trafilatura not installed."""
|
||||
mock_requests = MagicMock()
|
||||
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": None}):
|
||||
result = web_fetch("https://example.com")
|
||||
assert "trafilatura" in result and "not installed" in result
|
||||
|
||||
def test_catalog_entry_exists(self):
|
||||
"""web_fetch should appear in the tool catalog."""
|
||||
from timmy.tools import get_all_available_tools
|
||||
|
||||
catalog = get_all_available_tools()
|
||||
assert "web_fetch" in catalog
|
||||
assert "orchestrator" in catalog["web_fetch"]["available_in"]
|
||||
|
||||
|
||||
def _make_exceptions():
|
||||
"""Create a mock exceptions module with real exception classes."""
|
||||
|
||||
class Timeout(Exception):
|
||||
pass
|
||||
|
||||
class HTTPError(Exception):
|
||||
def __init__(self, *args, response=None, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.response = response
|
||||
|
||||
class RequestException(Exception):
|
||||
pass
|
||||
|
||||
mod = MagicMock()
|
||||
mod.Timeout = Timeout
|
||||
mod.HTTPError = HTTPError
|
||||
mod.RequestException = RequestException
|
||||
return mod
|
||||
5
tox.ini
5
tox.ini
@@ -87,6 +87,11 @@ description = Live LLM tests via Ollama (requires running Ollama)
|
||||
commands =
|
||||
pytest tests/ -q --tb=short -m ollama --timeout=120
|
||||
|
||||
[testenv:benchmark]
|
||||
description = Agent performance regression benchmark suite
|
||||
commands =
|
||||
python scripts/run_benchmarks.py {posargs}
|
||||
|
||||
# ── CI / Coverage ────────────────────────────────────────────────────────────
|
||||
|
||||
[testenv:ci]
|
||||
|
||||
Reference in New Issue
Block a user