Compare commits

...

3 Commits

Author SHA1 Message Date
kimi 66dfb353d7 feat: generate daily/weekly agent scorecards (#712)
Implements agent scorecard system that tracks and summarizes agent performance:

- Track issues touched, PRs opened/merged, tests affected, tokens earned/spent
- Generate compact scorecards for daily or weekly periods
- Pattern detection: high/low merge rates, silent workers, token accumulation
- API endpoints for programmatic access (/scorecards/api/*)
- HTML dashboard with HTMX-powered live updates
- Added navigation link in both desktop and mobile menus

New modules:
- dashboard/services/scorecard_service.py: Core scoring logic
- dashboard/routes/scorecards.py: API and HTML routes
- templates/scorecards.html: Dashboard UI
- tests/dashboard/test_scorecards.py: Comprehensive test suite

Refs #712
2026-03-21 16:55:15 -04:00
a95cf806c8 [kimi] Implement token quest system for agents (#713) (#789) 2026-03-21 20:45:35 +00:00
19367d6e41 [kimi] OpenClaw architecture and deployment research report (#721) (#788) 2026-03-21 20:36:23 +00:00
15 changed files with 4361 additions and 0 deletions

config/quests.yaml (new file, +178 lines)

@@ -0,0 +1,178 @@
# ── Token Quest System Configuration ─────────────────────────────────────────
#
# Quests are special objectives that agents (and humans) can complete for
# bonus tokens. Each quest has:
# - id: Unique identifier
# - name: Display name
# - description: What the quest requires
# - reward_tokens: Number of tokens awarded on completion
# - criteria: Detection rules for completion
# - enabled: Whether this quest is active
# - repeatable: Whether this quest can be completed multiple times
# - cooldown_hours: Minimum hours between completions (if repeatable)
#
# Quest Types:
# - issue_count: Complete when N issues matching criteria are closed
# - issue_reduce: Complete when open issue count drops by N
# - docs_update: Complete when documentation files are updated
# - test_improve: Complete when test coverage/cases improve
# - daily_run: Complete Daily Run session objectives
# - custom: Special quests with manual completion
#
# ── Active Quests ─────────────────────────────────────────────────────────────
quests:
# ── Daily Run & Test Improvement Quests ───────────────────────────────────
close_flaky_tests:
id: close_flaky_tests
name: Flaky Test Hunter
description: Close 3 issues labeled "flaky-test"
reward_tokens: 150
type: issue_count
enabled: true
repeatable: true
cooldown_hours: 24
criteria:
issue_labels:
- flaky-test
target_count: 3
issue_state: closed
lookback_days: 7
notification_message: "Quest Complete! You closed 3 flaky-test issues and earned {tokens} tokens."
reduce_p1_issues:
id: reduce_p1_issues
name: Priority Firefighter
description: Reduce open P1 Daily Run issues by 2
reward_tokens: 200
type: issue_reduce
enabled: true
repeatable: true
cooldown_hours: 48
criteria:
issue_labels:
- layer:triage
- P1
target_reduction: 2
lookback_days: 3
notification_message: "Quest Complete! You reduced P1 issues by 2 and earned {tokens} tokens."
improve_test_coverage:
id: improve_test_coverage
name: Coverage Champion
description: Improve test coverage by 5% or add 10 new test cases
reward_tokens: 300
type: test_improve
enabled: true
repeatable: false
criteria:
coverage_increase_percent: 5
min_new_tests: 10
notification_message: "Quest Complete! You improved test coverage and earned {tokens} tokens."
complete_daily_run_session:
id: complete_daily_run_session
name: Daily Runner
description: Successfully complete 5 Daily Run sessions in a week
reward_tokens: 250
type: daily_run
enabled: true
repeatable: true
cooldown_hours: 168 # 1 week
criteria:
min_sessions: 5
lookback_days: 7
notification_message: "Quest Complete! You completed 5 Daily Run sessions and earned {tokens} tokens."
# ── Documentation & Maintenance Quests ────────────────────────────────────
improve_automation_docs:
id: improve_automation_docs
name: Documentation Hero
description: Improve documentation for automations (update 3+ doc files)
reward_tokens: 100
type: docs_update
enabled: true
repeatable: true
cooldown_hours: 72
criteria:
file_patterns:
- "docs/**/*.md"
- "**/README.md"
- "timmy_automations/**/*.md"
min_files_changed: 3
lookback_days: 7
notification_message: "Quest Complete! You improved automation docs and earned {tokens} tokens."
close_micro_fixes:
id: close_micro_fixes
name: Micro Fix Master
description: Close 5 issues labeled "layer:micro-fix"
reward_tokens: 125
type: issue_count
enabled: true
repeatable: true
cooldown_hours: 24
criteria:
issue_labels:
- layer:micro-fix
target_count: 5
issue_state: closed
lookback_days: 7
notification_message: "Quest Complete! You closed 5 micro-fix issues and earned {tokens} tokens."
# ── Special Achievements ──────────────────────────────────────────────────
first_contribution:
id: first_contribution
name: First Steps
description: Make your first contribution (close any issue)
reward_tokens: 50
type: issue_count
enabled: true
repeatable: false
criteria:
target_count: 1
issue_state: closed
lookback_days: 30
notification_message: "Welcome! You completed your first contribution and earned {tokens} tokens."
bug_squasher:
id: bug_squasher
name: Bug Squasher
description: Close 10 issues labeled "bug"
reward_tokens: 500
type: issue_count
enabled: true
repeatable: true
cooldown_hours: 168 # 1 week
criteria:
issue_labels:
- bug
target_count: 10
issue_state: closed
lookback_days: 7
notification_message: "Quest Complete! You squashed 10 bugs and earned {tokens} tokens."
# ── Quest System Settings ───────────────────────────────────────────────────
settings:
# Enable/disable quest notifications
notifications_enabled: true
# Maximum number of concurrent active quests per agent
max_concurrent_quests: 5
# Auto-detect quest completions on Daily Run metrics update
auto_detect_on_daily_run: true
# Gitea issue labels that indicate quest-related work
quest_work_labels:
- layer:triage
- layer:micro-fix
- layer:tests
- layer:economy
- flaky-test
- bug
- documentation
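
The comment block at the top of this file doubles as its schema. Below is a minimal standalone sketch of how a loader might map it onto a dataclass, assuming PyYAML; the repository's actual loader, `timmy.quest_system.load_quest_config`, appears later in this diff and is authoritative.

```python
# Illustrative loader only; the repository's real implementation is
# timmy.quest_system.load_quest_config (imported later in this diff).
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

import yaml  # PyYAML


@dataclass
class QuestDefinition:
    id: str
    name: str
    description: str
    reward_tokens: int
    quest_type: str  # the YAML "type" key
    enabled: bool = True
    repeatable: bool = False
    cooldown_hours: int = 0
    criteria: dict[str, Any] = field(default_factory=dict)
    notification_message: str = ""


def load_quests(path: str = "config/quests.yaml") -> tuple[list[QuestDefinition], dict]:
    """Parse quests.yaml into active quest definitions plus the settings block."""
    raw = yaml.safe_load(Path(path).read_text())
    definitions = [
        QuestDefinition(
            id=body["id"],
            name=body["name"],
            description=body["description"],
            reward_tokens=body["reward_tokens"],
            quest_type=body["type"],
            enabled=body.get("enabled", True),
            repeatable=body.get("repeatable", False),
            cooldown_hours=body.get("cooldown_hours", 0),
            criteria=body.get("criteria", {}),
            notification_message=body.get("notification_message", ""),
        )
        for body in raw.get("quests", {}).values()
        if body.get("enabled", True)
    ]
    return definitions, raw.get("settings", {})
```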


@@ -0,0 +1,912 @@
# OpenClaw Architecture, Deployment Modes, and Ollama Integration
## Research Report for Timmy Time Dashboard Project
**Issue:** #721 — [Kimi Research] OpenClaw architecture, deployment modes, and Ollama integration
**Date:** 2026-03-21
**Author:** Kimi (Moonshot AI)
**Status:** Complete
---
## Executive Summary
OpenClaw is an open-source AI agent framework that bridges messaging platforms (WhatsApp, Telegram, Slack, Discord, iMessage) to AI coding agents through a centralized gateway. Originally known as Clawdbot and Moltbot, it was rebranded to OpenClaw in early 2026. This report provides a comprehensive analysis of OpenClaw's architecture, deployment options, Ollama integration capabilities, and suitability for deployment on resource-constrained VPS environments like the Hermes DigitalOcean droplet (2GB RAM / 1 vCPU).
**Key Finding:** Running OpenClaw with local LLMs on a 2GB RAM VPS is **not recommended**. The absolute minimum for a text-only agent with external API models is 4GB RAM. For local model inference via Ollama, 8-16GB RAM is the practical minimum. A hybrid approach using OpenRouter as the primary provider with Ollama as fallback is the most viable configuration for small VPS deployments.
---
## 1. Architecture Overview
### 1.1 Core Components
OpenClaw follows a **hub-and-spoke** architecture optimized for multi-agent task execution:
```
┌─────────────────────────────────────────────────────────────────────────┐
│ OPENCLAW ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ WhatsApp │ │ Telegram │ │ Discord │ │
│ │ Channel │ │ Channel │ │ Channel │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └────────────────────┼────────────────────┘ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Gateway │◄─────── WebSocket/API │
│ │ (Port 18789) │ Control Plane │
│ └────────┬─────────┘ │
│ │ │
│ ┌──────────────┼──────────────┐ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Agent A │ │ Agent B │ │ Pi Agent│ │
│ │ (main) │ │ (coder) │ │(delegate)│ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └──────────────┼──────────────┘ │
│ ▼ │
│ ┌────────────────────────┐ │
│ │ LLM Router │ │
│ │ (Primary/Fallback) │ │
│ └───────────┬────────────┘ │
│ │ │
│ ┌─────────────────┼─────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Ollama │ │ OpenAI │ │Anthropic│ │
│ │(local) │ │(cloud) │ │(cloud) │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │ ┌─────┐ │
│ └────────────────────────────────────────────────────►│ MCP │ │
│ │Tools│ │
│ └─────┘ │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Memory │ │ Skills │ │ Workspace │ │
│ │ (SOUL.md) │ │ (SKILL.md) │ │ (sessions) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
### 1.2 Component Deep Dive
| Component | Purpose | Configuration File |
|-----------|---------|-------------------|
| **Gateway** | Central control plane, WebSocket/API server, session management | `gateway` section in `openclaw.json` |
| **Pi Agent** | Core agent runner and command center - schedules LLM calls, tool execution, error handling | `agents` section in `openclaw.json` |
| **Channels** | Messaging platform integrations (Telegram, WhatsApp, Slack, Discord, iMessage) | `channels` section in `openclaw.json` |
| **SOUL.md** | Agent persona definition - personality, communication style, behavioral guidelines | `~/.openclaw/workspace/SOUL.md` |
| **AGENTS.md** | Multi-agent configuration, routing rules, agent specialization definitions | `~/.openclaw/workspace/AGENTS.md` |
| **Workspace** | File system for agent state, session data, temporary files | `~/.openclaw/workspace/` |
| **Skills** | Bundled tools, prompts, configurations that teach agents specific tasks | `~/.openclaw/workspace/skills/` |
| **Sessions** | Conversation history, context persistence between interactions | `~/.openclaw/agents/<agent>/sessions/` |
| **MCP Tools** | Model Context Protocol integration for external tool access | Via `mcporter` or native MCP |
### 1.3 Agent Runner Execution Flow
According to OpenClaw documentation, a complete agent run follows these stages (a toy sketch follows the list):
1. **Queuing** - Session-level queue (serializes same-session requests) → Global queue (controls total concurrency)
2. **Preparation** - Parse workspace, provider/model, thinking level parameters
3. **Plugin Loading** - Load relevant skills based on task context
4. **Memory Retrieval** - Fetch relevant context from SOUL.md and conversation history
5. **LLM Inference** - Send prompt to configured provider with tool definitions
6. **Tool Execution** - Execute any tool calls returned by the LLM
7. **Response Generation** - Format and return final response to the channel
8. **Memory Storage** - Persist conversation and results to session storage
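A self-contained toy version of that loop, for concreteness. All names here are illustrative stubs, not OpenClaw's real internals, which live in its Node.js codebase:
```python
# Toy model of the eight stages above; every name is an illustrative stub.
import asyncio

session_queue = asyncio.Lock()       # 1. serializes same-session requests
global_queue = asyncio.Semaphore(2)  # 1. caps total concurrency

async def llm_infer(prompt: str) -> dict:
    # 5. stand-in provider call: request one tool call, then finish
    if "TOOL_RESULT" not in prompt:
        return {"content": "", "tool_calls": [{"name": "echo", "args": {"text": "hi"}}]}
    return {"content": f"done ({len(prompt)} chars of context)", "tool_calls": []}

async def run_turn(message: str) -> str:
    async with session_queue, global_queue:                   # 1. queuing
        model = "primary"                                     # 2. preparation
        tools = {"echo": lambda text: f"TOOL_RESULT {text}"}  # 3. plugin loading
        context = f"[{model}|SOUL.md persona] {message}"      # 4. memory retrieval
        while True:
            reply = await llm_infer(context)                  # 5. LLM inference
            if not reply["tool_calls"]:
                break
            for call in reply["tool_calls"]:                  # 6. tool execution
                context += " " + tools[call["name"]](**call["args"])
        # 7. response generation + 8. memory storage (persist message/response)
        return reply["content"]

print(asyncio.run(run_turn("hello")))
```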
---
## 2. Deployment Modes
### 2.1 Comparison Matrix
| Deployment Mode | Best For | Setup Complexity | Resource Overhead | Stability |
|----------------|----------|------------------|-------------------|-----------|
| **npm global** | Development, quick testing | Low | Minimal (~200MB) | Moderate |
| **Docker** | Production, isolation, reproducibility | Medium | Higher (~2.5GB base image) | High |
| **Docker Compose** | Multi-service stacks, complex setups | Medium-High | Higher | High |
| **Bare metal/systemd** | Maximum performance, dedicated hardware | High | Minimal | Moderate |
### 2.2 NPM Global Installation (Recommended for Quick Start)
```bash
# One-line installer
curl -fsSL https://openclaw.ai/install.sh | bash
# Or manual npm install
npm install -g openclaw
# Initialize configuration
openclaw onboard
# Start gateway
openclaw gateway
```
**Pros:**
- Fastest setup (~30 seconds)
- Direct access to host resources
- Easy updates via `npm update -g openclaw`
**Cons:**
- Requires Node.js 22+
- No process isolation
- Manual dependency management
### 2.3 Docker Deployment (Recommended for Production)
```bash
# Pull and run
docker pull openclaw/openclaw:latest
docker run -d \
--name openclaw \
-p 127.0.0.1:18789:18789 \
-v ~/.openclaw:/root/.openclaw \
-e ANTHROPIC_API_KEY=sk-ant-... \
openclaw/openclaw:latest
# Or with Docker Compose
docker compose -f compose.yml --env-file .env up -d --build
```
**Docker Compose Configuration (production-ready):**
```yaml
version: '3.8'
services:
openclaw:
image: openclaw/openclaw:latest
container_name: openclaw
restart: unless-stopped
ports:
- "127.0.0.1:18789:18789" # Never expose to 0.0.0.0
volumes:
- ./openclaw-data:/root/.openclaw
- ./workspace:/root/.openclaw/workspace
environment:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- OPENROUTER_API_KEY=${OPENROUTER_API_KEY}
- OLLAMA_API_KEY=ollama-local
networks:
- openclaw-net
# Resource limits for small VPS
deploy:
resources:
limits:
cpus: '1.5'
memory: 3G
reservations:
cpus: '0.5'
memory: 1G
networks:
openclaw-net:
driver: bridge
```
### 2.4 Bare Metal / Systemd Installation
For running as a system service on Linux:
```bash
# Create systemd service
sudo tee /etc/systemd/system/openclaw.service > /dev/null <<EOF
[Unit]
Description=OpenClaw Gateway
After=network.target
[Service]
Type=simple
User=openclaw
Group=openclaw
WorkingDirectory=/home/openclaw
Environment="PATH=/usr/local/bin:/usr/bin:/bin"
Environment="NODE_ENV=production"
Environment="ANTHROPIC_API_KEY=sk-ant-..."
ExecStart=/usr/local/bin/openclaw gateway
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF
sudo systemctl daemon-reload
sudo systemctl enable openclaw
sudo systemctl start openclaw
```
### 2.5 Recommended Deployment for 2GB RAM VPS
**⚠️ Critical Finding:** OpenClaw's official minimum is 4GB RAM. On a 2GB VPS:
1. **Do NOT run local LLMs** - Use external API providers exclusively
2. **Use npm installation** - Docker overhead is too heavy
3. **Disable browser automation** - Chromium requires 2-4GB alone
4. **Enable swap** - Critical for preventing OOM kills
5. **Use OpenRouter** - Cheap/free tier models reduce costs
**Setup script for 2GB VPS:**
```bash
#!/bin/bash
# openclaw-minimal-vps.sh
# Setup for 2GB RAM VPS - EXTERNAL API ONLY
# Create 4GB swap
sudo fallocate -l 4G /swapfile
sudo chmod 600 /swapfile
sudo mkswap /swapfile
sudo swapon /swapfile
echo '/swapfile none swap sw 0 0' | sudo tee -a /etc/fstab
# Install Node.js 22
curl -fsSL https://deb.nodesource.com/setup_22.x | sudo bash -
sudo apt-get install -y nodejs
# Install OpenClaw
npm install -g openclaw
# Configure for minimal resource usage
mkdir -p ~/.openclaw
cat > ~/.openclaw/openclaw.json <<'EOF'
{
"gateway": {
"bind": "127.0.0.1",
"port": 18789,
"mode": "local"
},
"agents": {
"defaults": {
"model": {
"primary": "openrouter/google/gemma-3-4b-it:free",
"fallbacks": [
"openrouter/meta/llama-3.1-8b-instruct:free"
]
},
"maxIterations": 15,
"timeout": 120
}
},
"channels": {
"telegram": {
"enabled": true,
"dmPolicy": "pairing"
}
}
}
EOF
# Set OpenRouter API key
export OPENROUTER_API_KEY="sk-or-v1-..."
# Start gateway
openclaw gateway &
```
---
## 3. Ollama Integration
### 3.1 Architecture
OpenClaw integrates with Ollama through its native `/api/chat` endpoint, supporting both streaming responses and tool calling simultaneously:
```
┌──────────────┐ HTTP/JSON ┌──────────────┐ GGUF/CPU/GPU ┌──────────┐
│ OpenClaw │◄───────────────────►│ Ollama │◄────────────────────►│ Local │
│ Gateway │ /api/chat │ Server │ Model inference │ LLM │
│ │ Port 11434 │ Port 11434 │ │ │
└──────────────┘ └──────────────┘ └──────────┘
```
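For reference, a direct round trip against that endpoint looks like the following Python sketch; the tool definition is a made-up example for illustration, and tool calling assumes a tool-capable model such as those listed in 3.3:
```python
# Minimal tool-calling round trip against Ollama's /api/chat.
# Requires `pip install requests` and a running `ollama serve`
# with qwen2.5-coder:7b pulled.
import requests

tools = [{
    "type": "function",
    "function": {
        "name": "get_open_issue_count",  # hypothetical tool for illustration
        "description": "Count open issues with a label",
        "parameters": {
            "type": "object",
            "properties": {"label": {"type": "string"}},
            "required": ["label"],
        },
    },
}]

resp = requests.post(
    "http://localhost:11434/api/chat",
    json={
        "model": "qwen2.5-coder:7b",
        "messages": [{"role": "user", "content": "How many open flaky-test issues?"}],
        "tools": tools,
        "stream": False,
    },
    timeout=120,
)
message = resp.json()["message"]
# If the model elects to call a tool, message["tool_calls"] carries the
# requested function and arguments; otherwise message["content"] is text.
print(message.get("tool_calls") or message["content"])
```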
### 3.2 Configuration
**Basic Ollama Setup:**
```bash
# Install Ollama
curl -fsSL https://ollama.com/install.sh | sh
# Start server
ollama serve
# Pull a tool-capable model
ollama pull qwen2.5-coder:7b
ollama pull llama3.1:8b
# Configure OpenClaw
export OLLAMA_API_KEY="ollama-local" # Any non-empty string works
```
**OpenClaw Configuration for Ollama:**
```json
{
"models": {
"providers": {
"ollama": {
"baseUrl": "http://localhost:11434",
"apiKey": "ollama-local",
"api": "ollama",
"models": [
{
"id": "qwen2.5-coder:7b",
"name": "Qwen 2.5 Coder 7B",
"contextWindow": 32768,
"maxTokens": 8192,
"cost": { "input": 0, "output": 0 }
},
{
"id": "llama3.1:8b",
"name": "Llama 3.1 8B",
"contextWindow": 128000,
"maxTokens": 8192,
"cost": { "input": 0, "output": 0 }
}
]
}
}
},
"agents": {
"defaults": {
"model": {
"primary": "ollama/qwen2.5-coder:7b",
"fallbacks": ["ollama/llama3.1:8b"]
}
}
}
}
```
### 3.3 Context Window Requirements
**⚠️ Critical Requirement:** OpenClaw requires a minimum **64K token context window** for reliable multi-step task execution.
| Model | Parameters | Context Window | Tool Support | OpenClaw Compatible |
|-------|-----------|----------------|--------------|---------------------|
| **llama3.1** | 8B | 128K | ✅ Yes | ✅ Yes |
| **qwen2.5-coder** | 7B | 32K | ✅ Yes | ⚠️ Below minimum |
| **qwen2.5-coder** | 32B | 128K | ✅ Yes | ✅ Yes |
| **gpt-oss** | 20B | 128K | ✅ Yes | ✅ Yes |
| **glm-4.7-flash** | - | 128K | ✅ Yes | ✅ Yes |
| **deepseek-coder-v2** | 33B | 128K | ✅ Yes | ✅ Yes |
| **mistral-small3.1** | - | 128K | ✅ Yes | ✅ Yes |
**Context Window Configuration:**
For models that don't report context window via Ollama's API:
```bash
# Create custom Modelfile with extended context
cat > ~/qwen-custom.modelfile <<EOF
FROM qwen2.5-coder:7b
PARAMETER num_ctx 65536
PARAMETER temperature 0.7
EOF
# Create custom model
ollama create qwen2.5-coder-64k -f ~/qwen-custom.modelfile
```
### 3.4 Models for Small VPS (≤8B Parameters)
Approximate requirements for small models on resource-constrained hosts:
| Model | Quantization | RAM Required | VRAM Required | Performance |
|-------|-------------|--------------|---------------|-------------|
| **Llama 3.1 8B** | Q4_K_M | ~5GB | ~6GB | Good |
| **Llama 3.2 3B** | Q4_K_M | ~2.5GB | ~3GB | Basic |
| **Qwen 2.5 7B** | Q4_K_M | ~5GB | ~6GB | Good |
| **Qwen 2.5 3B** | Q4_K_M | ~2.5GB | ~3GB | Basic |
| **DeepSeek 7B** | Q4_K_M | ~5GB | ~6GB | Good |
| **Phi-4 4B** | Q4_K_M | ~3GB | ~4GB | Moderate |
**⚠️ Verdict for 2GB VPS:** Running local LLMs is **NOT viable**. Use external APIs only.
---
## 4. OpenRouter Integration (Fallback Strategy)
### 4.1 Overview
OpenRouter provides a unified API gateway to multiple LLM providers (a minimal request sketch follows this list), enabling:
- Single API key access to 200+ models
- Automatic failover between providers
- Free tier models for cost-conscious deployments
- Unified billing and usage tracking
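Because OpenRouter exposes an OpenAI-compatible chat completions API, a raw request needs nothing OpenClaw-specific. A Python sketch, reusing a model slug from this report's examples:
```python
# Direct call to OpenRouter's OpenAI-compatible endpoint.
# Requires `pip install requests` and OPENROUTER_API_KEY in the environment.
import os
import requests

resp = requests.post(
    "https://openrouter.ai/api/v1/chat/completions",
    headers={"Authorization": f"Bearer {os.environ['OPENROUTER_API_KEY']}"},
    json={
        "model": "google/gemma-3-4b-it:free",  # free-tier model used in this report
        "messages": [{"role": "user", "content": "Summarize open P1 issues."}],
    },
    timeout=120,
)
resp.raise_for_status()
print(resp.json()["choices"][0]["message"]["content"])
```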
### 4.2 Configuration
**Environment Variable Setup:**
```bash
export OPENROUTER_API_KEY="sk-or-v1-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
```
**OpenClaw Configuration:**
```json
{
"models": {
"providers": {
"openrouter": {
"apiKey": "${OPENROUTER_API_KEY}",
"baseUrl": "https://openrouter.ai/api/v1"
}
}
},
"agents": {
"defaults": {
"model": {
"primary": "openrouter/anthropic/claude-sonnet-4-6",
"fallbacks": [
"openrouter/google/gemini-3.1-pro",
"openrouter/meta/llama-3.3-70b-instruct",
"openrouter/google/gemma-3-4b-it:free"
]
}
}
}
}
```
### 4.3 Recommended Free/Cheap Models on OpenRouter
For cost-conscious VPS deployments:
| Model | Cost | Context | Best For |
|-------|------|---------|----------|
| **google/gemma-3-4b-it:free** | Free | 128K | General tasks, simple automation |
| **meta/llama-3.1-8b-instruct:free** | Free | 128K | General tasks, longer contexts |
| **deepseek/deepseek-chat-v3.2** | $0.53/M | 64K | Code generation, reasoning |
| **xiaomi/mimo-v2-flash** | $0.40/M | 128K | Fast responses, basic tasks |
| **qwen/qwen3-coder-next** | $1.20/M | 128K | Code-focused tasks |
### 4.4 Hybrid Configuration (Recommended for Timmy)
A production-ready configuration for the Hermes VPS:
```json
{
"models": {
"providers": {
"openrouter": {
"apiKey": "${OPENROUTER_API_KEY}",
"models": [
{
"id": "google/gemma-3-4b-it:free",
"name": "Gemma 3 4B (Free)",
"contextWindow": 131072,
"maxTokens": 8192,
"cost": { "input": 0, "output": 0 }
},
{
"id": "deepseek/deepseek-chat-v3.2",
"name": "DeepSeek V3.2",
"contextWindow": 64000,
"maxTokens": 8192,
"cost": { "input": 0.00053, "output": 0.00053 }
}
]
},
"ollama": {
"baseUrl": "http://localhost:11434",
"apiKey": "ollama-local",
"models": [
{
"id": "llama3.2:3b",
"name": "Llama 3.2 3B (Local Fallback)",
"contextWindow": 128000,
"maxTokens": 4096,
"cost": { "input": 0, "output": 0 }
}
]
}
}
},
"agents": {
"defaults": {
"model": {
"primary": "openrouter/google/gemma-3-4b-it:free",
"fallbacks": [
"openrouter/deepseek/deepseek-chat-v3.2",
"ollama/llama3.2:3b"
]
},
"maxIterations": 10,
"timeout": 90
}
}
}
```
---
## 5. Hardware Constraints & VPS Viability
### 5.1 System Requirements Summary
| Component | Minimum | Recommended | Notes |
|-----------|---------|-------------|-------|
| **CPU** | 2 vCPU | 4 vCPU | Dedicated preferred over shared |
| **RAM** | 4 GB | 8 GB | 2 GB risks OOM even with external APIs only |
| **Storage** | 40 GB SSD | 80 GB NVMe | Docker images are ~10-15GB |
| **Network** | 100 Mbps | 1 Gbps | For API calls and model downloads |
| **OS** | Ubuntu 22.04/Debian 12 | Ubuntu 24.04 LTS | Linux required for production |
### 5.2 2GB RAM VPS Analysis
**Can it work?** Yes, with severe limitations:
**What works:**
- Text-only agents with external API providers
- Single Telegram/Discord channel
- Basic file operations and shell commands
- No browser automation
**What doesn't work:**
- Local LLM inference via Ollama
- Browser automation (Chromium needs 2-4GB)
- Multiple concurrent channels
- Python environment-heavy skills
**Required mitigations for 2GB VPS:**
```bash
# 1. Create substantial swap
sudo fallocate -l 4G /swapfile
sudo chmod 600 /swapfile
sudo mkswap /swapfile
sudo swapon /swapfile
# 2. Configure swappiness
echo 'vm.swappiness=60' | sudo tee -a /etc/sysctl.conf
sudo sysctl -p
# 3. Limit Node.js memory
export NODE_OPTIONS="--max-old-space-size=1536"
# 4. Use external APIs only - NO OLLAMA
# 5. Disable browser skills
# 6. Set conservative concurrency limits
```
### 5.3 4-bit Quantization Viability
**Qwen 2.5 7B Q4_K_M on 2GB VPS:**
- Model size: ~4.5GB
- RAM required at runtime: ~5-6GB
- **Verdict:** Will cause immediate OOM on 2GB VPS
- **Even with 4GB VPS:** Marginal, heavy swap usage, poor performance
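Those figures follow the usual back-of-envelope rule: quantized weight size is roughly parameter count times bits per weight divided by 8, plus runtime overhead for the KV cache and buffers (assumed at ~25% in this sketch):
```python
# Rough Q4_K_M sizing check; Q4_K_M averages ~4.5 bits/weight, and runtime
# overhead (KV cache, buffers) is assumed at ~25% here.
def approx_ram_gb(params_b: float, bits_per_weight: float = 4.5,
                  overhead: float = 1.25) -> float:
    weights_gb = params_b * bits_per_weight / 8  # e.g. 7B at 4.5 bits ~= 3.9 GB
    return weights_gb * overhead

for name, size_b in [("Qwen 2.5 7B", 7), ("Llama 3.2 3B", 3), ("Llama 3.1 8B", 8)]:
    print(f"{name}: ~{approx_ram_gb(size_b):.1f} GB")
# Qwen 2.5 7B: ~4.9 GB -> hopeless on a 2GB box, marginal on 4GB
```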
**Viable models for 4GB VPS with Ollama:**
- Llama 3.2 3B Q4_K_M (~2.5GB RAM)
- Qwen 2.5 3B Q4_K_M (~2.5GB RAM)
- Phi-4 4B Q4_K_M (~3GB RAM)
---
## 6. Security Configuration
### 6.1 Network Ports
| Port | Purpose | Exposure |
|------|---------|----------|
| **18789/tcp** | OpenClaw Gateway (WebSocket/HTTP) | **NEVER expose to internet** |
| **11434/tcp** | Ollama API (if running locally) | Localhost only |
| **22/tcp** | SSH | Restrict to known IPs |
**⚠️ CRITICAL:** Never expose port 18789 to the public internet. Use Tailscale or SSH tunnels for remote access.
### 6.2 Tailscale Integration
Tailscale provides zero-configuration VPN mesh for secure remote access:
```bash
# Install Tailscale
curl -fsSL https://tailscale.com/install.sh | sh
sudo tailscale up
# Get Tailscale IP
tailscale ip
# Returns: 100.x.y.z
# Configure OpenClaw to bind to Tailscale
cat > ~/.openclaw/openclaw.json <<EOF
{
"gateway": {
"bind": "tailnet",
"port": 18789
},
"tailscale": {
"mode": "on",
"resetOnExit": false
}
}
EOF
```
**Tailscale vs SSH Tunnel:**
| Feature | Tailscale | SSH Tunnel |
|---------|-----------|------------|
| Setup | Very easy | Moderate |
| Persistence | Automatic | Requires autossh |
| Multiple devices | Built-in | One tunnel per connection |
| NAT traversal | Works | Requires exposed SSH |
| Access control | Tailscale ACL | SSH keys |
### 6.3 Firewall Configuration (UFW)
```bash
# Default deny
sudo ufw default deny incoming
sudo ufw default allow outgoing
# Allow SSH
sudo ufw allow 22/tcp
# Allow Tailscale only (if using)
sudo ufw allow in on tailscale0 to any port 18789
# Block public access to OpenClaw
# (bind is 127.0.0.1, so this is defense in depth)
sudo ufw enable
```
### 6.4 Authentication Configuration
```json
{
"gateway": {
"bind": "127.0.0.1",
"port": 18789,
"auth": {
"mode": "token",
"token": "your-64-char-hex-token-here"
},
"controlUi": {
"allowedOrigins": [
"http://localhost:18789",
"https://your-domain.tailnet-name.ts.net"
],
"allowInsecureAuth": false,
"dangerouslyDisableDeviceAuth": false
}
}
}
```
**Generate secure token:**
```bash
openssl rand -hex 32
```
### 6.5 Sandboxing Considerations
OpenClaw executes arbitrary shell commands and file operations by default. For production:
1. **Run as non-root user:**
```bash
sudo useradd -r -s /bin/false openclaw
sudo mkdir -p /home/openclaw/.openclaw
sudo chown -R openclaw:openclaw /home/openclaw
```
2. **Use Docker for isolation:**
```bash
docker run --security-opt=no-new-privileges \
--cap-drop=ALL \
--read-only \
--tmpfs /tmp:noexec,nosuid,size=100m \
openclaw/openclaw:latest
```
3. **Enable dmPolicy for channels:**
```json
{
"channels": {
"telegram": {
"dmPolicy": "pairing" // Require one-time code for new contacts
}
}
}
```
---
## 7. MCP (Model Context Protocol) Tools
### 7.1 Overview
MCP is an open standard created by Anthropic (donated to Linux Foundation in Dec 2025) that lets AI applications connect to external tools through a universal interface. Think of it as "USB-C for AI."
### 7.2 MCP vs OpenClaw Skills
| Aspect | MCP | OpenClaw Skills |
|--------|-----|-----------------|
| **Protocol** | Standardized (Anthropic) | OpenClaw-specific |
| **Isolation** | Process-isolated | Runs in agent context |
| **Security** | Higher (sandboxed) | Lower (full system access) |
| **Discovery** | Automatic via protocol | Manual via SKILL.md |
| **Ecosystem** | 10,000+ servers | 5400+ skills |
**Note:** OpenClaw currently has limited native MCP support. Use the `mcporter` bridge for MCP integration.
### 7.3 Using MCPorter (MCP Bridge)
```bash
# Install mcporter
clawhub install mcporter
# Configure MCP server
mcporter config add github \
--url "https://api.github.com/mcp" \
--token "ghp_..."
# List available tools
mcporter list
# Call MCP tool
mcporter call github.list_repos --owner "rockachopa"
```
### 7.4 Popular MCP Servers
| Server | Purpose | Integration |
|--------|---------|-------------|
| **GitHub** | Repo management, PRs, issues | `mcp-github` |
| **Slack** | Messaging, channel management | `mcp-slack` |
| **PostgreSQL** | Database queries | `mcp-postgres` |
| **Filesystem** | File operations (sandboxed) | `mcp-filesystem` |
| **Brave Search** | Web search | `mcp-brave` |
---
## 8. Recommendations for Timmy Time Dashboard
### 8.1 Deployment Strategy for Hermes VPS (2GB RAM)
Given the hardware constraints, here's the recommended approach:
**Option A: External API Only (Recommended)**
```
┌─────────────────────────────────────────┐
│ Hermes VPS (2GB RAM) │
│ ┌─────────────────────────────────┐ │
│ │ OpenClaw Gateway │ │
│ │ (npm global install) │ │
│ └─────────────┬───────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ OpenRouter API (Free Tier) │ │
│ │ google/gemma-3-4b-it:free │ │
│ └─────────────────────────────────┘ │
│ │
│ NO OLLAMA - insufficient RAM │
└─────────────────────────────────────────┘
```
**Option B: Hybrid with External Ollama**
```
┌──────────────────────┐ ┌──────────────────────────┐
│ Hermes VPS (2GB) │ │ Separate Ollama Host │
│ ┌────────────────┐ │ │ ┌────────────────────┐ │
│ │ OpenClaw │ │◄────►│ │ Ollama Server │ │
│ │ (external API) │ │ │ │ (8GB+ RAM required)│ │
│ └────────────────┘ │ │ └────────────────────┘ │
└──────────────────────┘ └──────────────────────────┘
```
### 8.2 Configuration Summary
```json
{
"gateway": {
"bind": "127.0.0.1",
"port": 18789,
"auth": {
"mode": "token",
"token": "GENERATE_WITH_OPENSSL_RAND"
}
},
"models": {
"providers": {
"openrouter": {
"apiKey": "${OPENROUTER_API_KEY}",
"models": [
{
"id": "google/gemma-3-4b-it:free",
"contextWindow": 131072,
"maxTokens": 4096
}
]
}
}
},
"agents": {
"defaults": {
"model": {
"primary": "openrouter/google/gemma-3-4b-it:free"
},
"maxIterations": 10,
"timeout": 90,
"maxConcurrent": 2
}
},
"channels": {
"telegram": {
"enabled": true,
"dmPolicy": "pairing"
}
}
}
```
### 8.3 Migration Path (Future)
When upgrading to a larger VPS (4-8GB RAM):
1. **Phase 1:** Enable Ollama with Llama 3.2 3B as fallback
2. **Phase 2:** Add browser automation skills (requires 4GB+ RAM)
3. **Phase 3:** Enable multi-agent routing with specialized agents
4. **Phase 4:** Add MCP server integration for external tools
---
## 9. References
1. OpenClaw Official Documentation: https://docs.openclaw.ai
2. Ollama Integration Guide: https://docs.ollama.com/integrations/openclaw
3. OpenRouter Documentation: https://openrouter.ai/docs
4. MCP Specification: https://modelcontextprotocol.io
5. OpenClaw Community Discord: https://discord.gg/openclaw
6. GitHub Repository: https://github.com/openclaw/openclaw
---
## 10. Appendix: Quick Command Reference
```bash
# Installation
curl -fsSL https://openclaw.ai/install.sh | bash
# Configuration
openclaw onboard # Interactive setup
openclaw configure # Edit config
openclaw config set <key> <value> # Set specific value
# Gateway management
openclaw gateway # Start gateway
openclaw gateway --verbose # Start with logs
openclaw gateway status # Check status
openclaw gateway restart # Restart gateway
openclaw gateway stop # Stop gateway
# Model management
openclaw models list # List available models
openclaw models set <model> # Set default model
openclaw models status # Check model status
# Diagnostics
openclaw doctor # System health check
openclaw doctor --repair # Auto-fix issues
openclaw security audit # Security check
# Dashboard
openclaw dashboard # Open web UI
```
---
*End of Research Report*


@@ -43,6 +43,8 @@ from dashboard.routes.memory import router as memory_router
from dashboard.routes.mobile import router as mobile_router
from dashboard.routes.models import api_router as models_api_router
from dashboard.routes.models import router as models_router
from dashboard.routes.quests import router as quests_router
from dashboard.routes.scorecards import router as scorecards_router
from dashboard.routes.spark import router as spark_router
from dashboard.routes.system import router as system_router
from dashboard.routes.tasks import router as tasks_router
@@ -627,6 +629,8 @@ app.include_router(world_router)
app.include_router(matrix_router)
app.include_router(tower_router)
app.include_router(daily_run_router)
app.include_router(quests_router)
app.include_router(scorecards_router)
@app.websocket("/ws")


@@ -365,6 +365,15 @@ async def daily_run_metrics_api(lookback_days: int = 7):
status_code=503,
)
# Check for quest completions based on Daily Run metrics
quest_rewards = []
try:
from dashboard.routes.quests import check_daily_run_quests
quest_rewards = await check_daily_run_quests(agent_id="system")
except Exception as exc:
logger.debug("Quest checking failed: %s", exc)
return JSONResponse(
{
"status": "ok",
@@ -389,6 +398,7 @@ async def daily_run_metrics_api(lookback_days: int = 7):
"previous": metrics.total_touched_previous,
},
"generated_at": metrics.generated_at,
"quest_rewards": quest_rewards,
}
)


@@ -0,0 +1,377 @@
"""Quest system routes for agent token rewards.
Provides API endpoints for:
- Listing quests and their status
- Claiming quest rewards
- Getting quest leaderboard
- Quest progress tracking
"""
from __future__ import annotations
import logging
from typing import Any
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse, JSONResponse
from pydantic import BaseModel
from dashboard.templating import templates
from timmy.quest_system import (
QuestStatus,
auto_evaluate_all_quests,
claim_quest_reward,
evaluate_quest_progress,
get_active_quests,
get_agent_quests_status,
get_quest_definition,
get_quest_leaderboard,
load_quest_config,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/quests", tags=["quests"])
class ClaimQuestRequest(BaseModel):
"""Request to claim a quest reward."""
agent_id: str
quest_id: str
class EvaluateQuestRequest(BaseModel):
"""Request to manually evaluate quest progress."""
agent_id: str
quest_id: str
# ---------------------------------------------------------------------------
# API Endpoints
# ---------------------------------------------------------------------------
@router.get("/api/definitions")
async def get_quest_definitions_api() -> JSONResponse:
"""Get all quest definitions.
Returns:
JSON list of all quest definitions with their criteria.
"""
definitions = get_active_quests()
return JSONResponse(
{
"quests": [
{
"id": q.id,
"name": q.name,
"description": q.description,
"reward_tokens": q.reward_tokens,
"type": q.quest_type.value,
"repeatable": q.repeatable,
"cooldown_hours": q.cooldown_hours,
"criteria": q.criteria,
}
for q in definitions
]
}
)
@router.get("/api/status/{agent_id}")
async def get_agent_quest_status(agent_id: str) -> JSONResponse:
"""Get quest status for a specific agent.
Returns:
Complete quest status including progress, completion counts,
and tokens earned.
"""
status = get_agent_quests_status(agent_id)
return JSONResponse(status)
@router.post("/api/claim")
async def claim_quest_reward_api(request: ClaimQuestRequest) -> JSONResponse:
"""Claim a quest reward for an agent.
The quest must be completed but not yet claimed.
"""
reward = claim_quest_reward(request.quest_id, request.agent_id)
if not reward:
return JSONResponse(
{
"success": False,
"error": "Quest not completed, already claimed, or on cooldown",
},
status_code=400,
)
return JSONResponse(
{
"success": True,
"reward": reward,
}
)
@router.post("/api/evaluate")
async def evaluate_quest_api(request: EvaluateQuestRequest) -> JSONResponse:
"""Manually evaluate quest progress with provided context.
This is useful for testing or when the quest completion
needs to be triggered manually.
"""
quest = get_quest_definition(request.quest_id)
if not quest:
return JSONResponse(
{"success": False, "error": "Quest not found"},
status_code=404,
)
# Build evaluation context based on quest type
context = await _build_evaluation_context(quest)
progress = evaluate_quest_progress(request.quest_id, request.agent_id, context)
if not progress:
return JSONResponse(
{"success": False, "error": "Failed to evaluate quest"},
status_code=500,
)
# Auto-claim if completed
reward = None
if progress.status == QuestStatus.COMPLETED:
reward = claim_quest_reward(request.quest_id, request.agent_id)
return JSONResponse(
{
"success": True,
"progress": progress.to_dict(),
"reward": reward,
"completed": progress.status == QuestStatus.COMPLETED,
}
)
@router.get("/api/leaderboard")
async def get_leaderboard_api() -> JSONResponse:
"""Get the quest completion leaderboard.
Returns agents sorted by total tokens earned.
"""
leaderboard = get_quest_leaderboard()
return JSONResponse(
{
"leaderboard": leaderboard,
}
)
@router.post("/api/reload")
async def reload_quest_config_api() -> JSONResponse:
"""Reload quest configuration from quests.yaml.
Useful for applying quest changes without restarting.
"""
definitions, quest_settings = load_quest_config()
return JSONResponse(
{
"success": True,
"quests_loaded": len(definitions),
"settings": quest_settings,
}
)
# ---------------------------------------------------------------------------
# Dashboard UI Endpoints
# ---------------------------------------------------------------------------
@router.get("", response_class=HTMLResponse)
async def quests_dashboard(request: Request) -> HTMLResponse:
"""Main quests dashboard page."""
return templates.TemplateResponse(
request,
"quests.html",
{"agent_id": "current_user"},
)
@router.get("/panel/{agent_id}", response_class=HTMLResponse)
async def quests_panel(request: Request, agent_id: str) -> HTMLResponse:
"""Quest panel for HTMX partial updates."""
status = get_agent_quests_status(agent_id)
return templates.TemplateResponse(
request,
"partials/quests_panel.html",
{
"agent_id": agent_id,
"quests": status["quests"],
"total_tokens": status["total_tokens_earned"],
"completed_count": status["total_quests_completed"],
},
)
# ---------------------------------------------------------------------------
# Internal Functions
# ---------------------------------------------------------------------------
async def _build_evaluation_context(quest) -> dict[str, Any]:
"""Build evaluation context for a quest based on its type."""
context: dict[str, Any] = {}
if quest.quest_type.value == "issue_count":
# Fetch closed issues with relevant labels
context["closed_issues"] = await _fetch_closed_issues(
quest.criteria.get("issue_labels", [])
)
elif quest.quest_type.value == "issue_reduce":
# Fetch current and previous issue counts
labels = quest.criteria.get("issue_labels", [])
context["current_issue_count"] = await _fetch_open_issue_count(labels)
context["previous_issue_count"] = await _fetch_previous_issue_count(
labels, quest.criteria.get("lookback_days", 7)
)
elif quest.quest_type.value == "daily_run":
# Fetch Daily Run metrics
metrics = await _fetch_daily_run_metrics()
context["sessions_completed"] = metrics.get("sessions_completed", 0)
return context
async def _fetch_closed_issues(labels: list[str]) -> list[dict]:
"""Fetch closed issues matching the given labels."""
try:
from dashboard.routes.daily_run import GiteaClient, _load_config
config = _load_config()
token = _get_gitea_token(config)
client = GiteaClient(config, token)
if not client.is_available():
return []
# Build label filter
label_filter = ",".join(labels) if labels else ""
issues = client.get_paginated(
"issues",
{"state": "closed", "labels": label_filter, "limit": 100},
)
return issues
except Exception as exc:
logger.debug("Failed to fetch closed issues: %s", exc)
return []
async def _fetch_open_issue_count(labels: list[str]) -> int:
"""Fetch count of open issues with given labels."""
try:
from dashboard.routes.daily_run import GiteaClient, _load_config
config = _load_config()
token = _get_gitea_token(config)
client = GiteaClient(config, token)
if not client.is_available():
return 0
label_filter = ",".join(labels) if labels else ""
issues = client.get_paginated(
"issues",
{"state": "open", "labels": label_filter, "limit": 100},
)
return len(issues)
except Exception as exc:
logger.debug("Failed to fetch open issue count: %s", exc)
return 0
async def _fetch_previous_issue_count(labels: list[str], lookback_days: int) -> int:
"""Fetch previous issue count (simplified - uses current for now)."""
# This is a simplified implementation
# In production, you'd query historical data
return await _fetch_open_issue_count(labels)
async def _fetch_daily_run_metrics() -> dict[str, Any]:
"""Fetch Daily Run metrics."""
try:
from dashboard.routes.daily_run import _get_metrics
metrics = _get_metrics(lookback_days=7)
if metrics:
return {
"sessions_completed": metrics.sessions_completed,
"sessions_previous": metrics.sessions_previous,
}
except Exception as exc:
logger.debug("Failed to fetch Daily Run metrics: %s", exc)
return {"sessions_completed": 0, "sessions_previous": 0}
def _get_gitea_token(config: dict) -> str | None:
"""Get Gitea token from config."""
if "token" in config:
return config["token"]
from pathlib import Path
token_file = Path(config.get("token_file", "~/.hermes/gitea_token")).expanduser()
if token_file.exists():
return token_file.read_text().strip()
return None
# ---------------------------------------------------------------------------
# Daily Run Integration
# ---------------------------------------------------------------------------
async def check_daily_run_quests(agent_id: str = "system") -> list[dict]:
"""Check and award Daily Run related quests.
Called by the Daily Run system when metrics are updated.
Returns:
List of rewards awarded
"""
# Check if auto-detect is enabled
_, quest_settings = load_quest_config()
if not quest_settings.get("auto_detect_on_daily_run", True):
return []
# Build context from Daily Run metrics
metrics = await _fetch_daily_run_metrics()
context = {
"sessions_completed": metrics.get("sessions_completed", 0),
"sessions_previous": metrics.get("sessions_previous", 0),
}
# Add closed issues for issue_count quests
active_quests = get_active_quests()
for quest in active_quests:
if quest.quest_type.value == "issue_count":
labels = quest.criteria.get("issue_labels", [])
context["closed_issues"] = await _fetch_closed_issues(labels)
break # Only need to fetch once
# Evaluate all quests
rewards = auto_evaluate_all_quests(agent_id, context)
return rewards
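
For reference, the endpoints defined above can be exercised as in the sketch below; the base URL is a placeholder for wherever the dashboard app is actually served.

```python
# Sketch of driving the quest API; BASE is a hypothetical host/port,
# point it at the running dashboard instance.
import requests

BASE = "http://localhost:8000"  # placeholder; depends on deployment

# List every active quest definition
quests = requests.get(f"{BASE}/quests/api/definitions").json()["quests"]
print([q["id"] for q in quests])

# Check an agent's progress, then try to claim a completed quest
status = requests.get(f"{BASE}/quests/api/status/kimi").json()
print(status["total_tokens_earned"])
claim = requests.post(
    f"{BASE}/quests/api/claim",
    json={"agent_id": "kimi", "quest_id": "close_flaky_tests"},
)
print(claim.status_code, claim.json())  # 400 if incomplete, claimed, or on cooldown
```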


@@ -0,0 +1,353 @@
"""Agent scorecard routes — API endpoints for generating and viewing scorecards."""
from __future__ import annotations
import logging
from datetime import UTC, datetime
from fastapi import APIRouter, Query, Request
from fastapi.responses import HTMLResponse, JSONResponse
from dashboard.services.scorecard_service import (
PeriodType,
generate_all_scorecards,
generate_scorecard,
get_tracked_agents,
)
from dashboard.templating import templates
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/scorecards", tags=["scorecards"])
def _format_period_label(period_type: PeriodType) -> str:
"""Format a period type for display."""
return "Daily" if period_type == PeriodType.daily else "Weekly"
@router.get("/api/agents")
async def list_tracked_agents() -> dict[str, list[str]]:
"""Return the list of tracked agent IDs.
Returns:
Dict with "agents" key containing list of agent IDs
"""
return {"agents": get_tracked_agents()}
@router.get("/api/{agent_id}")
async def get_agent_scorecard(
agent_id: str,
period: str = Query(default="daily", description="Period type: 'daily' or 'weekly'"),
) -> JSONResponse:
"""Generate a scorecard for a specific agent.
Args:
agent_id: The agent ID (e.g., 'kimi', 'claude')
period: 'daily' or 'weekly' (default: daily)
Returns:
JSON response with scorecard data
"""
try:
period_type = PeriodType(period.lower())
except ValueError:
return JSONResponse(
status_code=400,
content={"error": f"Invalid period '{period}'. Use 'daily' or 'weekly'."},
)
try:
scorecard = generate_scorecard(agent_id, period_type)
if scorecard is None:
return JSONResponse(
status_code=404,
content={"error": f"No scorecard found for agent '{agent_id}'"},
)
return JSONResponse(content=scorecard.to_dict())
except Exception as exc:
logger.error("Failed to generate scorecard for %s: %s", agent_id, exc)
return JSONResponse(
status_code=500,
content={"error": f"Failed to generate scorecard: {str(exc)}"},
)
@router.get("/api")
async def get_all_scorecards(
period: str = Query(default="daily", description="Period type: 'daily' or 'weekly'"),
) -> JSONResponse:
"""Generate scorecards for all tracked agents.
Args:
period: 'daily' or 'weekly' (default: daily)
Returns:
JSON response with list of scorecard data
"""
try:
period_type = PeriodType(period.lower())
except ValueError:
return JSONResponse(
status_code=400,
content={"error": f"Invalid period '{period}'. Use 'daily' or 'weekly'."},
)
try:
scorecards = generate_all_scorecards(period_type)
return JSONResponse(
content={
"period": period_type.value,
"scorecards": [s.to_dict() for s in scorecards],
"count": len(scorecards),
}
)
except Exception as exc:
logger.error("Failed to generate scorecards: %s", exc)
return JSONResponse(
status_code=500,
content={"error": f"Failed to generate scorecards: {str(exc)}"},
)
@router.get("", response_class=HTMLResponse)
async def scorecards_page(request: Request) -> HTMLResponse:
"""Render the scorecards dashboard page.
Returns:
HTML page with scorecard interface
"""
agents = get_tracked_agents()
return templates.TemplateResponse(
request,
"scorecards.html",
{
"agents": agents,
"periods": ["daily", "weekly"],
},
)
@router.get("/panel/{agent_id}", response_class=HTMLResponse)
async def agent_scorecard_panel(
request: Request,
agent_id: str,
period: str = Query(default="daily"),
) -> HTMLResponse:
"""Render an individual agent scorecard panel (for HTMX).
Args:
request: The request object
agent_id: The agent ID
period: 'daily' or 'weekly'
Returns:
HTML panel with scorecard content
"""
try:
period_type = PeriodType(period.lower())
except ValueError:
period_type = PeriodType.daily
try:
scorecard = generate_scorecard(agent_id, period_type)
if scorecard is None:
return HTMLResponse(
content=f"""
<div class="card mc-panel">
<h5 class="card-title">{agent_id.title()}</h5>
<p class="text-muted">No activity recorded for this period.</p>
</div>
""",
status_code=200,
)
data = scorecard.to_dict()
# Build patterns HTML
patterns_html = ""
if data["patterns"]:
patterns_list = "".join([f"<li>{p}</li>" for p in data["patterns"]])
patterns_html = f"""
<div class="mt-3">
<h6>Patterns</h6>
<ul class="list-unstyled text-info">
{patterns_list}
</ul>
</div>
"""
# Build bullets HTML
bullets_html = "".join([f"<li>{b}</li>" for b in data["narrative_bullets"]])
# Build metrics summary
metrics = data["metrics"]
html_content = f"""
<div class="card mc-panel">
<div class="card-header d-flex justify-content-between align-items-center">
<h5 class="card-title mb-0">{agent_id.title()}</h5>
<span class="badge bg-secondary">{_format_period_label(period_type)}</span>
</div>
<div class="card-body">
<ul class="list-unstyled mb-3">
{bullets_html}
</ul>
<div class="row text-center small">
<div class="col">
<div class="text-muted">PRs</div>
<div class="fw-bold">{metrics["prs_opened"]}/{metrics["prs_merged"]}</div>
<div class="text-muted" style="font-size: 0.75rem;">
{int(metrics["pr_merge_rate"] * 100)}% merged
</div>
</div>
<div class="col">
<div class="text-muted">Issues</div>
<div class="fw-bold">{metrics["issues_touched"]}</div>
</div>
<div class="col">
<div class="text-muted">Tests</div>
<div class="fw-bold">{metrics["tests_affected"]}</div>
</div>
<div class="col">
<div class="text-muted">Tokens</div>
<div class="fw-bold {"text-success" if metrics["token_net"] >= 0 else "text-danger"}">
{"+" if metrics["token_net"] > 0 else ""}{metrics["token_net"]}
</div>
</div>
</div>
{patterns_html}
</div>
</div>
"""
return HTMLResponse(content=html_content)
except Exception as exc:
logger.error("Failed to render scorecard panel for %s: %s", agent_id, exc)
return HTMLResponse(
content=f"""
<div class="card mc-panel border-danger">
<h5 class="card-title">{agent_id.title()}</h5>
<p class="text-danger">Error loading scorecard: {str(exc)}</p>
</div>
""",
status_code=200,
)
@router.get("/all/panels", response_class=HTMLResponse)
async def all_scorecard_panels(
request: Request,
period: str = Query(default="daily"),
) -> HTMLResponse:
"""Render all agent scorecard panels (for HTMX).
Args:
request: The request object
period: 'daily' or 'weekly'
Returns:
HTML with all scorecard panels
"""
try:
period_type = PeriodType(period.lower())
except ValueError:
period_type = PeriodType.daily
try:
scorecards = generate_all_scorecards(period_type)
panels: list[str] = []
for scorecard in scorecards:
data = scorecard.to_dict()
# Build patterns HTML
patterns_html = ""
if data["patterns"]:
patterns_list = "".join([f"<li>{p}</li>" for p in data["patterns"]])
patterns_html = f"""
<div class="mt-3">
<h6>Patterns</h6>
<ul class="list-unstyled text-info">
{patterns_list}
</ul>
</div>
"""
# Build bullets HTML
bullets_html = "".join([f"<li>{b}</li>" for b in data["narrative_bullets"]])
metrics = data["metrics"]
panel_html = f"""
<div class="col-md-6 col-lg-4 mb-3">
<div class="card mc-panel">
<div class="card-header d-flex justify-content-between align-items-center">
<h5 class="card-title mb-0">{scorecard.agent_id.title()}</h5>
<span class="badge bg-secondary">{_format_period_label(period_type)}</span>
</div>
<div class="card-body">
<ul class="list-unstyled mb-3">
{bullets_html}
</ul>
<div class="row text-center small">
<div class="col">
<div class="text-muted">PRs</div>
<div class="fw-bold">{metrics["prs_opened"]}/{metrics["prs_merged"]}</div>
<div class="text-muted" style="font-size: 0.75rem;">
{int(metrics["pr_merge_rate"] * 100)}% merged
</div>
</div>
<div class="col">
<div class="text-muted">Issues</div>
<div class="fw-bold">{metrics["issues_touched"]}</div>
</div>
<div class="col">
<div class="text-muted">Tests</div>
<div class="fw-bold">{metrics["tests_affected"]}</div>
</div>
<div class="col">
<div class="text-muted">Tokens</div>
<div class="fw-bold {"text-success" if metrics["token_net"] >= 0 else "text-danger"}">
{"+" if metrics["token_net"] > 0 else ""}{metrics["token_net"]}
</div>
</div>
</div>
{patterns_html}
</div>
</div>
</div>
"""
panels.append(panel_html)
html_content = f"""
<div class="row">
{"".join(panels)}
</div>
<div class="text-muted small mt-2">
Generated: {datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC")}
</div>
"""
return HTMLResponse(content=html_content)
except Exception as exc:
logger.error("Failed to render all scorecard panels: %s", exc)
return HTMLResponse(
content=f"""
<div class="alert alert-danger">
Error loading scorecards: {str(exc)}
</div>
""",
status_code=200,
)


@@ -0,0 +1,17 @@
"""Dashboard services for business logic."""
from dashboard.services.scorecard_service import (
PeriodType,
ScorecardSummary,
generate_all_scorecards,
generate_scorecard,
get_tracked_agents,
)
__all__ = [
"PeriodType",
"ScorecardSummary",
"generate_all_scorecards",
"generate_scorecard",
"get_tracked_agents",
]


@@ -0,0 +1,515 @@
"""Agent scorecard service — track and summarize agent performance.
Generates daily/weekly scorecards showing:
- Issues touched, PRs opened/merged
- Tests affected, tokens earned/spent
- Pattern highlights (merge rate, activity quality)
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from enum import StrEnum
from typing import Any
from infrastructure.events.bus import Event, get_event_bus
logger = logging.getLogger(__name__)
# Bot/agent usernames to track
TRACKED_AGENTS = frozenset({"hermes", "kimi", "manus", "claude", "gemini"})
class PeriodType(StrEnum):
daily = "daily"
weekly = "weekly"
@dataclass
class AgentMetrics:
"""Raw metrics collected for an agent over a period."""
agent_id: str
issues_touched: set[int] = field(default_factory=set)
prs_opened: set[int] = field(default_factory=set)
prs_merged: set[int] = field(default_factory=set)
tests_affected: set[str] = field(default_factory=set)
tokens_earned: int = 0
tokens_spent: int = 0
commits: int = 0
comments: int = 0
@property
def pr_merge_rate(self) -> float:
"""Calculate PR merge rate (0.0 - 1.0)."""
opened = len(self.prs_opened)
if opened == 0:
return 0.0
return len(self.prs_merged) / opened
@dataclass
class ScorecardSummary:
"""A generated scorecard with narrative summary."""
agent_id: str
period_type: PeriodType
period_start: datetime
period_end: datetime
metrics: AgentMetrics
narrative_bullets: list[str] = field(default_factory=list)
patterns: list[str] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
"""Convert scorecard to dictionary for JSON serialization."""
return {
"agent_id": self.agent_id,
"period_type": self.period_type.value,
"period_start": self.period_start.isoformat(),
"period_end": self.period_end.isoformat(),
"metrics": {
"issues_touched": len(self.metrics.issues_touched),
"prs_opened": len(self.metrics.prs_opened),
"prs_merged": len(self.metrics.prs_merged),
"pr_merge_rate": round(self.metrics.pr_merge_rate, 2),
"tests_affected": len(self.tests_affected),
"commits": self.metrics.commits,
"comments": self.metrics.comments,
"tokens_earned": self.metrics.tokens_earned,
"tokens_spent": self.metrics.tokens_spent,
"token_net": self.metrics.tokens_earned - self.metrics.tokens_spent,
},
"narrative_bullets": self.narrative_bullets,
"patterns": self.patterns,
}
@property
def tests_affected(self) -> set[str]:
"""Alias for metrics.tests_affected."""
return self.metrics.tests_affected
def _get_period_bounds(
period_type: PeriodType, reference_date: datetime | None = None
) -> tuple[datetime, datetime]:
"""Calculate start and end timestamps for a period.
Args:
period_type: daily or weekly
reference_date: The date to calculate from (defaults to now)
Returns:
Tuple of (period_start, period_end) in UTC
"""
if reference_date is None:
reference_date = datetime.now(UTC)
# Normalize to start of day
end = reference_date.replace(hour=0, minute=0, second=0, microsecond=0)
if period_type == PeriodType.daily:
start = end - timedelta(days=1)
else: # weekly
start = end - timedelta(days=7)
return start, end
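# Example (illustrative): with reference_date 2026-03-21 16:00 UTC,
# daily yields (2026-03-20 00:00, 2026-03-21 00:00) and weekly yields
# (2026-03-14 00:00, 2026-03-21 00:00): always trailing full days,
# never the partial current day.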
def _collect_events_for_period(
start: datetime, end: datetime, agent_id: str | None = None
) -> list[Event]:
"""Collect events from the event bus for a time period.
Args:
start: Period start time
end: Period end time
agent_id: Optional agent filter
Returns:
List of matching events
"""
bus = get_event_bus()
events: list[Event] = []
# Query persisted events for relevant types
event_types = [
"gitea.push",
"gitea.issue.opened",
"gitea.issue.comment",
"gitea.pull_request",
"agent.task.completed",
"test.execution",
]
for event_type in event_types:
try:
type_events = bus.replay(
event_type=event_type,
source=agent_id,
limit=1000,
)
events.extend(type_events)
except Exception as exc:
logger.debug("Failed to replay events for %s: %s", event_type, exc)
# Filter by timestamp
filtered = []
for event in events:
try:
event_time = datetime.fromisoformat(event.timestamp.replace("Z", "+00:00"))
if start <= event_time < end:
filtered.append(event)
except (ValueError, AttributeError):
continue
return filtered
def _extract_actor_from_event(event: Event) -> str:
"""Extract the actor/agent from an event."""
# Try data fields first
if "actor" in event.data:
return event.data["actor"]
if "agent_id" in event.data:
return event.data["agent_id"]
# Fall back to source
return event.source
def _is_tracked_agent(actor: str) -> bool:
"""Check if an actor is a tracked agent."""
return actor.lower() in TRACKED_AGENTS
def _aggregate_metrics(events: list[Event]) -> dict[str, AgentMetrics]:
"""Aggregate metrics from events grouped by agent.
Args:
events: List of events to process
Returns:
Dict mapping agent_id -> AgentMetrics
"""
metrics_by_agent: dict[str, AgentMetrics] = {}
for event in events:
actor = _extract_actor_from_event(event)
# Skip non-agent events unless they explicitly have an agent_id
if not _is_tracked_agent(actor) and "agent_id" not in event.data:
continue
if actor not in metrics_by_agent:
metrics_by_agent[actor] = AgentMetrics(agent_id=actor)
metrics = metrics_by_agent[actor]
# Process based on event type
event_type = event.type
if event_type == "gitea.push":
metrics.commits += event.data.get("num_commits", 1)
elif event_type == "gitea.issue.opened":
issue_num = event.data.get("issue_number", 0)
if issue_num:
metrics.issues_touched.add(issue_num)
elif event_type == "gitea.issue.comment":
metrics.comments += 1
issue_num = event.data.get("issue_number", 0)
if issue_num:
metrics.issues_touched.add(issue_num)
elif event_type == "gitea.pull_request":
pr_num = event.data.get("pr_number", 0)
action = event.data.get("action", "")
merged = event.data.get("merged", False)
if pr_num:
if action == "opened":
metrics.prs_opened.add(pr_num)
elif action == "closed" and merged:
metrics.prs_merged.add(pr_num)
# Also count as touched issue for tracking
metrics.issues_touched.add(pr_num)
elif event_type == "agent.task.completed":
# Extract test files from task data
affected = event.data.get("tests_affected", [])
for test in affected:
metrics.tests_affected.add(test)
# Token rewards from task completion
reward = event.data.get("token_reward", 0)
if reward:
metrics.tokens_earned += reward
elif event_type == "test.execution":
# Track test files that were executed
test_files = event.data.get("test_files", [])
for test in test_files:
metrics.tests_affected.add(test)
return metrics_by_agent
def _query_token_transactions(agent_id: str, start: datetime, end: datetime) -> tuple[int, int]:
"""Query the lightning ledger for token transactions.
Args:
agent_id: The agent to query for
start: Period start
end: Period end
Returns:
Tuple of (tokens_earned, tokens_spent)
"""
try:
from lightning.ledger import get_transactions
transactions = get_transactions(limit=1000)
earned = 0
spent = 0
for tx in transactions:
# Filter by agent if specified
if tx.agent_id and tx.agent_id != agent_id:
continue
# Filter by timestamp
try:
tx_time = datetime.fromisoformat(tx.created_at.replace("Z", "+00:00"))
if not (start <= tx_time < end):
continue
except (ValueError, AttributeError):
continue
if tx.tx_type.value == "incoming":
earned += tx.amount_sats
else:
spent += tx.amount_sats
return earned, spent
except Exception as exc:
logger.debug("Failed to query token transactions: %s", exc)
return 0, 0
def _generate_narrative_bullets(metrics: AgentMetrics, period_type: PeriodType) -> list[str]:
"""Generate narrative summary bullets for a scorecard.
Args:
metrics: The agent's metrics
period_type: daily or weekly
Returns:
List of narrative bullet points
"""
bullets: list[str] = []
period_label = "day" if period_type == PeriodType.daily else "week"
# Activity summary
activities = []
if metrics.commits:
activities.append(f"{metrics.commits} commit{'s' if metrics.commits != 1 else ''}")
    if metrics.prs_opened:
activities.append(
f"{len(metrics.prs_opened)} PR{'s' if len(metrics.prs_opened) != 1 else ''} opened"
)
    if metrics.prs_merged:
activities.append(
f"{len(metrics.prs_merged)} PR{'s' if len(metrics.prs_merged) != 1 else ''} merged"
)
    if metrics.issues_touched:
activities.append(
f"{len(metrics.issues_touched)} issue{'s' if len(metrics.issues_touched) != 1 else ''} touched"
)
if metrics.comments:
activities.append(f"{metrics.comments} comment{'s' if metrics.comments != 1 else ''}")
if activities:
bullets.append(f"Active across {', '.join(activities)} this {period_label}.")
# Test activity
    if metrics.tests_affected:
bullets.append(
f"Affected {len(metrics.tests_affected)} test file{'s' if len(metrics.tests_affected) != 1 else ''}."
)
# Token summary
net_tokens = metrics.tokens_earned - metrics.tokens_spent
if metrics.tokens_earned or metrics.tokens_spent:
if net_tokens > 0:
bullets.append(
f"Net earned {net_tokens} tokens ({metrics.tokens_earned} earned, {metrics.tokens_spent} spent)."
)
elif net_tokens < 0:
bullets.append(
f"Net spent {abs(net_tokens)} tokens ({metrics.tokens_earned} earned, {metrics.tokens_spent} spent)."
)
else:
bullets.append(
f"Balanced token flow ({metrics.tokens_earned} earned, {metrics.tokens_spent} spent)."
)
# Handle empty case
if not bullets:
bullets.append(f"No recorded activity this {period_label}.")
return bullets
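# Example (illustrative): metrics with commits=5, prs_opened={1, 2}, and
# prs_merged={1} produce
#
#     ["Active across 5 commits, 2 PRs opened, 1 PR merged this day."]
#
# plus a token bullet whenever tokens_earned or tokens_spent is non-zero.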
def _detect_patterns(metrics: AgentMetrics) -> list[str]:
"""Detect interesting patterns in agent behavior.
Args:
metrics: The agent's metrics
Returns:
List of pattern descriptions
"""
patterns: list[str] = []
pr_opened = len(metrics.prs_opened)
merge_rate = metrics.pr_merge_rate
# Merge rate patterns
if pr_opened >= 3:
if merge_rate >= 0.8:
patterns.append("High merge rate with few failures — code quality focus.")
elif merge_rate <= 0.3:
patterns.append("Lots of noisy PRs, low merge rate — may need review support.")
# Activity patterns
if metrics.commits > 10 and pr_opened == 0:
patterns.append("High commit volume without PRs — working directly on main?")
if len(metrics.issues_touched) > 5 and metrics.comments == 0:
patterns.append("Touching many issues but low comment volume — silent worker.")
if metrics.comments > len(metrics.issues_touched) * 2:
patterns.append("Highly communicative — lots of discussion relative to work items.")
# Token patterns
net_tokens = metrics.tokens_earned - metrics.tokens_spent
if net_tokens > 100:
patterns.append("Strong token accumulation — high value delivery.")
elif net_tokens < -50:
patterns.append("High token spend — may be in experimentation phase.")
return patterns
def generate_scorecard(
agent_id: str,
period_type: PeriodType = PeriodType.daily,
reference_date: datetime | None = None,
) -> ScorecardSummary:
"""Generate a scorecard for a single agent.
Args:
agent_id: The agent to generate scorecard for
period_type: daily or weekly
reference_date: The date to calculate from (defaults to now)
Returns:
        ScorecardSummary; metrics are empty if the agent had no recorded activity
"""
start, end = _get_period_bounds(period_type, reference_date)
# Collect events
events = _collect_events_for_period(start, end, agent_id)
# Aggregate metrics
all_metrics = _aggregate_metrics(events)
# Get metrics for this specific agent
if agent_id not in all_metrics:
# Create empty metrics - still generate a scorecard
metrics = AgentMetrics(agent_id=agent_id)
else:
metrics = all_metrics[agent_id]
# Augment with token data from ledger
tokens_earned, tokens_spent = _query_token_transactions(agent_id, start, end)
metrics.tokens_earned = max(metrics.tokens_earned, tokens_earned)
metrics.tokens_spent = max(metrics.tokens_spent, tokens_spent)
# Generate narrative and patterns
narrative = _generate_narrative_bullets(metrics, period_type)
patterns = _detect_patterns(metrics)
return ScorecardSummary(
agent_id=agent_id,
period_type=period_type,
period_start=start,
period_end=end,
metrics=metrics,
narrative_bullets=narrative,
patterns=patterns,
)
def generate_all_scorecards(
period_type: PeriodType = PeriodType.daily,
reference_date: datetime | None = None,
) -> list[ScorecardSummary]:
"""Generate scorecards for all tracked agents.
Args:
period_type: daily or weekly
reference_date: The date to calculate from (defaults to now)
Returns:
        List of ScorecardSummary for every tracked agent, plus any other actors with activity
"""
start, end = _get_period_bounds(period_type, reference_date)
# Collect all events
events = _collect_events_for_period(start, end)
# Aggregate metrics for all agents
all_metrics = _aggregate_metrics(events)
# Include tracked agents even if no activity
for agent_id in TRACKED_AGENTS:
if agent_id not in all_metrics:
all_metrics[agent_id] = AgentMetrics(agent_id=agent_id)
# Generate scorecards
scorecards: list[ScorecardSummary] = []
for agent_id, metrics in all_metrics.items():
# Augment with token data
tokens_earned, tokens_spent = _query_token_transactions(agent_id, start, end)
metrics.tokens_earned = max(metrics.tokens_earned, tokens_earned)
metrics.tokens_spent = max(metrics.tokens_spent, tokens_spent)
narrative = _generate_narrative_bullets(metrics, period_type)
patterns = _detect_patterns(metrics)
scorecard = ScorecardSummary(
agent_id=agent_id,
period_type=period_type,
period_start=start,
period_end=end,
metrics=metrics,
narrative_bullets=narrative,
patterns=patterns,
)
scorecards.append(scorecard)
# Sort by agent_id for consistent ordering
scorecards.sort(key=lambda s: s.agent_id)
return scorecards
def get_tracked_agents() -> list[str]:
"""Return the list of tracked agent IDs."""
return sorted(TRACKED_AGENTS)
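# ── Usage sketch ─────────────────────────────────────────────────────────────
# A minimal driver, e.g. for a cron job or a shell session (illustrative only;
# assumes the event store and lightning ledger are reachable):
#
#     from dashboard.services.scorecard_service import (
#         PeriodType,
#         generate_all_scorecards,
#     )
#
#     for card in generate_all_scorecards(PeriodType.weekly):
#         print(card.agent_id, *card.narrative_bullets, sep="\n  ")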

View File

@@ -51,6 +51,7 @@
<a href="/thinking" class="mc-test-link mc-link-thinking">THINKING</a>
<a href="/swarm/mission-control" class="mc-test-link">MISSION CTRL</a>
<a href="/swarm/live" class="mc-test-link">SWARM</a>
<a href="/scorecards" class="mc-test-link">SCORECARDS</a>
<a href="/bugs" class="mc-test-link mc-link-bugs">BUGS</a>
</div>
</div>
@@ -123,6 +124,7 @@
<a href="/thinking" class="mc-mobile-link">THINKING</a>
<a href="/swarm/mission-control" class="mc-mobile-link">MISSION CONTROL</a>
<a href="/swarm/live" class="mc-mobile-link">SWARM</a>
<a href="/scorecards" class="mc-mobile-link">SCORECARDS</a>
<a href="/bugs" class="mc-mobile-link">BUGS</a>
<div class="mc-mobile-section-label">INTELLIGENCE</div>
<a href="/spark/ui" class="mc-mobile-link">SPARK</a>

View File

@@ -0,0 +1,80 @@
{% from "macros.html" import panel %}
<div class="quests-summary mb-4">
<div class="row">
<div class="col-md-4">
<div class="stat-card">
<div class="stat-value">{{ total_tokens }}</div>
<div class="stat-label">Tokens Earned</div>
</div>
</div>
<div class="col-md-4">
<div class="stat-card">
<div class="stat-value">{{ completed_count }}</div>
<div class="stat-label">Quests Completed</div>
</div>
</div>
<div class="col-md-4">
<div class="stat-card">
<div class="stat-value">{{ quests|selectattr('enabled', 'equalto', true)|list|length }}</div>
<div class="stat-label">Active Quests</div>
</div>
</div>
</div>
</div>
<div class="quests-list">
{% for quest in quests %}
{% if quest.enabled %}
<div class="quest-card quest-status-{{ quest.status }}">
<div class="quest-header">
<h5 class="quest-name">{{ quest.name }}</h5>
<span class="quest-reward">+{{ quest.reward_tokens }} ⚡</span>
</div>
<p class="quest-description">{{ quest.description }}</p>
<div class="quest-progress">
{% if quest.status == 'completed' %}
<div class="progress">
<div class="progress-bar bg-success" style="width: 100%"></div>
</div>
<span class="quest-status-badge completed">Completed</span>
{% elif quest.status == 'claimed' %}
<div class="progress">
<div class="progress-bar bg-success" style="width: 100%"></div>
</div>
<span class="quest-status-badge claimed">Reward Claimed</span>
{% elif quest.on_cooldown %}
<div class="progress">
<div class="progress-bar bg-secondary" style="width: 100%"></div>
</div>
<span class="quest-status-badge cooldown">
Cooldown: {{ quest.cooldown_hours_remaining }}h remaining
</span>
{% else %}
<div class="progress">
<div class="progress-bar" style="width: {{ (quest.current_value / quest.target_value * 100)|int }}%"></div>
</div>
<span class="quest-progress-text">{{ quest.current_value }} / {{ quest.target_value }}</span>
{% endif %}
</div>
<div class="quest-meta">
<span class="quest-type">{{ quest.type }}</span>
{% if quest.repeatable %}
<span class="quest-repeatable">↻ Repeatable</span>
{% endif %}
{% if quest.completion_count > 0 %}
<span class="quest-completions">Completed {{ quest.completion_count }} time{% if quest.completion_count != 1 %}s{% endif %}</span>
{% endif %}
</div>
</div>
{% endif %}
{% endfor %}
</div>
{% if not quests|selectattr('enabled', 'equalto', true)|list|length %}
<div class="alert alert-info">
No active quests available. Check back later or contact an administrator.
</div>
{% endif %}

View File

@@ -0,0 +1,50 @@
{% extends "base.html" %}
{% block title %}Quests — Mission Control{% endblock %}
{% block content %}
<div class="container-fluid">
<div class="row">
<div class="col-12">
<h1 class="mc-title">Token Quests</h1>
<p class="mc-subtitle">Complete quests to earn bonus tokens</p>
</div>
</div>
<div class="row mt-4">
<div class="col-md-8">
<div id="quests-panel" hx-get="/quests/panel/{{ agent_id }}" hx-trigger="load, every 30s">
<div class="mc-loading">Loading quests...</div>
</div>
</div>
<div class="col-md-4">
<div class="card mc-panel">
<div class="card-header">
<h5 class="mb-0">Leaderboard</h5>
</div>
<div class="card-body">
<div id="leaderboard" hx-get="/quests/api/leaderboard" hx-trigger="load, every 60s">
<div class="mc-loading">Loading leaderboard...</div>
</div>
</div>
</div>
<div class="card mc-panel mt-4">
<div class="card-header">
<h5 class="mb-0">About Quests</h5>
</div>
<div class="card-body">
<p class="mb-2">Quests are special objectives that reward tokens upon completion.</p>
<ul class="mc-list mb-0">
<li>Complete Daily Run sessions</li>
<li>Close flaky-test issues</li>
<li>Reduce P1 issue backlog</li>
<li>Improve documentation</li>
</ul>
</div>
</div>
</div>
</div>
</div>
{% endblock %}

View File

@@ -0,0 +1,113 @@
{% extends "base.html" %}
{% block title %}Agent Scorecards - Timmy Time{% endblock %}
{% block extra_styles %}{% endblock %}
{% block content %}
<div class="container-fluid py-4">
<!-- Header -->
<div class="d-flex justify-content-between align-items-center mb-4">
<div>
<h1 class="h3 mb-0">AGENT SCORECARDS</h1>
<p class="text-muted small mb-0">Track agent performance across issues, PRs, tests, and tokens</p>
</div>
<div class="d-flex gap-2">
<select id="period-select" class="form-select form-select-sm" style="width: auto;">
<option value="daily" selected>Daily</option>
<option value="weekly">Weekly</option>
</select>
<button class="btn btn-sm btn-primary" onclick="refreshScorecards()">
<span>Refresh</span>
</button>
</div>
</div>
<!-- Scorecards Grid -->
<div id="scorecards-container"
hx-get="/scorecards/all/panels?period=daily"
hx-trigger="load"
hx-swap="innerHTML">
<div class="text-center py-5">
<div class="spinner-border text-secondary" role="status">
<span class="visually-hidden">Loading...</span>
</div>
<p class="text-muted mt-2">Loading scorecards...</p>
</div>
</div>
<!-- API Reference -->
<div class="mt-5 pt-4 border-top">
<h5 class="text-muted">API Reference</h5>
<div class="row g-3">
<div class="col-md-6">
<div class="card mc-panel">
<div class="card-body">
<h6 class="card-title">List Tracked Agents</h6>
<code>GET /scorecards/api/agents</code>
<p class="small text-muted mt-2">Returns all tracked agent IDs</p>
</div>
</div>
</div>
<div class="col-md-6">
<div class="card mc-panel">
<div class="card-body">
<h6 class="card-title">Get All Scorecards</h6>
<code>GET /scorecards/api?period=daily|weekly</code>
<p class="small text-muted mt-2">Returns scorecards for all agents</p>
</div>
</div>
</div>
<div class="col-md-6">
<div class="card mc-panel">
<div class="card-body">
<h6 class="card-title">Get Agent Scorecard</h6>
<code>GET /scorecards/api/{agent_id}?period=daily|weekly</code>
<p class="small text-muted mt-2">Returns scorecard for a specific agent</p>
</div>
</div>
</div>
<div class="col-md-6">
<div class="card mc-panel">
<div class="card-body">
<h6 class="card-title">HTML Panel (HTMX)</h6>
<code>GET /scorecards/panel/{agent_id}?period=daily|weekly</code>
<p class="small text-muted mt-2">Returns HTML panel for embedding</p>
</div>
</div>
</div>
</div>
</div>
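<!-- Illustrative only: the JSON these endpoints return matches
     ScorecardSummary.to_dict() in scorecard_service.py. From Python, e.g.:
         requests.get(base_url + "/scorecards/api/kimi",
                      params={"period": "weekly"}).json()
     where base_url depends on your deployment. -->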
</div>
<script>
// Period selector change handler
document.getElementById('period-select').addEventListener('change', function() {
refreshScorecards();
});
function refreshScorecards() {
var period = document.getElementById('period-select').value;
var container = document.getElementById('scorecards-container');
// Show loading state
container.innerHTML = `
<div class="text-center py-5">
<div class="spinner-border text-secondary" role="status">
<span class="visually-hidden">Loading...</span>
</div>
<p class="text-muted mt-2">Loading scorecards...</p>
</div>
`;
// Trigger HTMX request
htmx.ajax('GET', '/scorecards/all/panels?period=' + period, {
target: '#scorecards-container',
swap: 'innerHTML'
});
}
// Auto-refresh every 5 minutes
setInterval(refreshScorecards, 300000);
</script>
{% endblock %}

581
src/timmy/quest_system.py Normal file
View File

@@ -0,0 +1,581 @@
"""Token Quest System for agent rewards.
Provides quest definitions, progress tracking, completion detection,
and token awards for agent accomplishments.
Quests are defined in config/quests.yaml and loaded at runtime.
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from enum import StrEnum
from pathlib import Path
from typing import Any
import yaml
from config import settings
logger = logging.getLogger(__name__)
# Path to quest configuration
QUEST_CONFIG_PATH = Path(settings.repo_root) / "config" / "quests.yaml"
class QuestType(StrEnum):
"""Types of quests supported by the system."""
ISSUE_COUNT = "issue_count"
ISSUE_REDUCE = "issue_reduce"
DOCS_UPDATE = "docs_update"
TEST_IMPROVE = "test_improve"
DAILY_RUN = "daily_run"
CUSTOM = "custom"
class QuestStatus(StrEnum):
"""Status of a quest for an agent."""
NOT_STARTED = "not_started"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
CLAIMED = "claimed"
EXPIRED = "expired"
@dataclass
class QuestDefinition:
"""Definition of a quest from configuration."""
id: str
name: str
description: str
reward_tokens: int
quest_type: QuestType
enabled: bool
repeatable: bool
cooldown_hours: int
criteria: dict[str, Any]
notification_message: str
@classmethod
def from_dict(cls, data: dict[str, Any]) -> QuestDefinition:
"""Create a QuestDefinition from a dictionary."""
return cls(
id=data["id"],
name=data.get("name", "Unnamed Quest"),
description=data.get("description", ""),
reward_tokens=data.get("reward_tokens", 0),
quest_type=QuestType(data.get("type", "custom")),
enabled=data.get("enabled", True),
repeatable=data.get("repeatable", False),
cooldown_hours=data.get("cooldown_hours", 0),
criteria=data.get("criteria", {}),
notification_message=data.get(
"notification_message", "Quest Complete! You earned {tokens} tokens."
),
)
@dataclass
class QuestProgress:
"""Progress of a quest for a specific agent."""
quest_id: str
agent_id: str
status: QuestStatus
current_value: int = 0
target_value: int = 0
started_at: str = ""
completed_at: str = ""
claimed_at: str = ""
completion_count: int = 0
last_completed_at: str = ""
metadata: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"quest_id": self.quest_id,
"agent_id": self.agent_id,
"status": self.status.value,
"current_value": self.current_value,
"target_value": self.target_value,
"started_at": self.started_at,
"completed_at": self.completed_at,
"claimed_at": self.claimed_at,
"completion_count": self.completion_count,
"last_completed_at": self.last_completed_at,
"metadata": self.metadata,
}
# In-memory storage for quest progress
_quest_progress: dict[str, QuestProgress] = {}
_quest_definitions: dict[str, QuestDefinition] = {}
_quest_settings: dict[str, Any] = {}
def _get_progress_key(quest_id: str, agent_id: str) -> str:
"""Generate a unique key for quest progress."""
return f"{agent_id}:{quest_id}"
def load_quest_config() -> tuple[dict[str, QuestDefinition], dict[str, Any]]:
"""Load quest definitions from quests.yaml.
Returns:
Tuple of (quest definitions dict, settings dict)
"""
global _quest_definitions, _quest_settings
if not QUEST_CONFIG_PATH.exists():
logger.warning("Quest config not found at %s", QUEST_CONFIG_PATH)
return {}, {}
try:
raw = QUEST_CONFIG_PATH.read_text()
config = yaml.safe_load(raw)
if not isinstance(config, dict):
logger.warning("Invalid quest config format")
return {}, {}
# Load quest definitions
quests_data = config.get("quests", {})
definitions = {}
for quest_id, quest_data in quests_data.items():
quest_data["id"] = quest_id
try:
definition = QuestDefinition.from_dict(quest_data)
definitions[quest_id] = definition
except (ValueError, KeyError) as exc:
logger.warning("Failed to load quest %s: %s", quest_id, exc)
# Load settings
_quest_settings = config.get("settings", {})
_quest_definitions = definitions
logger.debug("Loaded %d quest definitions", len(definitions))
return definitions, _quest_settings
except (OSError, yaml.YAMLError) as exc:
logger.warning("Failed to load quest config: %s", exc)
return {}, {}
def get_quest_definitions() -> dict[str, QuestDefinition]:
"""Get all quest definitions, loading if necessary."""
global _quest_definitions
if not _quest_definitions:
_quest_definitions, _ = load_quest_config()
return _quest_definitions
def get_quest_definition(quest_id: str) -> QuestDefinition | None:
"""Get a specific quest definition by ID."""
definitions = get_quest_definitions()
return definitions.get(quest_id)
def get_active_quests() -> list[QuestDefinition]:
"""Get all enabled quest definitions."""
definitions = get_quest_definitions()
return [q for q in definitions.values() if q.enabled]
def get_quest_progress(quest_id: str, agent_id: str) -> QuestProgress | None:
"""Get progress for a specific quest and agent."""
key = _get_progress_key(quest_id, agent_id)
return _quest_progress.get(key)
def get_or_create_progress(quest_id: str, agent_id: str) -> QuestProgress:
"""Get existing progress or create new for quest/agent."""
key = _get_progress_key(quest_id, agent_id)
if key not in _quest_progress:
quest = get_quest_definition(quest_id)
if not quest:
raise ValueError(f"Quest {quest_id} not found")
target = _get_target_value(quest)
_quest_progress[key] = QuestProgress(
quest_id=quest_id,
agent_id=agent_id,
status=QuestStatus.NOT_STARTED,
current_value=0,
target_value=target,
started_at=datetime.now(UTC).isoformat(),
)
return _quest_progress[key]
def _get_target_value(quest: QuestDefinition) -> int:
"""Extract target value from quest criteria."""
criteria = quest.criteria
if quest.quest_type == QuestType.ISSUE_COUNT:
return criteria.get("target_count", 1)
elif quest.quest_type == QuestType.ISSUE_REDUCE:
return criteria.get("target_reduction", 1)
elif quest.quest_type == QuestType.DAILY_RUN:
return criteria.get("min_sessions", 1)
elif quest.quest_type == QuestType.DOCS_UPDATE:
return criteria.get("min_files_changed", 1)
elif quest.quest_type == QuestType.TEST_IMPROVE:
return criteria.get("min_new_tests", 1)
return 1
def update_quest_progress(
quest_id: str,
agent_id: str,
current_value: int,
metadata: dict[str, Any] | None = None,
) -> QuestProgress:
"""Update progress for a quest."""
progress = get_or_create_progress(quest_id, agent_id)
progress.current_value = current_value
if metadata:
progress.metadata.update(metadata)
# Check if quest is now complete
if progress.current_value >= progress.target_value:
if progress.status not in (QuestStatus.COMPLETED, QuestStatus.CLAIMED):
progress.status = QuestStatus.COMPLETED
progress.completed_at = datetime.now(UTC).isoformat()
logger.info("Quest %s completed for agent %s", quest_id, agent_id)
return progress
def _is_on_cooldown(progress: QuestProgress, quest: QuestDefinition) -> bool:
"""Check if a repeatable quest is on cooldown."""
if not quest.repeatable or not progress.last_completed_at:
return False
if quest.cooldown_hours <= 0:
return False
try:
last_completed = datetime.fromisoformat(progress.last_completed_at)
cooldown_end = last_completed + timedelta(hours=quest.cooldown_hours)
return datetime.now(UTC) < cooldown_end
except (ValueError, TypeError):
return False
def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
"""Claim the token reward for a completed quest.
Returns:
Reward info dict if successful, None if not claimable
"""
progress = get_quest_progress(quest_id, agent_id)
if not progress:
return None
quest = get_quest_definition(quest_id)
if not quest:
return None
# Check if quest is completed but not yet claimed
if progress.status != QuestStatus.COMPLETED:
return None
# Check cooldown for repeatable quests
if _is_on_cooldown(progress, quest):
return None
try:
# Award tokens via ledger
from lightning.ledger import create_invoice_entry, mark_settled
        # Record a synthetic invoice entry in the ledger for the reward
invoice_entry = create_invoice_entry(
payment_hash=f"quest_{quest_id}_{agent_id}_{int(time.time())}",
amount_sats=quest.reward_tokens,
memo=f"Quest reward: {quest.name}",
source="quest_reward",
agent_id=agent_id,
)
# Mark as settled immediately (quest rewards are auto-settled)
mark_settled(invoice_entry.payment_hash, preimage=f"quest_{quest_id}")
# Update progress
progress.status = QuestStatus.CLAIMED
progress.claimed_at = datetime.now(UTC).isoformat()
progress.completion_count += 1
progress.last_completed_at = progress.claimed_at
# Reset for repeatable quests
if quest.repeatable:
progress.status = QuestStatus.NOT_STARTED
progress.current_value = 0
progress.completed_at = ""
progress.claimed_at = ""
notification = quest.notification_message.format(tokens=quest.reward_tokens)
return {
"quest_id": quest_id,
"agent_id": agent_id,
"tokens_awarded": quest.reward_tokens,
"notification": notification,
"completion_count": progress.completion_count,
}
except Exception as exc:
logger.error("Failed to award quest reward: %s", exc)
return None
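# Shape of a successful claim (illustrative values; keys match the dict
# returned above):
#
#     {"quest_id": "close_flaky_tests", "agent_id": "kimi",
#      "tokens_awarded": 150, "notification": "Quest Complete! ...",
#      "completion_count": 1}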
def check_issue_count_quest(
quest: QuestDefinition,
agent_id: str,
closed_issues: list[dict],
) -> QuestProgress | None:
"""Check progress for issue_count type quest."""
criteria = quest.criteria
target_labels = set(criteria.get("issue_labels", []))
    # target_count is applied as progress.target_value by _get_target_value
# Count matching issues
matching_count = 0
for issue in closed_issues:
issue_labels = {label.get("name", "") for label in issue.get("labels", [])}
        # An empty label set matches every issue (the empty set is a subset of any set)
        if target_labels.issubset(issue_labels):
matching_count += 1
progress = update_quest_progress(
quest.id, agent_id, matching_count, {"matching_issues": matching_count}
)
return progress
def check_issue_reduce_quest(
quest: QuestDefinition,
agent_id: str,
previous_count: int,
current_count: int,
) -> QuestProgress | None:
"""Check progress for issue_reduce type quest."""
    # target_reduction in quest.criteria sets the target; here we record the actual drop
reduction = max(0, previous_count - current_count)
progress = update_quest_progress(quest.id, agent_id, reduction, {"reduction": reduction})
return progress
def check_daily_run_quest(
quest: QuestDefinition,
agent_id: str,
sessions_completed: int,
) -> QuestProgress | None:
"""Check progress for daily_run type quest."""
    # min_sessions in quest.criteria sets the target; here we record the actual session count
progress = update_quest_progress(
quest.id, agent_id, sessions_completed, {"sessions": sessions_completed}
)
return progress
def evaluate_quest_progress(
quest_id: str,
agent_id: str,
context: dict[str, Any],
) -> QuestProgress | None:
"""Evaluate quest progress based on quest type and context.
Args:
quest_id: The quest to evaluate
agent_id: The agent to evaluate for
context: Context data for evaluation (issues, metrics, etc.)
Returns:
Updated QuestProgress or None if evaluation failed
"""
quest = get_quest_definition(quest_id)
if not quest or not quest.enabled:
return None
progress = get_quest_progress(quest_id, agent_id)
# Check cooldown for repeatable quests
if progress and _is_on_cooldown(progress, quest):
return progress
try:
if quest.quest_type == QuestType.ISSUE_COUNT:
closed_issues = context.get("closed_issues", [])
return check_issue_count_quest(quest, agent_id, closed_issues)
elif quest.quest_type == QuestType.ISSUE_REDUCE:
prev_count = context.get("previous_issue_count", 0)
curr_count = context.get("current_issue_count", 0)
return check_issue_reduce_quest(quest, agent_id, prev_count, curr_count)
elif quest.quest_type == QuestType.DAILY_RUN:
sessions = context.get("sessions_completed", 0)
return check_daily_run_quest(quest, agent_id, sessions)
elif quest.quest_type == QuestType.CUSTOM:
# Custom quests require manual completion
return progress
else:
logger.debug("Quest type %s not yet implemented", quest.quest_type)
return progress
except Exception as exc:
logger.warning("Quest evaluation failed for %s: %s", quest_id, exc)
return progress
def auto_evaluate_all_quests(agent_id: str, context: dict[str, Any]) -> list[dict]:
"""Evaluate all active quests for an agent and award rewards.
Returns:
List of reward info for newly completed quests
"""
rewards = []
active_quests = get_active_quests()
for quest in active_quests:
progress = evaluate_quest_progress(quest.id, agent_id, context)
if progress and progress.status == QuestStatus.COMPLETED:
# Auto-claim the reward
reward = claim_quest_reward(quest.id, agent_id)
if reward:
rewards.append(reward)
return rewards
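# Example context (illustrative values; only the keys read by the evaluators
# above have any effect):
#
#     context = {
#         "closed_issues": [{"id": 1, "labels": [{"name": "flaky-test"}]}],
#         "previous_issue_count": 10,
#         "current_issue_count": 8,
#         "sessions_completed": 3,
#     }
#     rewards = auto_evaluate_all_quests("kimi", context)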
def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
"""Get complete quest status for an agent."""
definitions = get_quest_definitions()
quests_status = []
total_rewards = 0
completed_count = 0
for quest_id, quest in definitions.items():
progress = get_quest_progress(quest_id, agent_id)
if not progress:
progress = get_or_create_progress(quest_id, agent_id)
is_on_cooldown = _is_on_cooldown(progress, quest) if quest.repeatable else False
quest_info = {
"quest_id": quest_id,
"name": quest.name,
"description": quest.description,
"reward_tokens": quest.reward_tokens,
"type": quest.quest_type.value,
"enabled": quest.enabled,
"repeatable": quest.repeatable,
"status": progress.status.value,
"current_value": progress.current_value,
"target_value": progress.target_value,
"completion_count": progress.completion_count,
"on_cooldown": is_on_cooldown,
"cooldown_hours_remaining": 0,
}
if is_on_cooldown and progress.last_completed_at:
try:
last = datetime.fromisoformat(progress.last_completed_at)
cooldown_end = last + timedelta(hours=quest.cooldown_hours)
hours_remaining = (cooldown_end - datetime.now(UTC)).total_seconds() / 3600
quest_info["cooldown_hours_remaining"] = round(max(0, hours_remaining), 1)
except (ValueError, TypeError):
pass
quests_status.append(quest_info)
total_rewards += progress.completion_count * quest.reward_tokens
completed_count += progress.completion_count
return {
"agent_id": agent_id,
"quests": quests_status,
"total_tokens_earned": total_rewards,
"total_quests_completed": completed_count,
"active_quests_count": len([q for q in quests_status if q["enabled"]]),
}
def reset_quest_progress(quest_id: str | None = None, agent_id: str | None = None) -> int:
"""Reset quest progress. Useful for testing.
Args:
quest_id: Specific quest to reset, or None for all
agent_id: Specific agent to reset, or None for all
Returns:
Number of progress entries reset
"""
count = 0
keys_to_reset = []
for key, _progress in _quest_progress.items():
key_agent, key_quest = key.split(":", 1)
if (quest_id is None or key_quest == quest_id) and (
agent_id is None or key_agent == agent_id
):
keys_to_reset.append(key)
for key in keys_to_reset:
del _quest_progress[key]
count += 1
return count
def get_quest_leaderboard() -> list[dict[str, Any]]:
"""Get a leaderboard of agents by quest completion."""
agent_stats: dict[str, dict[str, Any]] = {}
for _key, progress in _quest_progress.items():
agent_id = progress.agent_id
if agent_id not in agent_stats:
agent_stats[agent_id] = {
"agent_id": agent_id,
"total_completions": 0,
"total_tokens": 0,
"quests_completed": set(),
}
quest = get_quest_definition(progress.quest_id)
if quest:
agent_stats[agent_id]["total_completions"] += progress.completion_count
agent_stats[agent_id]["total_tokens"] += progress.completion_count * quest.reward_tokens
if progress.completion_count > 0:
agent_stats[agent_id]["quests_completed"].add(quest.id)
leaderboard = []
for stats in agent_stats.values():
leaderboard.append(
{
"agent_id": stats["agent_id"],
"total_completions": stats["total_completions"],
"total_tokens": stats["total_tokens"],
"unique_quests_completed": len(stats["quests_completed"]),
}
)
# Sort by total tokens (descending)
leaderboard.sort(key=lambda x: x["total_tokens"], reverse=True)
return leaderboard
# Initialize on module load
load_quest_config()

View File

@@ -0,0 +1,680 @@
"""Tests for agent scorecard functionality."""
from datetime import UTC, datetime, timedelta
from unittest.mock import MagicMock, patch
from dashboard.services.scorecard_service import (
AgentMetrics,
PeriodType,
ScorecardSummary,
_aggregate_metrics,
_detect_patterns,
_extract_actor_from_event,
_generate_narrative_bullets,
_get_period_bounds,
_is_tracked_agent,
_query_token_transactions,
generate_all_scorecards,
generate_scorecard,
get_tracked_agents,
)
from infrastructure.events.bus import Event
class TestPeriodBounds:
"""Test period boundary calculations."""
def test_daily_period_bounds(self):
"""Test daily period returns correct 24-hour window."""
reference = datetime(2026, 3, 21, 12, 30, 45, tzinfo=UTC)
start, end = _get_period_bounds(PeriodType.daily, reference)
assert end == datetime(2026, 3, 21, 0, 0, 0, tzinfo=UTC)
assert start == datetime(2026, 3, 20, 0, 0, 0, tzinfo=UTC)
assert (end - start) == timedelta(days=1)
def test_weekly_period_bounds(self):
"""Test weekly period returns correct 7-day window."""
reference = datetime(2026, 3, 21, 12, 30, 45, tzinfo=UTC)
start, end = _get_period_bounds(PeriodType.weekly, reference)
assert end == datetime(2026, 3, 21, 0, 0, 0, tzinfo=UTC)
assert start == datetime(2026, 3, 14, 0, 0, 0, tzinfo=UTC)
assert (end - start) == timedelta(days=7)
def test_default_reference_date(self):
"""Test default reference date uses current time."""
start, end = _get_period_bounds(PeriodType.daily)
now = datetime.now(UTC)
# End should be start of current day (midnight)
expected_end = now.replace(hour=0, minute=0, second=0, microsecond=0)
assert end == expected_end
# Start should be 24 hours before end
assert (end - start) == timedelta(days=1)
class TestTrackedAgents:
"""Test agent tracking functions."""
def test_get_tracked_agents(self):
"""Test get_tracked_agents returns sorted list."""
agents = get_tracked_agents()
assert isinstance(agents, list)
assert "kimi" in agents
assert "claude" in agents
assert "gemini" in agents
assert "hermes" in agents
assert "manus" in agents
assert agents == sorted(agents)
def test_is_tracked_agent_true(self):
"""Test _is_tracked_agent returns True for tracked agents."""
assert _is_tracked_agent("kimi") is True
assert _is_tracked_agent("KIMI") is True # case insensitive
assert _is_tracked_agent("claude") is True
assert _is_tracked_agent("hermes") is True
def test_is_tracked_agent_false(self):
"""Test _is_tracked_agent returns False for untracked agents."""
assert _is_tracked_agent("unknown") is False
assert _is_tracked_agent("rockachopa") is False
assert _is_tracked_agent("") is False
class TestExtractActor:
"""Test actor extraction from events."""
def test_extract_from_actor_field(self):
"""Test extraction from data.actor field."""
event = Event(type="test", source="system", data={"actor": "kimi"})
assert _extract_actor_from_event(event) == "kimi"
def test_extract_from_agent_id_field(self):
"""Test extraction from data.agent_id field."""
event = Event(type="test", source="system", data={"agent_id": "claude"})
assert _extract_actor_from_event(event) == "claude"
def test_extract_from_source_fallback(self):
"""Test fallback to event.source."""
event = Event(type="test", source="gemini", data={})
assert _extract_actor_from_event(event) == "gemini"
def test_actor_priority_over_agent_id(self):
"""Test actor field takes priority over agent_id."""
event = Event(type="test", source="system", data={"actor": "kimi", "agent_id": "claude"})
assert _extract_actor_from_event(event) == "kimi"
class TestAggregateMetrics:
"""Test metrics aggregation from events."""
def test_empty_events(self):
"""Test aggregation with no events returns empty dict."""
result = _aggregate_metrics([])
assert result == {}
def test_push_event_aggregation(self):
"""Test push events aggregate commits correctly."""
events = [
Event(type="gitea.push", source="gitea", data={"actor": "kimi", "num_commits": 3}),
Event(type="gitea.push", source="gitea", data={"actor": "kimi", "num_commits": 2}),
]
result = _aggregate_metrics(events)
assert "kimi" in result
assert result["kimi"].commits == 5
def test_issue_opened_aggregation(self):
"""Test issue opened events aggregate correctly."""
events = [
Event(
type="gitea.issue.opened",
source="gitea",
data={"actor": "claude", "issue_number": 100},
),
Event(
type="gitea.issue.opened",
source="gitea",
data={"actor": "claude", "issue_number": 101},
),
]
result = _aggregate_metrics(events)
assert "claude" in result
assert len(result["claude"].issues_touched) == 2
assert 100 in result["claude"].issues_touched
assert 101 in result["claude"].issues_touched
def test_comment_aggregation(self):
"""Test comment events aggregate correctly."""
events = [
Event(
type="gitea.issue.comment",
source="gitea",
data={"actor": "gemini", "issue_number": 100},
),
Event(
type="gitea.issue.comment",
source="gitea",
data={"actor": "gemini", "issue_number": 101},
),
]
result = _aggregate_metrics(events)
assert "gemini" in result
assert result["gemini"].comments == 2
assert len(result["gemini"].issues_touched) == 2 # Comments touch issues too
def test_pr_events_aggregation(self):
"""Test PR open and merge events aggregate correctly."""
events = [
Event(
type="gitea.pull_request",
source="gitea",
data={"actor": "kimi", "pr_number": 50, "action": "opened"},
),
Event(
type="gitea.pull_request",
source="gitea",
data={"actor": "kimi", "pr_number": 50, "action": "closed", "merged": True},
),
Event(
type="gitea.pull_request",
source="gitea",
data={"actor": "kimi", "pr_number": 51, "action": "opened"},
),
]
result = _aggregate_metrics(events)
assert "kimi" in result
assert len(result["kimi"].prs_opened) == 2
assert len(result["kimi"].prs_merged) == 1
assert 50 in result["kimi"].prs_merged
def test_untracked_agent_filtered(self):
"""Test events from untracked agents are filtered out."""
events = [
Event(
type="gitea.push", source="gitea", data={"actor": "rockachopa", "num_commits": 5}
),
]
result = _aggregate_metrics(events)
assert "rockachopa" not in result
def test_task_completion_aggregation(self):
"""Test task completion events aggregate test files."""
events = [
Event(
type="agent.task.completed",
source="gitea",
data={
"agent_id": "kimi",
"tests_affected": ["test_foo.py", "test_bar.py"],
"token_reward": 10,
},
),
]
result = _aggregate_metrics(events)
assert "kimi" in result
assert len(result["kimi"].tests_affected) == 2
assert "test_foo.py" in result["kimi"].tests_affected
assert result["kimi"].tokens_earned == 10
class TestAgentMetrics:
"""Test AgentMetrics class."""
def test_merge_rate_zero_prs(self):
"""Test merge rate is 0 when no PRs opened."""
metrics = AgentMetrics(agent_id="kimi")
assert metrics.pr_merge_rate == 0.0
def test_merge_rate_perfect(self):
"""Test 100% merge rate calculation."""
metrics = AgentMetrics(agent_id="kimi", prs_opened={1, 2, 3}, prs_merged={1, 2, 3})
assert metrics.pr_merge_rate == 1.0
def test_merge_rate_partial(self):
"""Test partial merge rate calculation."""
metrics = AgentMetrics(agent_id="kimi", prs_opened={1, 2, 3, 4}, prs_merged={1, 2})
assert metrics.pr_merge_rate == 0.5
class TestDetectPatterns:
"""Test pattern detection logic."""
def test_high_merge_rate_pattern(self):
"""Test detection of high merge rate pattern."""
metrics = AgentMetrics(
agent_id="kimi",
prs_opened={1, 2, 3, 4, 5},
prs_merged={1, 2, 3, 4}, # 80% merge rate
)
patterns = _detect_patterns(metrics)
assert any("High merge rate" in p for p in patterns)
def test_low_merge_rate_pattern(self):
"""Test detection of low merge rate pattern."""
metrics = AgentMetrics(
agent_id="kimi",
prs_opened={1, 2, 3, 4, 5},
prs_merged={1}, # 20% merge rate
)
patterns = _detect_patterns(metrics)
assert any("low merge rate" in p for p in patterns)
def test_high_commits_no_prs_pattern(self):
"""Test detection of direct-to-main commits pattern."""
metrics = AgentMetrics(
agent_id="kimi",
commits=15,
prs_opened=set(),
)
patterns = _detect_patterns(metrics)
assert any("High commit volume without PRs" in p for p in patterns)
def test_silent_worker_pattern(self):
"""Test detection of silent worker pattern."""
metrics = AgentMetrics(
agent_id="kimi",
issues_touched={1, 2, 3, 4, 5, 6},
comments=0,
)
patterns = _detect_patterns(metrics)
assert any("silent worker" in p for p in patterns)
def test_communicative_pattern(self):
"""Test detection of highly communicative pattern."""
metrics = AgentMetrics(
agent_id="kimi",
issues_touched={1, 2}, # 2 issues
comments=10, # 5x comments per issue
)
patterns = _detect_patterns(metrics)
assert any("Highly communicative" in p for p in patterns)
def test_token_accumulation_pattern(self):
"""Test detection of token accumulation pattern."""
metrics = AgentMetrics(
agent_id="kimi",
tokens_earned=150,
tokens_spent=10,
)
patterns = _detect_patterns(metrics)
assert any("Strong token accumulation" in p for p in patterns)
def test_token_spend_pattern(self):
"""Test detection of high token spend pattern."""
metrics = AgentMetrics(
agent_id="kimi",
tokens_earned=10,
tokens_spent=100,
)
patterns = _detect_patterns(metrics)
assert any("High token spend" in p for p in patterns)
class TestGenerateNarrative:
"""Test narrative bullet generation."""
def test_empty_metrics_narrative(self):
"""Test narrative for empty metrics mentions no activity."""
metrics = AgentMetrics(agent_id="kimi")
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
assert len(bullets) == 1
assert "No recorded activity" in bullets[0]
def test_activity_summary_narrative(self):
"""Test narrative includes activity summary."""
metrics = AgentMetrics(
agent_id="kimi",
commits=5,
prs_opened={1, 2},
prs_merged={1},
)
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
activity_bullet = next((b for b in bullets if "Active across" in b), None)
assert activity_bullet is not None
assert "5 commits" in activity_bullet
assert "2 PRs opened" in activity_bullet
assert "1 PR merged" in activity_bullet
def test_tests_affected_narrative(self):
"""Test narrative includes tests affected."""
metrics = AgentMetrics(
agent_id="kimi",
tests_affected={"test_a.py", "test_b.py"},
)
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
assert any("2 test files" in b for b in bullets)
def test_tokens_earned_narrative(self):
"""Test narrative includes token earnings."""
metrics = AgentMetrics(
agent_id="kimi",
tokens_earned=100,
tokens_spent=20,
)
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
assert any("Net earned 80 tokens" in b for b in bullets)
def test_tokens_spent_narrative(self):
"""Test narrative includes token spending."""
metrics = AgentMetrics(
agent_id="kimi",
tokens_earned=20,
tokens_spent=100,
)
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
assert any("Net spent 80 tokens" in b for b in bullets)
def test_balanced_tokens_narrative(self):
"""Test narrative for balanced token flow."""
metrics = AgentMetrics(
agent_id="kimi",
tokens_earned=100,
tokens_spent=100,
)
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
assert any("Balanced token flow" in b for b in bullets)
class TestScorecardSummary:
"""Test ScorecardSummary dataclass."""
def test_to_dict_structure(self):
"""Test to_dict returns expected structure."""
metrics = AgentMetrics(
agent_id="kimi",
issues_touched={1, 2},
prs_opened={10, 11},
prs_merged={10},
tokens_earned=100,
tokens_spent=20,
)
summary = ScorecardSummary(
agent_id="kimi",
period_type=PeriodType.daily,
period_start=datetime.now(UTC),
period_end=datetime.now(UTC),
metrics=metrics,
narrative_bullets=["Test bullet"],
patterns=["Test pattern"],
)
data = summary.to_dict()
assert data["agent_id"] == "kimi"
assert data["period_type"] == "daily"
assert "metrics" in data
assert data["metrics"]["issues_touched"] == 2
assert data["metrics"]["prs_opened"] == 2
assert data["metrics"]["prs_merged"] == 1
assert data["metrics"]["pr_merge_rate"] == 0.5
assert data["metrics"]["tokens_earned"] == 100
assert data["metrics"]["token_net"] == 80
assert data["narrative_bullets"] == ["Test bullet"]
assert data["patterns"] == ["Test pattern"]
class TestQueryTokenTransactions:
"""Test token transaction querying."""
def test_empty_ledger(self):
"""Test empty ledger returns zero values."""
with patch("lightning.ledger.get_transactions", return_value=[]):
earned, spent = _query_token_transactions("kimi", datetime.now(UTC), datetime.now(UTC))
assert earned == 0
assert spent == 0
def test_ledger_with_transactions(self):
"""Test ledger aggregation of transactions."""
now = datetime.now(UTC)
mock_tx = [
MagicMock(
agent_id="kimi",
tx_type=MagicMock(value="incoming"),
amount_sats=100,
created_at=now.isoformat(),
),
MagicMock(
agent_id="kimi",
tx_type=MagicMock(value="outgoing"),
amount_sats=30,
created_at=now.isoformat(),
),
]
with patch("lightning.ledger.get_transactions", return_value=mock_tx):
earned, spent = _query_token_transactions(
"kimi", now - timedelta(hours=1), now + timedelta(hours=1)
)
assert earned == 100
assert spent == 30
def test_ledger_filters_by_agent(self):
"""Test ledger filters transactions by agent_id."""
now = datetime.now(UTC)
mock_tx = [
MagicMock(
agent_id="claude",
tx_type=MagicMock(value="incoming"),
amount_sats=100,
created_at=now.isoformat(),
),
]
with patch("lightning.ledger.get_transactions", return_value=mock_tx):
earned, spent = _query_token_transactions(
"kimi", now - timedelta(hours=1), now + timedelta(hours=1)
)
assert earned == 0 # Transaction was for claude, not kimi
def test_ledger_filters_by_time(self):
"""Test ledger filters transactions by time range."""
now = datetime.now(UTC)
old_time = now - timedelta(days=2)
mock_tx = [
MagicMock(
agent_id="kimi",
tx_type=MagicMock(value="incoming"),
amount_sats=100,
created_at=old_time.isoformat(),
),
]
with patch("lightning.ledger.get_transactions", return_value=mock_tx):
# Query for today only
earned, spent = _query_token_transactions(
"kimi", now - timedelta(hours=1), now + timedelta(hours=1)
)
assert earned == 0 # Transaction was 2 days ago
class TestGenerateScorecard:
"""Test scorecard generation."""
def test_generate_scorecard_no_activity(self):
"""Test scorecard generation for agent with no activity."""
with patch(
"dashboard.services.scorecard_service._collect_events_for_period", return_value=[]
):
with patch(
"dashboard.services.scorecard_service._query_token_transactions",
return_value=(0, 0),
):
scorecard = generate_scorecard("kimi", PeriodType.daily)
assert scorecard is not None
assert scorecard.agent_id == "kimi"
assert scorecard.period_type == PeriodType.daily
assert len(scorecard.narrative_bullets) == 1
assert "No recorded activity" in scorecard.narrative_bullets[0]
def test_generate_scorecard_with_activity(self):
"""Test scorecard generation includes activity."""
events = [
Event(type="gitea.push", source="gitea", data={"actor": "kimi", "num_commits": 5}),
]
with patch(
"dashboard.services.scorecard_service._collect_events_for_period", return_value=events
):
with patch(
"dashboard.services.scorecard_service._query_token_transactions",
return_value=(100, 20),
):
scorecard = generate_scorecard("kimi", PeriodType.daily)
assert scorecard is not None
assert scorecard.metrics.commits == 5
assert scorecard.metrics.tokens_earned == 100
assert scorecard.metrics.tokens_spent == 20
class TestGenerateAllScorecards:
"""Test generating scorecards for all agents."""
def test_generates_for_all_tracked_agents(self):
"""Test all tracked agents get scorecards even with no activity."""
with patch(
"dashboard.services.scorecard_service._collect_events_for_period", return_value=[]
):
with patch(
"dashboard.services.scorecard_service._query_token_transactions",
return_value=(0, 0),
):
scorecards = generate_all_scorecards(PeriodType.daily)
agent_ids = {s.agent_id for s in scorecards}
expected = {"kimi", "claude", "gemini", "hermes", "manus"}
assert expected.issubset(agent_ids)
def test_scorecards_sorted(self):
"""Test scorecards are sorted by agent_id."""
with patch(
"dashboard.services.scorecard_service._collect_events_for_period", return_value=[]
):
with patch(
"dashboard.services.scorecard_service._query_token_transactions",
return_value=(0, 0),
):
scorecards = generate_all_scorecards(PeriodType.daily)
agent_ids = [s.agent_id for s in scorecards]
assert agent_ids == sorted(agent_ids)
class TestScorecardRoutes:
"""Test scorecard API routes."""
def test_list_agents_endpoint(self, client):
"""Test GET /scorecards/api/agents returns tracked agents."""
response = client.get("/scorecards/api/agents")
assert response.status_code == 200
data = response.json()
assert "agents" in data
assert "kimi" in data["agents"]
assert "claude" in data["agents"]
def test_get_scorecard_endpoint(self, client):
"""Test GET /scorecards/api/{agent_id} returns scorecard."""
with patch("dashboard.routes.scorecards.generate_scorecard") as mock_generate:
mock_generate.return_value = ScorecardSummary(
agent_id="kimi",
period_type=PeriodType.daily,
period_start=datetime.now(UTC),
period_end=datetime.now(UTC),
metrics=AgentMetrics(agent_id="kimi"),
narrative_bullets=["Test bullet"],
patterns=[],
)
response = client.get("/scorecards/api/kimi?period=daily")
assert response.status_code == 200
data = response.json()
assert data["agent_id"] == "kimi"
assert data["period_type"] == "daily"
def test_get_scorecard_invalid_period(self, client):
"""Test GET with invalid period returns 400."""
response = client.get("/scorecards/api/kimi?period=invalid")
assert response.status_code == 400
assert "error" in response.json()
def test_get_all_scorecards_endpoint(self, client):
"""Test GET /scorecards/api returns all scorecards."""
with patch("dashboard.routes.scorecards.generate_all_scorecards") as mock_generate:
mock_generate.return_value = [
ScorecardSummary(
agent_id="kimi",
period_type=PeriodType.daily,
period_start=datetime.now(UTC),
period_end=datetime.now(UTC),
metrics=AgentMetrics(agent_id="kimi"),
narrative_bullets=[],
patterns=[],
),
]
response = client.get("/scorecards/api?period=daily")
assert response.status_code == 200
data = response.json()
assert data["period"] == "daily"
assert "scorecards" in data
assert len(data["scorecards"]) == 1
def test_scorecards_page_renders(self, client):
"""Test GET /scorecards returns HTML page."""
response = client.get("/scorecards")
assert response.status_code == 200
assert "text/html" in response.headers.get("content-type", "")
assert "AGENT SCORECARDS" in response.text
def test_scorecard_panel_renders(self, client):
"""Test GET /scorecards/panel/{agent_id} returns HTML."""
with patch("dashboard.routes.scorecards.generate_scorecard") as mock_generate:
mock_generate.return_value = ScorecardSummary(
agent_id="kimi",
period_type=PeriodType.daily,
period_start=datetime.now(UTC),
period_end=datetime.now(UTC),
metrics=AgentMetrics(agent_id="kimi", commits=5),
narrative_bullets=["Active across 5 commits this day."],
patterns=["High activity"],
)
response = client.get("/scorecards/panel/kimi?period=daily")
assert response.status_code == 200
assert "text/html" in response.headers.get("content-type", "")
assert "Kimi" in response.text
def test_all_panels_renders(self, client):
"""Test GET /scorecards/all/panels returns HTML with all panels."""
with patch("dashboard.routes.scorecards.generate_all_scorecards") as mock_generate:
mock_generate.return_value = [
ScorecardSummary(
agent_id="kimi",
period_type=PeriodType.daily,
period_start=datetime.now(UTC),
period_end=datetime.now(UTC),
metrics=AgentMetrics(agent_id="kimi"),
narrative_bullets=[],
patterns=[],
),
]
response = client.get("/scorecards/all/panels?period=daily")
assert response.status_code == 200
assert "text/html" in response.headers.get("content-type", "")

View File

@@ -0,0 +1,489 @@
"""Unit tests for the quest system.
Tests quest definitions, progress tracking, completion detection,
and token rewards.
"""
from __future__ import annotations
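from datetime import UTC, datetime  # used by the cooldown counterpart test below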
import pytest
from timmy.quest_system import (
QuestDefinition,
QuestProgress,
QuestStatus,
QuestType,
_is_on_cooldown,
claim_quest_reward,
evaluate_quest_progress,
get_or_create_progress,
get_quest_definition,
get_quest_leaderboard,
load_quest_config,
reset_quest_progress,
update_quest_progress,
)
@pytest.fixture(autouse=True)
def clean_quest_state():
"""Reset quest progress between tests."""
reset_quest_progress()
yield
reset_quest_progress()
@pytest.fixture
def sample_issue_count_quest():
"""Create a sample issue_count quest definition."""
return QuestDefinition(
id="test_close_issues",
name="Test Issue Closer",
description="Close 3 test issues",
reward_tokens=100,
quest_type=QuestType.ISSUE_COUNT,
enabled=True,
repeatable=False,
cooldown_hours=0,
criteria={"target_count": 3, "issue_labels": ["test"]},
notification_message="Test quest complete! Earned {tokens} tokens.",
)
@pytest.fixture
def sample_daily_run_quest():
"""Create a sample daily_run quest definition."""
return QuestDefinition(
id="test_daily_run",
name="Test Daily Runner",
description="Complete 5 sessions",
reward_tokens=250,
quest_type=QuestType.DAILY_RUN,
enabled=True,
repeatable=True,
cooldown_hours=24,
criteria={"min_sessions": 5},
notification_message="Daily run quest complete! Earned {tokens} tokens.",
)
# ── Quest Definition Tests ───────────────────────────────────────────────
class TestQuestDefinition:
def test_from_dict_minimal(self):
data = {"id": "test_quest", "name": "Test Quest"}
quest = QuestDefinition.from_dict(data)
assert quest.id == "test_quest"
assert quest.name == "Test Quest"
assert quest.quest_type == QuestType.CUSTOM
assert quest.enabled is True
def test_from_dict_full(self):
data = {
"id": "full_quest",
"name": "Full Quest",
"description": "A test quest",
"reward_tokens": 500,
"type": "issue_count",
"enabled": False,
"repeatable": True,
"cooldown_hours": 12,
"criteria": {"target_count": 5},
"notification_message": "Done!",
}
quest = QuestDefinition.from_dict(data)
assert quest.id == "full_quest"
assert quest.reward_tokens == 500
assert quest.quest_type == QuestType.ISSUE_COUNT
assert quest.enabled is False
assert quest.repeatable is True
assert quest.cooldown_hours == 12
# ── Quest Progress Tests ─────────────────────────────────────────────────
class TestQuestProgress:
def test_progress_creation(self):
progress = QuestProgress(
quest_id="test_quest",
agent_id="test_agent",
status=QuestStatus.NOT_STARTED,
)
assert progress.quest_id == "test_quest"
assert progress.agent_id == "test_agent"
assert progress.current_value == 0
def test_progress_to_dict(self):
progress = QuestProgress(
quest_id="test_quest",
agent_id="test_agent",
status=QuestStatus.IN_PROGRESS,
current_value=2,
target_value=5,
)
data = progress.to_dict()
assert data["quest_id"] == "test_quest"
assert data["status"] == "in_progress"
assert data["current_value"] == 2
# ── Quest Loading Tests ──────────────────────────────────────────────────
class TestQuestLoading:
def test_load_quest_config(self):
definitions, settings = load_quest_config()
assert isinstance(definitions, dict)
assert isinstance(settings, dict)
    def test_get_quest_definition_exists(self):
        # A quest defined in config/quests.yaml should be retrievable by id;
        # tolerate None in case the config file is absent in this environment
        quest = get_quest_definition("close_flaky_tests")
        assert quest is None or isinstance(quest, QuestDefinition)
def test_get_quest_definition_not_found(self):
quest = get_quest_definition("definitely_not_a_real_quest_12345")
assert quest is None
# ── Quest Progress Management Tests ─────────────────────────────────────
class TestQuestProgressManagement:
def test_get_or_create_progress_new(self):
# First create a quest definition
quest = QuestDefinition(
id="progress_test",
name="Progress Test",
description="Test quest",
reward_tokens=100,
quest_type=QuestType.ISSUE_COUNT,
enabled=True,
repeatable=False,
cooldown_hours=0,
criteria={"target_count": 3},
notification_message="Done!",
)
# Need to inject into the definitions dict
from timmy.quest_system import _quest_definitions
_quest_definitions["progress_test"] = quest
progress = get_or_create_progress("progress_test", "agent1")
assert progress.quest_id == "progress_test"
assert progress.agent_id == "agent1"
assert progress.status == QuestStatus.NOT_STARTED
assert progress.target_value == 3
del _quest_definitions["progress_test"]
def test_update_quest_progress(self):
quest = QuestDefinition(
id="update_test",
name="Update Test",
description="Test quest",
reward_tokens=100,
quest_type=QuestType.ISSUE_COUNT,
enabled=True,
repeatable=False,
cooldown_hours=0,
criteria={"target_count": 3},
notification_message="Done!",
)
from timmy.quest_system import _quest_definitions
_quest_definitions["update_test"] = quest
# Create initial progress
progress = get_or_create_progress("update_test", "agent1")
assert progress.current_value == 0
# Update progress
updated = update_quest_progress("update_test", "agent1", 2)
assert updated.current_value == 2
assert updated.status == QuestStatus.NOT_STARTED
# Complete the quest
completed = update_quest_progress("update_test", "agent1", 3)
assert completed.current_value == 3
assert completed.status == QuestStatus.COMPLETED
assert completed.completed_at != ""
del _quest_definitions["update_test"]
# ── Quest Evaluation Tests ───────────────────────────────────────────────
class TestQuestEvaluation:
def test_evaluate_issue_count_quest(self):
quest = QuestDefinition(
id="eval_test",
name="Eval Test",
description="Test quest",
reward_tokens=100,
quest_type=QuestType.ISSUE_COUNT,
enabled=True,
repeatable=False,
cooldown_hours=0,
criteria={"target_count": 2, "issue_labels": ["test"]},
notification_message="Done!",
)
from timmy.quest_system import _quest_definitions
_quest_definitions["eval_test"] = quest
# Simulate closed issues
closed_issues = [
{"id": 1, "labels": [{"name": "test"}]},
{"id": 2, "labels": [{"name": "test"}, {"name": "bug"}]},
{"id": 3, "labels": [{"name": "other"}]},
]
context = {"closed_issues": closed_issues}
progress = evaluate_quest_progress("eval_test", "agent1", context)
assert progress is not None
assert progress.current_value == 2 # Two issues with 'test' label
del _quest_definitions["eval_test"]
def test_evaluate_issue_reduce_quest(self):
quest = QuestDefinition(
id="reduce_test",
name="Reduce Test",
description="Test quest",
reward_tokens=200,
quest_type=QuestType.ISSUE_REDUCE,
enabled=True,
repeatable=False,
cooldown_hours=0,
criteria={"target_reduction": 2},
notification_message="Done!",
)
from timmy.quest_system import _quest_definitions
_quest_definitions["reduce_test"] = quest
context = {"previous_issue_count": 10, "current_issue_count": 7}
progress = evaluate_quest_progress("reduce_test", "agent1", context)
assert progress is not None
assert progress.current_value == 3 # Reduced by 3
del _quest_definitions["reduce_test"]
def test_evaluate_daily_run_quest(self):
quest = QuestDefinition(
id="daily_test",
name="Daily Test",
description="Test quest",
reward_tokens=250,
quest_type=QuestType.DAILY_RUN,
enabled=True,
repeatable=True,
cooldown_hours=24,
criteria={"min_sessions": 5},
notification_message="Done!",
)
from timmy.quest_system import _quest_definitions
_quest_definitions["daily_test"] = quest
context = {"sessions_completed": 5}
progress = evaluate_quest_progress("daily_test", "agent1", context)
assert progress is not None
assert progress.current_value == 5
assert progress.status == QuestStatus.COMPLETED
del _quest_definitions["daily_test"]
# ── Quest Cooldown Tests ─────────────────────────────────────────────────
class TestQuestCooldown:
def test_is_on_cooldown_no_cooldown(self):
quest = QuestDefinition(
id="cooldown_test",
name="Cooldown Test",
description="Test quest",
reward_tokens=100,
quest_type=QuestType.ISSUE_COUNT,
enabled=True,
repeatable=True,
cooldown_hours=24,
criteria={},
notification_message="Done!",
)
progress = QuestProgress(
quest_id="cooldown_test",
agent_id="agent1",
status=QuestStatus.CLAIMED,
)
# No last_completed_at means no cooldown
assert _is_on_cooldown(progress, quest) is False
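
# A minimal sketch of the cooldown rule (hypothetical; the real
# _is_on_cooldown may differ). Assumes QuestProgress carries a
# last_completed_at ISO-8601 timestamp that stays empty until the quest
# has been completed at least once.
def _sketch_is_on_cooldown(progress, quest):
    from datetime import datetime, timezone

    last = getattr(progress, "last_completed_at", "")
    if not last or quest.cooldown_hours <= 0:
        return False  # never completed, or quest has no cooldown
    elapsed = datetime.now(timezone.utc) - datetime.fromisoformat(last)
    return elapsed.total_seconds() < quest.cooldown_hours * 3600
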
# ── Quest Reward Tests ───────────────────────────────────────────────────
class TestQuestReward:
def test_claim_quest_reward_not_completed(self):
quest = QuestDefinition(
id="reward_test",
name="Reward Test",
description="Test quest",
reward_tokens=100,
quest_type=QuestType.ISSUE_COUNT,
enabled=True,
repeatable=False,
cooldown_hours=0,
criteria={"target_count": 3},
notification_message="Done!",
)
from timmy.quest_system import _quest_definitions, _quest_progress
_quest_definitions["reward_test"] = quest
# Create progress but don't complete
progress = get_or_create_progress("reward_test", "agent1")
_quest_progress["agent1:reward_test"] = progress
        # Try to claim before completion - should return None
reward = claim_quest_reward("reward_test", "agent1")
assert reward is None
del _quest_definitions["reward_test"]
# ── Leaderboard Tests ────────────────────────────────────────────────────
class TestQuestLeaderboard:
def test_get_quest_leaderboard_empty(self):
reset_quest_progress()
leaderboard = get_quest_leaderboard()
assert leaderboard == []

    def test_get_quest_leaderboard_with_data(self):
# Create and complete a quest for two agents
quest = QuestDefinition(
id="leaderboard_test",
name="Leaderboard Test",
description="Test quest",
reward_tokens=100,
quest_type=QuestType.ISSUE_COUNT,
enabled=True,
repeatable=True,
cooldown_hours=0,
criteria={"target_count": 1},
notification_message="Done!",
)
from timmy.quest_system import _quest_definitions, _quest_progress
_quest_definitions["leaderboard_test"] = quest
# Create progress for agent1 with 2 completions
progress1 = QuestProgress(
quest_id="leaderboard_test",
agent_id="agent1",
status=QuestStatus.NOT_STARTED,
completion_count=2,
)
_quest_progress["agent1:leaderboard_test"] = progress1
# Create progress for agent2 with 1 completion
progress2 = QuestProgress(
quest_id="leaderboard_test",
agent_id="agent2",
status=QuestStatus.NOT_STARTED,
completion_count=1,
)
_quest_progress["agent2:leaderboard_test"] = progress2
leaderboard = get_quest_leaderboard()
assert len(leaderboard) == 2
# agent1 should be first (more tokens)
assert leaderboard[0]["agent_id"] == "agent1"
assert leaderboard[0]["total_tokens"] == 200
assert leaderboard[1]["agent_id"] == "agent2"
assert leaderboard[1]["total_tokens"] == 100
del _quest_definitions["leaderboard_test"]
# ── Quest Reset Tests ─────────────────────────────────────────────────────
class TestQuestReset:
def test_reset_quest_progress_all(self):
# Create some progress entries
progress1 = QuestProgress(
quest_id="quest1", agent_id="agent1", status=QuestStatus.NOT_STARTED
)
progress2 = QuestProgress(
quest_id="quest2", agent_id="agent2", status=QuestStatus.NOT_STARTED
)
from timmy.quest_system import _quest_progress
_quest_progress["agent1:quest1"] = progress1
_quest_progress["agent2:quest2"] = progress2
assert len(_quest_progress) == 2
count = reset_quest_progress()
assert count == 2
assert len(_quest_progress) == 0

    def test_reset_quest_progress_specific_quest(self):
progress1 = QuestProgress(
quest_id="quest1", agent_id="agent1", status=QuestStatus.NOT_STARTED
)
progress2 = QuestProgress(
quest_id="quest2", agent_id="agent1", status=QuestStatus.NOT_STARTED
)
from timmy.quest_system import _quest_progress
_quest_progress["agent1:quest1"] = progress1
_quest_progress["agent1:quest2"] = progress2
count = reset_quest_progress(quest_id="quest1")
assert count == 1
assert "agent1:quest1" not in _quest_progress
assert "agent1:quest2" in _quest_progress

    def test_reset_quest_progress_specific_agent(self):
progress1 = QuestProgress(
quest_id="quest1", agent_id="agent1", status=QuestStatus.NOT_STARTED
)
progress2 = QuestProgress(
quest_id="quest1", agent_id="agent2", status=QuestStatus.NOT_STARTED
)
from timmy.quest_system import _quest_progress
_quest_progress["agent1:quest1"] = progress1
_quest_progress["agent2:quest1"] = progress2
count = reset_quest_progress(agent_id="agent1")
assert count == 1
assert "agent1:quest1" not in _quest_progress
assert "agent2:quest1" in _quest_progress