Compare commits


5 Commits

Author SHA1 Message Date
kimi
919a011cae feat: adapt token rewards based on system stress signals (#714)
Implements adaptive token rewards that respond to system stress:

- StressDetector module (timmy/stress_detector.py):
  - Monitors 4 stress signals: flaky test rate, P1 backlog growth,
    CI failure rate, open bug count
  - Calculates weighted stress score (0-1) and determines mode:
    calm (<0.3), elevated (0.3-0.6), high (>0.6)
  - Applies quest-specific multipliers based on current mode

- Configuration (config/stress_modes.yaml):
  - Thresholds for mode transitions
  - Signal weights and thresholds
  - Multipliers per mode (e.g., test_improve: 1.5x in high stress)

- Quest system integration:
  - Rewards now include stress bonus/penalty in notification
  - Quest status API includes adjusted_reward and multiplier
  - Agent can see current stress mode and why rewards changed

- API endpoints:
  - GET /quests/api/stress - current stress mode and signals
  - POST /quests/api/stress/refresh - force refresh stress detection

Fixes #714
2026-03-21 17:26:40 -04:00
a95cf806c8 [kimi] Implement token quest system for agents (#713) (#789)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-21 20:45:35 +00:00
19367d6e41 [kimi] OpenClaw architecture and deployment research report (#721) (#788)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-21 20:36:23 +00:00
7e983fcdb3 [kimi] Add dashboard card for Daily Run and triage metrics (#718) (#786)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-21 19:58:25 +00:00
46f89d59db [kimi] Add Golden Path generator for longer sessions (#717) (#785)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-21 19:41:34 +00:00
17 changed files with 5379 additions and 0 deletions

config/quests.yaml

@@ -0,0 +1,178 @@
# ── Token Quest System Configuration ─────────────────────────────────────────
#
# Quests are special objectives that agents (and humans) can complete for
# bonus tokens. Each quest has:
# - id: Unique identifier
# - name: Display name
# - description: What the quest requires
# - reward_tokens: Number of tokens awarded on completion
# - criteria: Detection rules for completion
# - enabled: Whether this quest is active
# - repeatable: Whether this quest can be completed multiple times
# - cooldown_hours: Minimum hours between completions (if repeatable)
#
# Quest Types:
# - issue_count: Complete when N issues matching criteria are closed
# - issue_reduce: Complete when open issue count drops by N
# - docs_update: Complete when documentation files are updated
# - test_improve: Complete when test coverage/cases improve
# - daily_run: Complete Daily Run session objectives
# - custom: Special quests with manual completion
#
# ── Active Quests ─────────────────────────────────────────────────────────────
quests:
# ── Daily Run & Test Improvement Quests ───────────────────────────────────
close_flaky_tests:
id: close_flaky_tests
name: Flaky Test Hunter
description: Close 3 issues labeled "flaky-test"
reward_tokens: 150
type: issue_count
enabled: true
repeatable: true
cooldown_hours: 24
criteria:
issue_labels:
- flaky-test
target_count: 3
issue_state: closed
lookback_days: 7
notification_message: "Quest Complete! You closed 3 flaky-test issues and earned {tokens} tokens."
reduce_p1_issues:
id: reduce_p1_issues
name: Priority Firefighter
description: Reduce open P1 Daily Run issues by 2
reward_tokens: 200
type: issue_reduce
enabled: true
repeatable: true
cooldown_hours: 48
criteria:
issue_labels:
- layer:triage
- P1
target_reduction: 2
lookback_days: 3
notification_message: "Quest Complete! You reduced P1 issues by 2 and earned {tokens} tokens."
improve_test_coverage:
id: improve_test_coverage
name: Coverage Champion
description: Improve test coverage by 5% or add 10 new test cases
reward_tokens: 300
type: test_improve
enabled: true
repeatable: false
criteria:
coverage_increase_percent: 5
min_new_tests: 10
notification_message: "Quest Complete! You improved test coverage and earned {tokens} tokens."
complete_daily_run_session:
id: complete_daily_run_session
name: Daily Runner
description: Successfully complete 5 Daily Run sessions in a week
reward_tokens: 250
type: daily_run
enabled: true
repeatable: true
cooldown_hours: 168 # 1 week
criteria:
min_sessions: 5
lookback_days: 7
notification_message: "Quest Complete! You completed 5 Daily Run sessions and earned {tokens} tokens."
# ── Documentation & Maintenance Quests ────────────────────────────────────
improve_automation_docs:
id: improve_automation_docs
name: Documentation Hero
description: Improve documentation for automations (update 3+ doc files)
reward_tokens: 100
type: docs_update
enabled: true
repeatable: true
cooldown_hours: 72
criteria:
file_patterns:
- "docs/**/*.md"
- "**/README.md"
- "timmy_automations/**/*.md"
min_files_changed: 3
lookback_days: 7
notification_message: "Quest Complete! You improved automation docs and earned {tokens} tokens."
close_micro_fixes:
id: close_micro_fixes
name: Micro Fix Master
description: Close 5 issues labeled "layer:micro-fix"
reward_tokens: 125
type: issue_count
enabled: true
repeatable: true
cooldown_hours: 24
criteria:
issue_labels:
- layer:micro-fix
target_count: 5
issue_state: closed
lookback_days: 7
notification_message: "Quest Complete! You closed 5 micro-fix issues and earned {tokens} tokens."
# ── Special Achievements ──────────────────────────────────────────────────
first_contribution:
id: first_contribution
name: First Steps
description: Make your first contribution (close any issue)
reward_tokens: 50
type: issue_count
enabled: true
repeatable: false
criteria:
target_count: 1
issue_state: closed
lookback_days: 30
notification_message: "Welcome! You completed your first contribution and earned {tokens} tokens."
bug_squasher:
id: bug_squasher
name: Bug Squasher
description: Close 10 issues labeled "bug"
reward_tokens: 500
type: issue_count
enabled: true
repeatable: true
cooldown_hours: 168 # 1 week
criteria:
issue_labels:
- bug
target_count: 10
issue_state: closed
lookback_days: 7
notification_message: "Quest Complete! You squashed 10 bugs and earned {tokens} tokens."
# ── Quest System Settings ───────────────────────────────────────────────────
settings:
# Enable/disable quest notifications
notifications_enabled: true
# Maximum number of concurrent active quests per agent
max_concurrent_quests: 5
# Auto-detect quest completions on Daily Run metrics update
auto_detect_on_daily_run: true
# Gitea issue labels that indicate quest-related work
quest_work_labels:
- layer:triage
- layer:micro-fix
- layer:tests
- layer:economy
- flaky-test
- bug
- documentation
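
To make the criteria blocks above concrete, here is a minimal sketch of how an `issue_count` quest such as `close_flaky_tests` could be evaluated. The function name and the simplified issue shape (plain label names plus a `closed_at` timestamp) are illustrative assumptions, not the actual `timmy/quest_system.py` implementation.

```python
# Illustrative sketch only -- not the real timmy/quest_system.py code.
# Assumes each issue is a dict with "labels" (list of label names) and "closed_at" (ISO timestamp).
from datetime import datetime, timedelta, timezone

def issue_count_quest_met(criteria: dict, closed_issues: list[dict]) -> bool:
    """Return True when enough matching issues were closed inside the lookback window."""
    wanted = set(criteria.get("issue_labels", []))
    cutoff = datetime.now(timezone.utc) - timedelta(days=criteria.get("lookback_days", 7))
    matching = 0
    for issue in closed_issues:
        closed_at = issue.get("closed_at")
        if not closed_at:
            continue
        closed = datetime.fromisoformat(closed_at.replace("Z", "+00:00"))
        # An issue counts if it carries every required label and was closed recently enough.
        if wanted <= set(issue.get("labels", [])) and closed >= cutoff:
            matching += 1
    return matching >= criteria.get("target_count", 1)

# Example: close_flaky_tests above needs 3 flaky-test issues closed within 7 days.
criteria = {"issue_labels": ["flaky-test"], "target_count": 3, "lookback_days": 7}
```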

config/stress_modes.yaml

@@ -0,0 +1,98 @@
# ── System Stress Modes Configuration ────────────────────────────────────────
#
# This configuration defines how token rewards adapt based on system stress.
# When the system detects elevated stress (flaky tests, growing backlog,
# CI failures), quest rewards are adjusted to incentivize agents to focus
# on the most critical areas.
#
# ── How It Works ─────────────────────────────────────────────────────────────
#
# 1. SIGNALS: System metrics are monitored continuously
# 2. SCORE: Weighted contributions from triggered signals create a stress score
# 3. MODE: Score determines the stress mode (calm, elevated, high)
# 4. MULTIPLIERS: Token rewards are multiplied based on the current mode
#
# ── Stress Thresholds ────────────────────────────────────────────────────────
thresholds:
# Minimum score to enter elevated mode (0.0 - 1.0)
elevated_min: 0.3
# Minimum score to enter high stress mode (0.0 - 1.0)
high_min: 0.6
# ── Stress Signals ───────────────────────────────────────────────────────────
#
# Each signal has:
# - threshold: Value at which signal is considered "triggered"
# - weight: Contribution to overall stress score (should sum to ~1.0)
signals:
flaky_test_rate:
threshold: 0.15 # 15% of tests showing flakiness
weight: 0.30
description: "Percentage of test runs that are flaky"
p1_backlog_growth:
threshold: 5 # 5 new P1 issues in lookback period
weight: 0.25
description: "Net growth in P1 priority issues over 7 days"
ci_failure_rate:
threshold: 0.20 # 20% of CI runs failing
weight: 0.25
description: "Percentage of CI runs failing in lookback period"
open_bug_count:
threshold: 20 # 20 open bugs
weight: 0.20
description: "Total open issues labeled as 'bug'"
# ── Token Multipliers ────────────────────────────────────────────────────────
#
# Multipliers are applied to quest rewards based on current stress mode.
# Values > 1.0 increase rewards, < 1.0 decrease rewards.
#
# Quest types:
# - test_improve: Test coverage/quality improvements
# - docs_update: Documentation updates
# - issue_count: Closing specific issue types
# - issue_reduce: Reducing overall issue backlog
# - daily_run: Daily Run session completion
# - custom: Special/manual quests
# - exploration: Exploratory work
# - refactor: Code refactoring
multipliers:
calm:
# Calm periods: incentivize maintenance and exploration
test_improve: 1.0
docs_update: 1.2
issue_count: 1.0
issue_reduce: 1.0
daily_run: 1.0
custom: 1.0
exploration: 1.3
refactor: 1.2
elevated:
# Elevated stress: start emphasizing stability
test_improve: 1.2
docs_update: 1.0
issue_count: 1.1
issue_reduce: 1.1
daily_run: 1.0
custom: 1.0
exploration: 1.0
refactor: 0.9 # Discourage risky changes
high:
# High stress: crisis mode, focus on stabilization
test_improve: 1.5 # Strongly incentivize testing
docs_update: 0.8 # Deprioritize docs
issue_count: 1.3 # Reward closing issues
issue_reduce: 1.4 # Strongly reward reducing backlog
daily_run: 1.1
custom: 1.0
exploration: 0.7 # Discourage exploration
refactor: 0.6 # Discourage refactors during crisis
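
To show how the pieces above fit together, here is a minimal scoring sketch. The proportional contribution rule and the function names are assumptions for illustration; the real logic lives in `timmy/stress_detector.py`.

```python
# Illustrative sketch only -- the actual detector lives in timmy/stress_detector.py.
SIGNALS = {
    "flaky_test_rate":   {"threshold": 0.15, "weight": 0.30},
    "p1_backlog_growth": {"threshold": 5,    "weight": 0.25},
    "ci_failure_rate":   {"threshold": 0.20, "weight": 0.25},
    "open_bug_count":    {"threshold": 20,   "weight": 0.20},
}
THRESHOLDS = {"elevated_min": 0.3, "high_min": 0.6}

def stress_score(values: dict[str, float]) -> float:
    """One plausible rule: each signal contributes its weight, scaled by how close
    the measured value is to its threshold (capped at 1.0)."""
    score = 0.0
    for name, cfg in SIGNALS.items():
        score += cfg["weight"] * min(values.get(name, 0.0) / cfg["threshold"], 1.0)
    return round(min(score, 1.0), 3)

def stress_mode(score: float) -> str:
    if score >= THRESHOLDS["high_min"]:
        return "high"
    if score >= THRESHOLDS["elevated_min"]:
        return "elevated"
    return "calm"

def adjusted_reward(base_tokens: int, quest_type: str, mode: str, multipliers: dict) -> int:
    """Apply the per-mode multiplier from this file to a quest's base reward."""
    return round(base_tokens * multipliers[mode].get(quest_type, 1.0))

# Example: 25% CI failures and 22 open bugs, everything else quiet.
score = stress_score({"ci_failure_rate": 0.25, "open_bug_count": 22})
# score = 0.25 + 0.20 = 0.45 -> "elevated"; a 300-token test_improve quest pays 300 * 1.2 = 360.
```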


@@ -0,0 +1,912 @@
# OpenClaw Architecture, Deployment Modes, and Ollama Integration
## Research Report for Timmy Time Dashboard Project
**Issue:** #721 — [Kimi Research] OpenClaw architecture, deployment modes, and Ollama integration
**Date:** 2026-03-21
**Author:** Kimi (Moonshot AI)
**Status:** Complete
---
## Executive Summary
OpenClaw is an open-source AI agent framework that bridges messaging platforms (WhatsApp, Telegram, Slack, Discord, iMessage) to AI coding agents through a centralized gateway. Originally known as Clawdbot and Moltbot, it was rebranded to OpenClaw in early 2026. This report provides a comprehensive analysis of OpenClaw's architecture, deployment options, Ollama integration capabilities, and suitability for deployment on resource-constrained VPS environments like the Hermes DigitalOcean droplet (2GB RAM / 1 vCPU).
**Key Finding:** Running OpenClaw with local LLMs on a 2GB RAM VPS is **not recommended**. The absolute minimum for a text-only agent with external API models is 4GB RAM. For local model inference via Ollama, 8-16GB RAM is the practical minimum. A hybrid approach using OpenRouter as the primary provider with Ollama as fallback is the most viable configuration for small VPS deployments.
---
## 1. Architecture Overview
### 1.1 Core Components
OpenClaw follows a **hub-and-spoke** architecture optimized for multi-agent task execution:
```
┌─────────────────────────────────────────────────────────────────────────┐
│ OPENCLAW ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ WhatsApp │ │ Telegram │ │ Discord │ │
│ │ Channel │ │ Channel │ │ Channel │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └────────────────────┼────────────────────┘ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Gateway │◄─────── WebSocket/API │
│ │ (Port 18789) │ Control Plane │
│ └────────┬─────────┘ │
│ │ │
│ ┌──────────────┼──────────────┐ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Agent A │ │ Agent B │ │ Pi Agent│ │
│ │ (main) │ │ (coder) │ │(delegate)│ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └──────────────┼──────────────┘ │
│ ▼ │
│ ┌────────────────────────┐ │
│ │ LLM Router │ │
│ │ (Primary/Fallback) │ │
│ └───────────┬────────────┘ │
│ │ │
│ ┌─────────────────┼─────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Ollama │ │ OpenAI │ │Anthropic│ │
│ │(local) │ │(cloud) │ │(cloud) │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │ ┌─────┐ │
│ └────────────────────────────────────────────────────►│ MCP │ │
│ │Tools│ │
│ └─────┘ │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Memory │ │ Skills │ │ Workspace │ │
│ │ (SOUL.md) │ │ (SKILL.md) │ │ (sessions) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
### 1.2 Component Deep Dive
| Component | Purpose | Configuration File |
|-----------|---------|-------------------|
| **Gateway** | Central control plane, WebSocket/API server, session management | `gateway` section in `openclaw.json` |
| **Pi Agent** | Core agent runner, the "command center" - schedules LLM calls, tool execution, error handling | `agents` section in `openclaw.json` |
| **Channels** | Messaging platform integrations (Telegram, WhatsApp, Slack, Discord, iMessage) | `channels` section in `openclaw.json` |
| **SOUL.md** | Agent persona definition - personality, communication style, behavioral guidelines | `~/.openclaw/workspace/SOUL.md` |
| **AGENTS.md** | Multi-agent configuration, routing rules, agent specialization definitions | `~/.openclaw/workspace/AGENTS.md` |
| **Workspace** | File system for agent state, session data, temporary files | `~/.openclaw/workspace/` |
| **Skills** | Bundled tools, prompts, configurations that teach agents specific tasks | `~/.openclaw/workspace/skills/` |
| **Sessions** | Conversation history, context persistence between interactions | `~/.openclaw/agents/<agent>/sessions/` |
| **MCP Tools** | Model Context Protocol integration for external tool access | Via `mcporter` or native MCP |
### 1.3 Agent Runner Execution Flow
According to OpenClaw documentation, a complete agent run follows these stages (a self-contained sketch of the flow follows the list):
1. **Queuing** - Session-level queue (serializes same-session requests) → Global queue (controls total concurrency)
2. **Preparation** - Parse workspace, provider/model, thinking level parameters
3. **Plugin Loading** - Load relevant skills based on task context
4. **Memory Retrieval** - Fetch relevant context from SOUL.md and conversation history
5. **LLM Inference** - Send prompt to configured provider with tool definitions
6. **Tool Execution** - Execute any tool calls returned by the LLM
7. **Response Generation** - Format and return final response to the channel
8. **Memory Storage** - Persist conversation and results to session storage
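The stages above can be condensed into a single loop. The following is a conceptual, self-contained sketch of that flow, not OpenClaw's actual implementation; every name and data shape in it is hypothetical.
```python
# Conceptual sketch of the eight stages above -- NOT OpenClaw source code.
import asyncio
from dataclasses import dataclass, field

GLOBAL_QUEUE = asyncio.Semaphore(4)  # stage 1: caps total concurrency across all sessions

@dataclass
class Session:
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)  # stage 1: serializes same-session requests
    history: list[str] = field(default_factory=list)          # stand-in for session storage

async def llm_call(prompt: str, memory: list[str]) -> dict:
    """Stand-in for stage 5; a real call would hit the configured provider with tool schemas."""
    return {"content": f"echo: {prompt}", "tool_calls": []}

async def run_tools(tool_calls: list[dict]) -> list[str]:
    """Stand-in for stage 6."""
    return [f"result of {call['name']}" for call in tool_calls]

async def run_agent_turn(session: Session, message: str) -> str:
    async with session.lock, GLOBAL_QUEUE:                      # stage 1: queuing
        memory = list(session.history)                          # stages 2-4: prepare, load skills, recall
        while True:
            reply = await llm_call(message, memory)             # stage 5: LLM inference
            if not reply["tool_calls"]:
                break
            memory += await run_tools(reply["tool_calls"])      # stage 6: tool execution
        session.history += [message, reply["content"]]          # stage 8: memory storage
        return reply["content"]                                 # stage 7: response to the channel

print(asyncio.run(run_agent_turn(Session(), "hello")))
```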
---
## 2. Deployment Modes
### 2.1 Comparison Matrix
| Deployment Mode | Best For | Setup Complexity | Resource Overhead | Stability |
|----------------|----------|------------------|-------------------|-----------|
| **npm global** | Development, quick testing | Low | Minimal (~200MB) | Moderate |
| **Docker** | Production, isolation, reproducibility | Medium | Higher (~2.5GB base image) | High |
| **Docker Compose** | Multi-service stacks, complex setups | Medium-High | Higher | High |
| **Bare metal/systemd** | Maximum performance, dedicated hardware | High | Minimal | Moderate |
### 2.2 NPM Global Installation (Recommended for Quick Start)
```bash
# One-line installer
curl -fsSL https://openclaw.ai/install.sh | bash
# Or manual npm install
npm install -g openclaw
# Initialize configuration
openclaw onboard
# Start gateway
openclaw gateway
```
**Pros:**
- Fastest setup (~30 seconds)
- Direct access to host resources
- Easy updates via `npm update -g openclaw`
**Cons:**
- Node.js 22+ dependency required
- No process isolation
- Manual dependency management
### 2.3 Docker Deployment (Recommended for Production)
```bash
# Pull and run
docker pull openclaw/openclaw:latest
docker run -d \
--name openclaw \
-p 127.0.0.1:18789:18789 \
-v ~/.openclaw:/root/.openclaw \
-e ANTHROPIC_API_KEY=sk-ant-... \
openclaw/openclaw:latest
# Or with Docker Compose
docker compose -f compose.yml --env-file .env up -d --build
```
**Docker Compose Configuration (production-ready):**
```yaml
version: '3.8'
services:
openclaw:
image: openclaw/openclaw:latest
container_name: openclaw
restart: unless-stopped
ports:
- "127.0.0.1:18789:18789" # Never expose to 0.0.0.0
volumes:
- ./openclaw-data:/root/.openclaw
- ./workspace:/root/.openclaw/workspace
environment:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- OPENROUTER_API_KEY=${OPENROUTER_API_KEY}
- OLLAMA_API_KEY=ollama-local
networks:
- openclaw-net
# Resource limits for small VPS
deploy:
resources:
limits:
cpus: '1.5'
memory: 3G
reservations:
cpus: '0.5'
memory: 1G
networks:
openclaw-net:
driver: bridge
```
### 2.4 Bare Metal / Systemd Installation
For running as a system service on Linux:
```bash
# Create systemd service
sudo tee /etc/systemd/system/openclaw.service > /dev/null <<EOF
[Unit]
Description=OpenClaw Gateway
After=network.target
[Service]
Type=simple
User=openclaw
Group=openclaw
WorkingDirectory=/home/openclaw
Environment="PATH=/usr/local/bin:/usr/bin:/bin"
Environment="NODE_ENV=production"
Environment="ANTHROPIC_API_KEY=sk-ant-..."
ExecStart=/usr/local/bin/openclaw gateway
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF
sudo systemctl daemon-reload
sudo systemctl enable openclaw
sudo systemctl start openclaw
```
### 2.5 Recommended Deployment for 2GB RAM VPS
**⚠️ Critical Finding:** OpenClaw's official minimum is 4GB RAM. On a 2GB VPS:
1. **Do NOT run local LLMs** - Use external API providers exclusively
2. **Use npm installation** - Docker overhead is too heavy
3. **Disable browser automation** - Chromium requires 2-4GB alone
4. **Enable swap** - Critical for preventing OOM kills
5. **Use OpenRouter** - Cheap/free tier models reduce costs
**Setup script for 2GB VPS:**
```bash
#!/bin/bash
# openclaw-minimal-vps.sh
# Setup for 2GB RAM VPS - EXTERNAL API ONLY
# Create 4GB swap
sudo fallocate -l 4G /swapfile
sudo chmod 600 /swapfile
sudo mkswap /swapfile
sudo swapon /swapfile
echo '/swapfile none swap sw 0 0' | sudo tee -a /etc/fstab
# Install Node.js 22
curl -fsSL https://deb.nodesource.com/setup_22.x | sudo bash -
sudo apt-get install -y nodejs
# Install OpenClaw
npm install -g openclaw
# Configure for minimal resource usage
mkdir -p ~/.openclaw
cat > ~/.openclaw/openclaw.json <<'EOF'
{
"gateway": {
"bind": "127.0.0.1",
"port": 18789,
"mode": "local"
},
"agents": {
"defaults": {
"model": {
"primary": "openrouter/google/gemma-3-4b-it:free",
"fallbacks": [
"openrouter/meta/llama-3.1-8b-instruct:free"
]
},
"maxIterations": 15,
"timeout": 120
}
},
"channels": {
"telegram": {
"enabled": true,
"dmPolicy": "pairing"
}
}
}
EOF
# Set OpenRouter API key
export OPENROUTER_API_KEY="sk-or-v1-..."
# Start gateway
openclaw gateway &
```
---
## 3. Ollama Integration
### 3.1 Architecture
OpenClaw integrates with Ollama through Ollama's native `/api/chat` endpoint, which supports streaming responses and tool calling simultaneously:
```
┌──────────────┐ HTTP/JSON ┌──────────────┐ GGUF/CPU/GPU ┌──────────┐
│ OpenClaw │◄───────────────────►│ Ollama │◄────────────────────►│ Local │
│ Gateway │ /api/chat │ Server │ Model inference │ LLM │
│ │ Port 11434 │ Port 11434 │ │ │
└──────────────┘ └──────────────┘ └──────────┘
```
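A minimal non-streaming request against that endpoint, with a single tool attached, looks roughly like this (Python is used for brevity; the `get_weather` tool is a made-up example):
```python
# Minimal call to Ollama's /api/chat with one (made-up) tool definition.
import json
from urllib.request import Request, urlopen

payload = {
    "model": "qwen2.5-coder:7b",
    "stream": False,
    "messages": [{"role": "user", "content": "What's the weather in Lisbon?"}],
    "tools": [{
        "type": "function",
        "function": {
            "name": "get_weather",  # hypothetical tool for illustration
            "description": "Look up current weather for a city",
            "parameters": {
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            },
        },
    }],
}

req = Request(
    "http://localhost:11434/api/chat",
    data=json.dumps(payload).encode(),
    headers={"Content-Type": "application/json"},
)
with urlopen(req, timeout=120) as resp:
    reply = json.loads(resp.read())

# If the model decided to call the tool, the calls appear under message.tool_calls.
print(reply["message"].get("tool_calls") or reply["message"]["content"])
```
If the model elects to call the tool, the caller runs it and posts the result back as a `role: "tool"` message before asking for the final answer.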
### 3.2 Configuration
**Basic Ollama Setup:**
```bash
# Install Ollama
curl -fsSL https://ollama.com/install.sh | sh
# Start server
ollama serve
# Pull a tool-capable model
ollama pull qwen2.5-coder:7b
ollama pull llama3.1:8b
# Configure OpenClaw
export OLLAMA_API_KEY="ollama-local" # Any non-empty string works
```
**OpenClaw Configuration for Ollama:**
```json
{
"models": {
"providers": {
"ollama": {
"baseUrl": "http://localhost:11434",
"apiKey": "ollama-local",
"api": "ollama",
"models": [
{
"id": "qwen2.5-coder:7b",
"name": "Qwen 2.5 Coder 7B",
"contextWindow": 32768,
"maxTokens": 8192,
"cost": { "input": 0, "output": 0 }
},
{
"id": "llama3.1:8b",
"name": "Llama 3.1 8B",
"contextWindow": 128000,
"maxTokens": 8192,
"cost": { "input": 0, "output": 0 }
}
]
}
}
},
"agents": {
"defaults": {
"model": {
"primary": "ollama/qwen2.5-coder:7b",
"fallbacks": ["ollama/llama3.1:8b"]
}
}
}
}
```
### 3.3 Context Window Requirements
**⚠️ Critical Requirement:** OpenClaw requires a minimum **64K token context window** for reliable multi-step task execution.
| Model | Parameters | Context Window | Tool Support | OpenClaw Compatible |
|-------|-----------|----------------|--------------|---------------------|
| **llama3.1** | 8B | 128K | ✅ Yes | ✅ Yes |
| **qwen2.5-coder** | 7B | 32K | ✅ Yes | ⚠️ Below minimum |
| **qwen2.5-coder** | 32B | 128K | ✅ Yes | ✅ Yes |
| **gpt-oss** | 20B | 128K | ✅ Yes | ✅ Yes |
| **glm-4.7-flash** | - | 128K | ✅ Yes | ✅ Yes |
| **deepseek-coder-v2** | 33B | 128K | ✅ Yes | ✅ Yes |
| **mistral-small3.1** | - | 128K | ✅ Yes | ✅ Yes |
**Context Window Configuration:**
For models that don't report context window via Ollama's API:
```bash
# Create custom Modelfile with extended context
cat > ~/qwen-custom.modelfile <<EOF
FROM qwen2.5-coder:7b
PARAMETER num_ctx 65536
PARAMETER temperature 0.7
EOF
# Create custom model
ollama create qwen2.5-coder-64k -f ~/qwen-custom.modelfile
```
### 3.4 Models for Small VPS (≤8B Parameters)
For resource-constrained environments (2-4GB RAM):
| Model | Quantization | RAM Required | VRAM Required | Performance |
|-------|-------------|--------------|---------------|-------------|
| **Llama 3.1 8B** | Q4_K_M | ~5GB | ~6GB | Good |
| **Llama 3.2 3B** | Q4_K_M | ~2.5GB | ~3GB | Basic |
| **Qwen 2.5 7B** | Q4_K_M | ~5GB | ~6GB | Good |
| **Qwen 2.5 3B** | Q4_K_M | ~2.5GB | ~3GB | Basic |
| **DeepSeek 7B** | Q4_K_M | ~5GB | ~6GB | Good |
| **Phi-4 4B** | Q4_K_M | ~3GB | ~4GB | Moderate |
**⚠️ Verdict for 2GB VPS:** Running local LLMs is **NOT viable**. Use external APIs only.
---
## 4. OpenRouter Integration (Fallback Strategy)
### 4.1 Overview
OpenRouter provides a unified API gateway to multiple LLM providers, enabling:
- Single API key access to 200+ models
- Automatic failover between providers
- Free tier models for cost-conscious deployments
- Unified billing and usage tracking
### 4.2 Configuration
**Environment Variable Setup:**
```bash
export OPENROUTER_API_KEY="sk-or-v1-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
```
**OpenClaw Configuration:**
```json
{
"models": {
"providers": {
"openrouter": {
"apiKey": "${OPENROUTER_API_KEY}",
"baseUrl": "https://openrouter.ai/api/v1"
}
}
},
"agents": {
"defaults": {
"model": {
"primary": "openrouter/anthropic/claude-sonnet-4-6",
"fallbacks": [
"openrouter/google/gemini-3.1-pro",
"openrouter/meta/llama-3.3-70b-instruct",
"openrouter/google/gemma-3-4b-it:free"
]
}
}
}
}
```
### 4.3 Recommended Free/Cheap Models on OpenRouter
For cost-conscious VPS deployments:
| Model | Cost | Context | Best For |
|-------|------|---------|----------|
| **google/gemma-3-4b-it:free** | Free | 128K | General tasks, simple automation |
| **meta/llama-3.1-8b-instruct:free** | Free | 128K | General tasks, longer contexts |
| **deepseek/deepseek-chat-v3.2** | $0.53/M | 64K | Code generation, reasoning |
| **xiaomi/mimo-v2-flash** | $0.40/M | 128K | Fast responses, basic tasks |
| **qwen/qwen3-coder-next** | $1.20/M | 128K | Code-focused tasks |
### 4.4 Hybrid Configuration (Recommended for Timmy)
A production-ready configuration for the Hermes VPS:
```json
{
"models": {
"providers": {
"openrouter": {
"apiKey": "${OPENROUTER_API_KEY}",
"models": [
{
"id": "google/gemma-3-4b-it:free",
"name": "Gemma 3 4B (Free)",
"contextWindow": 131072,
"maxTokens": 8192,
"cost": { "input": 0, "output": 0 }
},
{
"id": "deepseek/deepseek-chat-v3.2",
"name": "DeepSeek V3.2",
"contextWindow": 64000,
"maxTokens": 8192,
"cost": { "input": 0.00053, "output": 0.00053 }
}
]
},
"ollama": {
"baseUrl": "http://localhost:11434",
"apiKey": "ollama-local",
"models": [
{
"id": "llama3.2:3b",
"name": "Llama 3.2 3B (Local Fallback)",
"contextWindow": 128000,
"maxTokens": 4096,
"cost": { "input": 0, "output": 0 }
}
]
}
}
},
"agents": {
"defaults": {
"model": {
"primary": "openrouter/google/gemma-3-4b-it:free",
"fallbacks": [
"openrouter/deepseek/deepseek-chat-v3.2",
"ollama/llama3.2:3b"
]
},
"maxIterations": 10,
"timeout": 90
}
}
}
```
---
## 5. Hardware Constraints & VPS Viability
### 5.1 System Requirements Summary
| Component | Minimum | Recommended | Notes |
|-----------|---------|-------------|-------|
| **CPU** | 2 vCPU | 4 vCPU | Dedicated preferred over shared |
| **RAM** | 4 GB | 8 GB | 2GB causes OOM with external APIs |
| **Storage** | 40 GB SSD | 80 GB NVMe | Docker images are ~10-15GB |
| **Network** | 100 Mbps | 1 Gbps | For API calls and model downloads |
| **OS** | Ubuntu 22.04/Debian 12 | Ubuntu 24.04 LTS | Linux required for production |
### 5.2 2GB RAM VPS Analysis
**Can it work?** Yes, with severe limitations:
**What works:**
- Text-only agents with external API providers
- Single Telegram/Discord channel
- Basic file operations and shell commands
- No browser automation
**What doesn't work:**
- Local LLM inference via Ollama
- Browser automation (Chromium needs 2-4GB)
- Multiple concurrent channels
- Python environment-heavy skills
**Required mitigations for 2GB VPS:**
```bash
# 1. Create substantial swap
sudo fallocate -l 4G /swapfile
sudo chmod 600 /swapfile
sudo mkswap /swapfile
sudo swapon /swapfile
# 2. Configure swappiness
echo 'vm.swappiness=60' | sudo tee -a /etc/sysctl.conf
sudo sysctl -p
# 3. Limit Node.js memory
export NODE_OPTIONS="--max-old-space-size=1536"
# 4. Use external APIs only - NO OLLAMA
# 5. Disable browser skills
# 6. Set conservative concurrency limits
```
### 5.3 4-bit Quantization Viability
**Qwen 2.5 7B Q4_K_M on 2GB VPS:**
- Model size: ~4.5GB
- RAM required at runtime: ~5-6GB
- **Verdict:** Will cause immediate OOM on 2GB VPS
- **Even with 4GB VPS:** Marginal, heavy swap usage, poor performance
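A rough back-of-envelope check of these figures (the bits-per-weight and overhead constants are loose assumptions, not measurements):
```python
# Ballpark Q4_K_M memory math: ~5 effective bits/weight plus ~1 GB runtime/KV-cache overhead.
def q4_footprint_gb(params_billions: float, bits_per_weight: float = 5.0,
                    runtime_overhead_gb: float = 1.0) -> float:
    weights_gb = params_billions * bits_per_weight / 8  # billions of params * bits/8 -> GB
    return round(weights_gb + runtime_overhead_gb, 1)

print(q4_footprint_gb(7))  # ~5.4 GB -> in line with the ~5-6 GB runtime figure above
print(q4_footprint_gb(3))  # ~2.9 GB -> roughly the ~2.5-3 GB figure for 3B models
```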
**Viable models for 4GB VPS with Ollama:**
- Llama 3.2 3B Q4_K_M (~2.5GB RAM)
- Qwen 2.5 3B Q4_K_M (~2.5GB RAM)
- Phi-4 4B Q4_K_M (~3GB RAM)
---
## 6. Security Configuration
### 6.1 Network Ports
| Port | Purpose | Exposure |
|------|---------|----------|
| **18789/tcp** | OpenClaw Gateway (WebSocket/HTTP) | **NEVER expose to internet** |
| **11434/tcp** | Ollama API (if running locally) | Localhost only |
| **22/tcp** | SSH | Restrict to known IPs |
**⚠️ CRITICAL:** Never expose port 18789 to the public internet. Use Tailscale or SSH tunnels for remote access.
### 6.2 Tailscale Integration
Tailscale provides zero-configuration VPN mesh for secure remote access:
```bash
# Install Tailscale
curl -fsSL https://tailscale.com/install.sh | sh
sudo tailscale up
# Get Tailscale IP
tailscale ip
# Returns: 100.x.y.z
# Configure OpenClaw to bind to Tailscale
cat > ~/.openclaw/openclaw.json <<EOF
{
"gateway": {
"bind": "tailnet",
"port": 18789
},
"tailscale": {
"mode": "on",
"resetOnExit": false
}
}
EOF
```
**Tailscale vs SSH Tunnel:**
| Feature | Tailscale | SSH Tunnel |
|---------|-----------|------------|
| Setup | Very easy | Moderate |
| Persistence | Automatic | Requires autossh |
| Multiple devices | Built-in | One tunnel per connection |
| NAT traversal | Works | Requires exposed SSH |
| Access control | Tailscale ACL | SSH keys |
### 6.3 Firewall Configuration (UFW)
```bash
# Default deny
sudo ufw default deny incoming
sudo ufw default allow outgoing
# Allow SSH
sudo ufw allow 22/tcp
# Allow Tailscale only (if using)
sudo ufw allow in on tailscale0 to any port 18789
# Block public access to OpenClaw
# (bind is 127.0.0.1, so this is defense in depth)
sudo ufw enable
```
### 6.4 Authentication Configuration
```json
{
"gateway": {
"bind": "127.0.0.1",
"port": 18789,
"auth": {
"mode": "token",
"token": "your-64-char-hex-token-here"
},
"controlUi": {
"allowedOrigins": [
"http://localhost:18789",
"https://your-domain.tailnet-name.ts.net"
],
"allowInsecureAuth": false,
"dangerouslyDisableDeviceAuth": false
}
}
}
```
**Generate secure token:**
```bash
openssl rand -hex 32
```
### 6.5 Sandboxing Considerations
OpenClaw executes arbitrary shell commands and file operations by default. For production:
1. **Run as non-root user:**
```bash
sudo useradd -r -s /bin/false openclaw
sudo mkdir -p /home/openclaw/.openclaw
sudo chown -R openclaw:openclaw /home/openclaw
```
2. **Use Docker for isolation:**
```bash
docker run --security-opt=no-new-privileges \
--cap-drop=ALL \
--read-only \
--tmpfs /tmp:noexec,nosuid,size=100m \
openclaw/openclaw:latest
```
3. **Enable dmPolicy for channels:**
```json
{
"channels": {
"telegram": {
"dmPolicy": "pairing" // Require one-time code for new contacts
}
}
}
```
---
## 7. MCP (Model Context Protocol) Tools
### 7.1 Overview
MCP is an open standard created by Anthropic (donated to Linux Foundation in Dec 2025) that lets AI applications connect to external tools through a universal interface. Think of it as "USB-C for AI."
### 7.2 MCP vs OpenClaw Skills
| Aspect | MCP | OpenClaw Skills |
|--------|-----|-----------------|
| **Protocol** | Standardized (Anthropic) | OpenClaw-specific |
| **Isolation** | Process-isolated | Runs in agent context |
| **Security** | Higher (sandboxed) | Lower (full system access) |
| **Discovery** | Automatic via protocol | Manual via SKILL.md |
| **Ecosystem** | 10,000+ servers | 5400+ skills |
**Note:** OpenClaw currently has limited native MCP support. Use the `mcporter` bridge for MCP integration.
### 7.3 Using MCPorter (MCP Bridge)
```bash
# Install mcporter
clawhub install mcporter
# Configure MCP server
mcporter config add github \
--url "https://api.github.com/mcp" \
--token "ghp_..."
# List available tools
mcporter list
# Call MCP tool
mcporter call github.list_repos --owner "rockachopa"
```
### 7.4 Popular MCP Servers
| Server | Purpose | Integration |
|--------|---------|-------------|
| **GitHub** | Repo management, PRs, issues | `mcp-github` |
| **Slack** | Messaging, channel management | `mcp-slack` |
| **PostgreSQL** | Database queries | `mcp-postgres` |
| **Filesystem** | File operations (sandboxed) | `mcp-filesystem` |
| **Brave Search** | Web search | `mcp-brave` |
---
## 8. Recommendations for Timmy Time Dashboard
### 8.1 Deployment Strategy for Hermes VPS (2GB RAM)
Given the hardware constraints, here's the recommended approach:
**Option A: External API Only (Recommended)**
```
┌─────────────────────────────────────────┐
│ Hermes VPS (2GB RAM) │
│ ┌─────────────────────────────────┐ │
│ │ OpenClaw Gateway │ │
│ │ (npm global install) │ │
│ └─────────────┬───────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ OpenRouter API (Free Tier) │ │
│ │ google/gemma-3-4b-it:free │ │
│ └─────────────────────────────────┘ │
│ │
│ NO OLLAMA - insufficient RAM │
└─────────────────────────────────────────┘
```
**Option B: Hybrid with External Ollama**
```
┌──────────────────────┐ ┌──────────────────────────┐
│ Hermes VPS (2GB) │ │ Separate Ollama Host │
│ ┌────────────────┐ │ │ ┌────────────────────┐ │
│ │ OpenClaw │ │◄────►│ │ Ollama Server │ │
│ │ (external API) │ │ │ │ (8GB+ RAM required)│ │
│ └────────────────┘ │ │ └────────────────────┘ │
└──────────────────────┘ └──────────────────────────┘
```
### 8.2 Configuration Summary
```json
{
"gateway": {
"bind": "127.0.0.1",
"port": 18789,
"auth": {
"mode": "token",
"token": "GENERATE_WITH_OPENSSL_RAND"
}
},
"models": {
"providers": {
"openrouter": {
"apiKey": "${OPENROUTER_API_KEY}",
"models": [
{
"id": "google/gemma-3-4b-it:free",
"contextWindow": 131072,
"maxTokens": 4096
}
]
}
}
},
"agents": {
"defaults": {
"model": {
"primary": "openrouter/google/gemma-3-4b-it:free"
},
"maxIterations": 10,
"timeout": 90,
"maxConcurrent": 2
}
},
"channels": {
"telegram": {
"enabled": true,
"dmPolicy": "pairing"
}
}
}
```
### 8.3 Migration Path (Future)
When upgrading to a larger VPS (4-8GB RAM):
1. **Phase 1:** Enable Ollama with Llama 3.2 3B as fallback
2. **Phase 2:** Add browser automation skills (requires 4GB+ RAM)
3. **Phase 3:** Enable multi-agent routing with specialized agents
4. **Phase 4:** Add MCP server integration for external tools
---
## 9. References
1. OpenClaw Official Documentation: https://docs.openclaw.ai
2. Ollama Integration Guide: https://docs.ollama.com/integrations/openclaw
3. OpenRouter Documentation: https://openrouter.ai/docs
4. MCP Specification: https://modelcontextprotocol.io
5. OpenClaw Community Discord: https://discord.gg/openclaw
6. GitHub Repository: https://github.com/openclaw/openclaw
---
## 10. Appendix: Quick Command Reference
```bash
# Installation
curl -fsSL https://openclaw.ai/install.sh | bash
# Configuration
openclaw onboard # Interactive setup
openclaw configure # Edit config
openclaw config set <key> <value> # Set specific value
# Gateway management
openclaw gateway # Start gateway
openclaw gateway --verbose # Start with logs
openclaw gateway status # Check status
openclaw gateway restart # Restart gateway
openclaw gateway stop # Stop gateway
# Model management
openclaw models list # List available models
openclaw models set <model> # Set default model
openclaw models status # Check model status
# Diagnostics
openclaw doctor # System health check
openclaw doctor --repair # Auto-fix issues
openclaw security audit # Security check
# Dashboard
openclaw dashboard # Open web UI
```
---
*End of Research Report*


@@ -32,6 +32,7 @@ from dashboard.routes.briefing import router as briefing_router
from dashboard.routes.calm import router as calm_router
from dashboard.routes.chat_api import router as chat_api_router
from dashboard.routes.chat_api_v1 import router as chat_api_v1_router
from dashboard.routes.daily_run import router as daily_run_router
from dashboard.routes.db_explorer import router as db_explorer_router
from dashboard.routes.discord import router as discord_router
from dashboard.routes.experiments import router as experiments_router
@@ -42,6 +43,7 @@ from dashboard.routes.memory import router as memory_router
from dashboard.routes.mobile import router as mobile_router
from dashboard.routes.models import api_router as models_api_router
from dashboard.routes.models import router as models_router
from dashboard.routes.quests import router as quests_router
from dashboard.routes.spark import router as spark_router
from dashboard.routes.system import router as system_router
from dashboard.routes.tasks import router as tasks_router
@@ -625,6 +627,8 @@ app.include_router(db_explorer_router)
app.include_router(world_router)
app.include_router(matrix_router)
app.include_router(tower_router)
app.include_router(daily_run_router)
app.include_router(quests_router)
@app.websocket("/ws")


@@ -0,0 +1,435 @@
"""Daily Run metrics routes — dashboard card for triage and session metrics."""
from __future__ import annotations
import json
import logging
import os
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.request import Request as UrlRequest
from urllib.request import urlopen
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse, JSONResponse
from config import settings
from dashboard.templating import templates
logger = logging.getLogger(__name__)
router = APIRouter(tags=["daily-run"])
REPO_ROOT = Path(settings.repo_root)
CONFIG_PATH = REPO_ROOT / "timmy_automations" / "config" / "daily_run.json"
DEFAULT_CONFIG = {
"gitea_api": "http://localhost:3000/api/v1",
"repo_slug": "rockachopa/Timmy-time-dashboard",
"token_file": "~/.hermes/gitea_token",
"layer_labels_prefix": "layer:",
}
LAYER_LABELS = ["layer:triage", "layer:micro-fix", "layer:tests", "layer:economy"]
def _load_config() -> dict:
"""Load configuration from config file with fallback to defaults."""
config = DEFAULT_CONFIG.copy()
if CONFIG_PATH.exists():
try:
file_config = json.loads(CONFIG_PATH.read_text())
if "orchestrator" in file_config:
config.update(file_config["orchestrator"])
except (json.JSONDecodeError, OSError) as exc:
logger.debug("Could not load daily_run config: %s", exc)
# Environment variable overrides
if os.environ.get("TIMMY_GITEA_API"):
config["gitea_api"] = os.environ.get("TIMMY_GITEA_API")
if os.environ.get("TIMMY_REPO_SLUG"):
config["repo_slug"] = os.environ.get("TIMMY_REPO_SLUG")
if os.environ.get("TIMMY_GITEA_TOKEN"):
config["token"] = os.environ.get("TIMMY_GITEA_TOKEN")
return config
def _get_token(config: dict) -> str | None:
"""Get Gitea token from environment or file."""
if "token" in config:
return config["token"]
token_file = Path(config["token_file"]).expanduser()
if token_file.exists():
return token_file.read_text().strip()
return None
class GiteaClient:
"""Simple Gitea API client with graceful degradation."""
def __init__(self, config: dict, token: str | None):
self.api_base = config["gitea_api"].rstrip("/")
self.repo_slug = config["repo_slug"]
self.token = token
self._available: bool | None = None
def _headers(self) -> dict:
headers = {"Accept": "application/json"}
if self.token:
headers["Authorization"] = f"token {self.token}"
return headers
def _api_url(self, path: str) -> str:
return f"{self.api_base}/repos/{self.repo_slug}/{path}"
def is_available(self) -> bool:
"""Check if Gitea API is reachable."""
if self._available is not None:
return self._available
try:
req = UrlRequest(
f"{self.api_base}/version",
headers=self._headers(),
method="GET",
)
with urlopen(req, timeout=5) as resp:
self._available = resp.status == 200
return self._available
except (HTTPError, URLError, TimeoutError):
self._available = False
return False
def get_paginated(self, path: str, params: dict | None = None) -> list:
"""Fetch all pages of a paginated endpoint."""
all_items = []
page = 1
limit = 50
while True:
url = self._api_url(path)
query_parts = [f"limit={limit}", f"page={page}"]
if params:
for key, val in params.items():
query_parts.append(f"{key}={val}")
url = f"{url}?{'&'.join(query_parts)}"
req = UrlRequest(url, headers=self._headers(), method="GET")
with urlopen(req, timeout=15) as resp:
batch = json.loads(resp.read())
if not batch:
break
all_items.extend(batch)
if len(batch) < limit:
break
page += 1
return all_items
@dataclass
class LayerMetrics:
"""Metrics for a single layer."""
name: str
label: str
current_count: int
previous_count: int
@property
def trend(self) -> str:
"""Return trend indicator."""
if self.previous_count == 0:
return "" if self.current_count == 0 else ""
diff = self.current_count - self.previous_count
pct = (diff / self.previous_count) * 100
if pct > 20:
return "↑↑"
elif pct > 5:
return ""
elif pct < -20:
return "↓↓"
elif pct < -5:
return ""
return ""
@property
def trend_color(self) -> str:
"""Return color for trend (CSS variable name)."""
trend = self.trend
if trend in ("↑↑", ""):
return "var(--green)" # More work = positive
elif trend in ("↓↓", ""):
return "var(--amber)" # Less work = caution
return "var(--text-dim)"
@dataclass
class DailyRunMetrics:
"""Complete Daily Run metrics."""
sessions_completed: int
sessions_previous: int
layers: list[LayerMetrics]
total_touched_current: int
total_touched_previous: int
lookback_days: int
generated_at: str
@property
def sessions_trend(self) -> str:
"""Return sessions trend indicator."""
if self.sessions_previous == 0:
return "" if self.sessions_completed == 0 else ""
diff = self.sessions_completed - self.sessions_previous
pct = (diff / self.sessions_previous) * 100
if pct > 20:
return "↑↑"
elif pct > 5:
return ""
elif pct < -20:
return "↓↓"
elif pct < -5:
return ""
return ""
@property
def sessions_trend_color(self) -> str:
"""Return color for sessions trend."""
trend = self.sessions_trend
if trend in ("↑↑", ""):
return "var(--green)"
elif trend in ("↓↓", ""):
return "var(--amber)"
return "var(--text-dim)"
def _extract_layer(labels: list[dict]) -> str | None:
"""Extract layer label from issue labels."""
for label in labels:
name = label.get("name", "")
if name.startswith("layer:"):
return name.replace("layer:", "")
return None
def _load_cycle_data(days: int = 14) -> dict:
"""Load cycle retrospective data for session counting."""
retro_file = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
if not retro_file.exists():
return {"current": 0, "previous": 0}
try:
entries = []
for line in retro_file.read_text().strip().splitlines():
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
continue
now = datetime.now(UTC)
current_cutoff = now - timedelta(days=days)
previous_cutoff = now - timedelta(days=days * 2)
current_count = 0
previous_count = 0
for entry in entries:
ts_str = entry.get("timestamp", "")
if not ts_str:
continue
try:
ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
if ts >= current_cutoff:
if entry.get("success", False):
current_count += 1
elif ts >= previous_cutoff:
if entry.get("success", False):
previous_count += 1
except (ValueError, TypeError):
continue
return {"current": current_count, "previous": previous_count}
except (OSError, ValueError) as exc:
logger.debug("Failed to load cycle data: %s", exc)
return {"current": 0, "previous": 0}
def _fetch_layer_metrics(
client: GiteaClient, lookback_days: int = 7
) -> tuple[list[LayerMetrics], int, int]:
"""Fetch metrics for each layer from Gitea issues."""
now = datetime.now(UTC)
current_cutoff = now - timedelta(days=lookback_days)
previous_cutoff = now - timedelta(days=lookback_days * 2)
layers = []
total_current = 0
total_previous = 0
for layer_label in LAYER_LABELS:
layer_name = layer_label.replace("layer:", "")
try:
# Fetch all issues with this layer label (both open and closed)
issues = client.get_paginated(
"issues",
{"state": "all", "labels": layer_label, "limit": 100},
)
current_count = 0
previous_count = 0
for issue in issues:
updated_at = issue.get("updated_at", "")
if not updated_at:
continue
try:
updated = datetime.fromisoformat(updated_at.replace("Z", "+00:00"))
if updated >= current_cutoff:
current_count += 1
elif updated >= previous_cutoff:
previous_count += 1
except (ValueError, TypeError):
continue
layers.append(
LayerMetrics(
name=layer_name,
label=layer_label,
current_count=current_count,
previous_count=previous_count,
)
)
total_current += current_count
total_previous += previous_count
except (HTTPError, URLError) as exc:
logger.debug("Failed to fetch issues for %s: %s", layer_label, exc)
layers.append(
LayerMetrics(
name=layer_name,
label=layer_label,
current_count=0,
previous_count=0,
)
)
return layers, total_current, total_previous
def _get_metrics(lookback_days: int = 7) -> DailyRunMetrics | None:
"""Get Daily Run metrics from Gitea API."""
config = _load_config()
token = _get_token(config)
client = GiteaClient(config, token)
if not client.is_available():
logger.debug("Gitea API not available for Daily Run metrics")
return None
try:
# Get layer metrics from issues
layers, total_current, total_previous = _fetch_layer_metrics(client, lookback_days)
# Get session data from cycle retrospectives
cycle_data = _load_cycle_data(days=lookback_days)
return DailyRunMetrics(
sessions_completed=cycle_data["current"],
sessions_previous=cycle_data["previous"],
layers=layers,
total_touched_current=total_current,
total_touched_previous=total_previous,
lookback_days=lookback_days,
generated_at=datetime.now(UTC).isoformat(),
)
except Exception as exc:
logger.debug("Error fetching Daily Run metrics: %s", exc)
return None
@router.get("/daily-run/metrics", response_class=JSONResponse)
async def daily_run_metrics_api(lookback_days: int = 7):
"""Return Daily Run metrics as JSON API."""
metrics = _get_metrics(lookback_days)
if not metrics:
return JSONResponse(
{"error": "Gitea API unavailable", "status": "unavailable"},
status_code=503,
)
# Check for quest completions based on Daily Run metrics
quest_rewards = []
try:
from dashboard.routes.quests import check_daily_run_quests
quest_rewards = await check_daily_run_quests(agent_id="system")
except Exception as exc:
logger.debug("Quest checking failed: %s", exc)
return JSONResponse(
{
"status": "ok",
"lookback_days": metrics.lookback_days,
"sessions": {
"completed": metrics.sessions_completed,
"previous": metrics.sessions_previous,
"trend": metrics.sessions_trend,
},
"layers": [
{
"name": layer.name,
"label": layer.label,
"current": layer.current_count,
"previous": layer.previous_count,
"trend": layer.trend,
}
for layer in metrics.layers
],
"totals": {
"current": metrics.total_touched_current,
"previous": metrics.total_touched_previous,
},
"generated_at": metrics.generated_at,
"quest_rewards": quest_rewards,
}
)
@router.get("/daily-run/panel", response_class=HTMLResponse)
async def daily_run_panel(request: Request, lookback_days: int = 7):
"""Return Daily Run metrics panel HTML for HTMX polling."""
metrics = _get_metrics(lookback_days)
# Build Gitea URLs for filtered issue lists
config = _load_config()
repo_slug = config.get("repo_slug", "rockachopa/Timmy-time-dashboard")
gitea_base = config.get("gitea_api", "http://localhost:3000/api/v1").replace("/api/v1", "")
# Logbook URL (link to issues with any layer label)
layer_labels = ",".join(LAYER_LABELS)
logbook_url = f"{gitea_base}/{repo_slug}/issues?labels={layer_labels}&state=all"
# Layer-specific URLs
layer_urls = {
layer: f"{gitea_base}/{repo_slug}/issues?labels=layer:{layer}&state=all"
for layer in ["triage", "micro-fix", "tests", "economy"]
}
return templates.TemplateResponse(
request,
"partials/daily_run_panel.html",
{
"metrics": metrics,
"logbook_url": logbook_url,
"layer_urls": layer_urls,
"gitea_available": metrics is not None,
},
)


@@ -0,0 +1,447 @@
"""Quest system routes for agent token rewards.
Provides API endpoints for:
- Listing quests and their status
- Claiming quest rewards
- Getting quest leaderboard
- Quest progress tracking
"""
from __future__ import annotations
import logging
from typing import Any
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse, JSONResponse
from pydantic import BaseModel
from dashboard.templating import templates
from timmy.quest_system import (
QuestStatus,
auto_evaluate_all_quests,
claim_quest_reward,
evaluate_quest_progress,
get_active_quests,
get_agent_quests_status,
get_quest_definition,
get_quest_leaderboard,
load_quest_config,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/quests", tags=["quests"])
class ClaimQuestRequest(BaseModel):
"""Request to claim a quest reward."""
agent_id: str
quest_id: str
class EvaluateQuestRequest(BaseModel):
"""Request to manually evaluate quest progress."""
agent_id: str
quest_id: str
# ---------------------------------------------------------------------------
# API Endpoints
# ---------------------------------------------------------------------------
@router.get("/api/definitions")
async def get_quest_definitions_api() -> JSONResponse:
"""Get all quest definitions.
Returns:
JSON list of all quest definitions with their criteria.
"""
definitions = get_active_quests()
return JSONResponse(
{
"quests": [
{
"id": q.id,
"name": q.name,
"description": q.description,
"reward_tokens": q.reward_tokens,
"type": q.quest_type.value,
"repeatable": q.repeatable,
"cooldown_hours": q.cooldown_hours,
"criteria": q.criteria,
}
for q in definitions
]
}
)
@router.get("/api/status/{agent_id}")
async def get_agent_quest_status(agent_id: str) -> JSONResponse:
"""Get quest status for a specific agent.
Returns:
Complete quest status including progress, completion counts,
and tokens earned.
"""
status = get_agent_quests_status(agent_id)
return JSONResponse(status)
@router.post("/api/claim")
async def claim_quest_reward_api(request: ClaimQuestRequest) -> JSONResponse:
"""Claim a quest reward for an agent.
The quest must be completed but not yet claimed.
"""
reward = claim_quest_reward(request.quest_id, request.agent_id)
if not reward:
return JSONResponse(
{
"success": False,
"error": "Quest not completed, already claimed, or on cooldown",
},
status_code=400,
)
return JSONResponse(
{
"success": True,
"reward": reward,
}
)
@router.post("/api/evaluate")
async def evaluate_quest_api(request: EvaluateQuestRequest) -> JSONResponse:
"""Manually evaluate quest progress with provided context.
This is useful for testing or when the quest completion
needs to be triggered manually.
"""
quest = get_quest_definition(request.quest_id)
if not quest:
return JSONResponse(
{"success": False, "error": "Quest not found"},
status_code=404,
)
# Build evaluation context based on quest type
context = await _build_evaluation_context(quest)
progress = evaluate_quest_progress(request.quest_id, request.agent_id, context)
if not progress:
return JSONResponse(
{"success": False, "error": "Failed to evaluate quest"},
status_code=500,
)
# Auto-claim if completed
reward = None
if progress.status == QuestStatus.COMPLETED:
reward = claim_quest_reward(request.quest_id, request.agent_id)
return JSONResponse(
{
"success": True,
"progress": progress.to_dict(),
"reward": reward,
"completed": progress.status == QuestStatus.COMPLETED,
}
)
@router.get("/api/leaderboard")
async def get_leaderboard_api() -> JSONResponse:
"""Get the quest completion leaderboard.
Returns agents sorted by total tokens earned.
"""
leaderboard = get_quest_leaderboard()
return JSONResponse(
{
"leaderboard": leaderboard,
}
)
@router.post("/api/reload")
async def reload_quest_config_api() -> JSONResponse:
"""Reload quest configuration from quests.yaml.
Useful for applying quest changes without restarting.
"""
definitions, quest_settings = load_quest_config()
return JSONResponse(
{
"success": True,
"quests_loaded": len(definitions),
"settings": quest_settings,
}
)
# ---------------------------------------------------------------------------
# Stress Mode Endpoints
# ---------------------------------------------------------------------------
@router.get("/api/stress")
async def get_stress_status_api() -> JSONResponse:
"""Get current stress mode status and multipliers.
Returns:
Current stress mode, score, active signals, and multipliers
"""
try:
from timmy.stress_detector import (
detect_stress_mode,
get_stress_summary,
)
snapshot = detect_stress_mode()
summary = get_stress_summary()
return JSONResponse(
{
"status": "ok",
"stress": summary,
"raw": snapshot.to_dict(),
}
)
except Exception as exc:
logger.warning("Failed to get stress status: %s", exc)
return JSONResponse(
{
"status": "error",
"error": str(exc),
},
status_code=500,
)
@router.post("/api/stress/refresh")
async def refresh_stress_detection_api() -> JSONResponse:
"""Force a fresh stress detection check.
Normally stress is cached for 60 seconds. This endpoint
bypasses the cache for immediate results.
"""
try:
from timmy.stress_detector import detect_stress_mode, get_stress_summary
snapshot = detect_stress_mode(force_refresh=True)
summary = get_stress_summary()
return JSONResponse(
{
"status": "ok",
"stress": summary,
"raw": snapshot.to_dict(),
}
)
except Exception as exc:
logger.warning("Failed to refresh stress detection: %s", exc)
return JSONResponse(
{
"status": "error",
"error": str(exc),
},
status_code=500,
)
# ---------------------------------------------------------------------------
# Dashboard UI Endpoints
# ---------------------------------------------------------------------------
@router.get("", response_class=HTMLResponse)
async def quests_dashboard(request: Request) -> HTMLResponse:
"""Main quests dashboard page."""
return templates.TemplateResponse(
request,
"quests.html",
{"agent_id": "current_user"},
)
@router.get("/panel/{agent_id}", response_class=HTMLResponse)
async def quests_panel(request: Request, agent_id: str) -> HTMLResponse:
"""Quest panel for HTMX partial updates."""
status = get_agent_quests_status(agent_id)
return templates.TemplateResponse(
request,
"partials/quests_panel.html",
{
"agent_id": agent_id,
"quests": status["quests"],
"total_tokens": status["total_tokens_earned"],
"completed_count": status["total_quests_completed"],
},
)
# ---------------------------------------------------------------------------
# Internal Functions
# ---------------------------------------------------------------------------
async def _build_evaluation_context(quest) -> dict[str, Any]:
"""Build evaluation context for a quest based on its type."""
context: dict[str, Any] = {}
if quest.quest_type.value == "issue_count":
# Fetch closed issues with relevant labels
context["closed_issues"] = await _fetch_closed_issues(
quest.criteria.get("issue_labels", [])
)
elif quest.quest_type.value == "issue_reduce":
# Fetch current and previous issue counts
labels = quest.criteria.get("issue_labels", [])
context["current_issue_count"] = await _fetch_open_issue_count(labels)
context["previous_issue_count"] = await _fetch_previous_issue_count(
labels, quest.criteria.get("lookback_days", 7)
)
elif quest.quest_type.value == "daily_run":
# Fetch Daily Run metrics
metrics = await _fetch_daily_run_metrics()
context["sessions_completed"] = metrics.get("sessions_completed", 0)
return context
async def _fetch_closed_issues(labels: list[str]) -> list[dict]:
"""Fetch closed issues matching the given labels."""
try:
from dashboard.routes.daily_run import GiteaClient, _load_config
config = _load_config()
token = _get_gitea_token(config)
client = GiteaClient(config, token)
if not client.is_available():
return []
# Build label filter
label_filter = ",".join(labels) if labels else ""
issues = client.get_paginated(
"issues",
{"state": "closed", "labels": label_filter, "limit": 100},
)
return issues
except Exception as exc:
logger.debug("Failed to fetch closed issues: %s", exc)
return []
async def _fetch_open_issue_count(labels: list[str]) -> int:
"""Fetch count of open issues with given labels."""
try:
from dashboard.routes.daily_run import GiteaClient, _load_config
config = _load_config()
token = _get_gitea_token(config)
client = GiteaClient(config, token)
if not client.is_available():
return 0
label_filter = ",".join(labels) if labels else ""
issues = client.get_paginated(
"issues",
{"state": "open", "labels": label_filter, "limit": 100},
)
return len(issues)
except Exception as exc:
logger.debug("Failed to fetch open issue count: %s", exc)
return 0
async def _fetch_previous_issue_count(labels: list[str], lookback_days: int) -> int:
"""Fetch previous issue count (simplified - uses current for now)."""
# This is a simplified implementation
# In production, you'd query historical data
return await _fetch_open_issue_count(labels)
async def _fetch_daily_run_metrics() -> dict[str, Any]:
"""Fetch Daily Run metrics."""
try:
from dashboard.routes.daily_run import _get_metrics
metrics = _get_metrics(lookback_days=7)
if metrics:
return {
"sessions_completed": metrics.sessions_completed,
"sessions_previous": metrics.sessions_previous,
}
except Exception as exc:
logger.debug("Failed to fetch Daily Run metrics: %s", exc)
return {"sessions_completed": 0, "sessions_previous": 0}
def _get_gitea_token(config: dict) -> str | None:
"""Get Gitea token from config."""
if "token" in config:
return config["token"]
from pathlib import Path
token_file = Path(config.get("token_file", "~/.hermes/gitea_token")).expanduser()
if token_file.exists():
return token_file.read_text().strip()
return None
# ---------------------------------------------------------------------------
# Daily Run Integration
# ---------------------------------------------------------------------------
async def check_daily_run_quests(agent_id: str = "system") -> list[dict]:
"""Check and award Daily Run related quests.
Called by the Daily Run system when metrics are updated.
Returns:
List of rewards awarded
"""
# Check if auto-detect is enabled
_, quest_settings = load_quest_config()
if not quest_settings.get("auto_detect_on_daily_run", True):
return []
# Build context from Daily Run metrics
metrics = await _fetch_daily_run_metrics()
context = {
"sessions_completed": metrics.get("sessions_completed", 0),
"sessions_previous": metrics.get("sessions_previous", 0),
}
# Add closed issues for issue_count quests
active_quests = get_active_quests()
for quest in active_quests:
if quest.quest_type.value == "issue_count":
labels = quest.criteria.get("issue_labels", [])
context["closed_issues"] = await _fetch_closed_issues(labels)
break # Only need to fetch once
# Evaluate all quests
rewards = auto_evaluate_all_quests(agent_id, context)
return rewards


@@ -21,6 +21,11 @@
</div>
{% endcall %}
<!-- Daily Run Metrics (HTMX polled) -->
{% call panel("DAILY RUN", hx_get="/daily-run/panel", hx_trigger="every 60s") %}
<div class="mc-loading-placeholder">LOADING...</div>
{% endcall %}
</div>
<!-- Main panel — swappable via HTMX; defaults to Timmy on load -->


@@ -0,0 +1,54 @@
<div class="card-header mc-panel-header">// DAILY RUN METRICS</div>
<div class="card-body p-3">
{% if not gitea_available %}
<div class="mc-muted" style="font-size: 0.85rem; padding: 8px 0;">
<span style="color: var(--amber);">⚠</span> Gitea API unavailable
</div>
{% else %}
{% set m = metrics %}
<!-- Sessions summary -->
<div class="dr-section" style="margin-bottom: 16px;">
<div class="dr-row" style="display: flex; justify-content: space-between; align-items: center; margin-bottom: 8px;">
<span class="dr-label" style="font-size: 0.85rem; color: var(--text-dim);">Sessions ({{ m.lookback_days }}d)</span>
<a href="{{ logbook_url }}" target="_blank" class="dr-link" style="font-size: 0.75rem; color: var(--green); text-decoration: none;">
Logbook →
</a>
</div>
<div class="dr-stat" style="display: flex; align-items: baseline; gap: 8px;">
<span class="dr-value" style="font-size: 1.5rem; font-weight: 600; color: var(--text-bright);">{{ m.sessions_completed }}</span>
<span class="dr-trend" style="font-size: 0.9rem; color: {{ m.sessions_trend_color }};">{{ m.sessions_trend }}</span>
<span class="dr-prev" style="font-size: 0.75rem; color: var(--text-dim);">vs {{ m.sessions_previous }} prev</span>
</div>
</div>
<!-- Layer breakdown -->
<div class="dr-section">
<div class="dr-label" style="font-size: 0.85rem; color: var(--text-dim); margin-bottom: 8px;">Issues by Layer</div>
<div class="dr-layers" style="display: flex; flex-direction: column; gap: 6px;">
{% for layer in m.layers %}
<div class="dr-layer-row" style="display: flex; justify-content: space-between; align-items: center;">
<a href="{{ layer_urls[layer.name] }}" target="_blank" class="dr-layer-name" style="font-size: 0.8rem; color: var(--text); text-decoration: none; text-transform: capitalize;">
{{ layer.name.replace('-', ' ') }}
</a>
<div class="dr-layer-stat" style="display: flex; align-items: center; gap: 6px;">
<span class="dr-layer-value" style="font-size: 0.9rem; font-weight: 500; color: var(--text-bright);">{{ layer.current_count }}</span>
<span class="dr-layer-trend" style="font-size: 0.75rem; color: {{ layer.trend_color }}; width: 18px; text-align: center;">{{ layer.trend }}</span>
</div>
</div>
{% endfor %}
</div>
</div>
<!-- Total touched -->
<div class="dr-section" style="margin-top: 12px; padding-top: 12px; border-top: 1px solid var(--border);">
<div class="dr-row" style="display: flex; justify-content: space-between; align-items: center;">
<span class="dr-label" style="font-size: 0.8rem; color: var(--text-dim);">Total Issues Touched</span>
<div class="dr-total-stat" style="display: flex; align-items: center; gap: 6px;">
<span class="dr-total-value" style="font-size: 1rem; font-weight: 600; color: var(--text-bright);">{{ m.total_touched_current }}</span>
<span class="dr-total-prev" style="font-size: 0.7rem; color: var(--text-dim);">/ {{ m.total_touched_previous }} prev</span>
</div>
</div>
</div>
{% endif %}
</div>

View File

@@ -0,0 +1,80 @@
{% from "macros.html" import panel %}
<div class="quests-summary mb-4">
<div class="row">
<div class="col-md-4">
<div class="stat-card">
<div class="stat-value">{{ total_tokens }}</div>
<div class="stat-label">Tokens Earned</div>
</div>
</div>
<div class="col-md-4">
<div class="stat-card">
<div class="stat-value">{{ completed_count }}</div>
<div class="stat-label">Quests Completed</div>
</div>
</div>
<div class="col-md-4">
<div class="stat-card">
<div class="stat-value">{{ quests|selectattr('enabled', 'equalto', true)|list|length }}</div>
<div class="stat-label">Active Quests</div>
</div>
</div>
</div>
</div>
<div class="quests-list">
{% for quest in quests %}
{% if quest.enabled %}
<div class="quest-card quest-status-{{ quest.status }}">
<div class="quest-header">
<h5 class="quest-name">{{ quest.name }}</h5>
<span class="quest-reward">+{{ quest.reward_tokens }} ⚡</span>
</div>
<p class="quest-description">{{ quest.description }}</p>
<div class="quest-progress">
{% if quest.status == 'completed' %}
<div class="progress">
<div class="progress-bar bg-success" style="width: 100%"></div>
</div>
<span class="quest-status-badge completed">Completed</span>
{% elif quest.status == 'claimed' %}
<div class="progress">
<div class="progress-bar bg-success" style="width: 100%"></div>
</div>
<span class="quest-status-badge claimed">Reward Claimed</span>
{% elif quest.on_cooldown %}
<div class="progress">
<div class="progress-bar bg-secondary" style="width: 100%"></div>
</div>
<span class="quest-status-badge cooldown">
Cooldown: {{ quest.cooldown_hours_remaining }}h remaining
</span>
{% else %}
<div class="progress">
<div class="progress-bar" style="width: {{ (quest.current_value / quest.target_value * 100)|int }}%"></div>
</div>
<span class="quest-progress-text">{{ quest.current_value }} / {{ quest.target_value }}</span>
{% endif %}
</div>
<div class="quest-meta">
<span class="quest-type">{{ quest.type }}</span>
{% if quest.repeatable %}
<span class="quest-repeatable">↻ Repeatable</span>
{% endif %}
{% if quest.completion_count > 0 %}
<span class="quest-completions">Completed {{ quest.completion_count }} time{% if quest.completion_count != 1 %}s{% endif %}</span>
{% endif %}
</div>
</div>
{% endif %}
{% endfor %}
</div>
{% if not quests|selectattr('enabled', 'equalto', true)|list|length %}
<div class="alert alert-info">
No active quests available. Check back later or contact an administrator.
</div>
{% endif %}

View File

@@ -0,0 +1,50 @@
{% extends "base.html" %}
{% block title %}Quests — Mission Control{% endblock %}
{% block content %}
<div class="container-fluid">
<div class="row">
<div class="col-12">
<h1 class="mc-title">Token Quests</h1>
<p class="mc-subtitle">Complete quests to earn bonus tokens</p>
</div>
</div>
<div class="row mt-4">
<div class="col-md-8">
<div id="quests-panel" hx-get="/quests/panel/{{ agent_id }}" hx-trigger="load, every 30s">
<div class="mc-loading">Loading quests...</div>
</div>
</div>
<div class="col-md-4">
<div class="card mc-panel">
<div class="card-header">
<h5 class="mb-0">Leaderboard</h5>
</div>
<div class="card-body">
<div id="leaderboard" hx-get="/quests/api/leaderboard" hx-trigger="load, every 60s">
<div class="mc-loading">Loading leaderboard...</div>
</div>
</div>
</div>
<div class="card mc-panel mt-4">
<div class="card-header">
<h5 class="mb-0">About Quests</h5>
</div>
<div class="card-body">
<p class="mb-2">Quests are special objectives that reward tokens upon completion.</p>
<ul class="mc-list mb-0">
<li>Complete Daily Run sessions</li>
<li>Close flaky-test issues</li>
<li>Reduce P1 issue backlog</li>
<li>Improve documentation</li>
</ul>
</div>
</div>
</div>
</div>
</div>
{% endblock %}

632
src/timmy/quest_system.py Normal file
View File

@@ -0,0 +1,632 @@
"""Token Quest System for agent rewards.
Provides quest definitions, progress tracking, completion detection,
and token awards for agent accomplishments.
Quests are defined in config/quests.yaml and loaded at runtime.
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from enum import StrEnum
from pathlib import Path
from typing import Any
import yaml
from config import settings
logger = logging.getLogger(__name__)
# Path to quest configuration
QUEST_CONFIG_PATH = Path(settings.repo_root) / "config" / "quests.yaml"
class QuestType(StrEnum):
"""Types of quests supported by the system."""
ISSUE_COUNT = "issue_count"
ISSUE_REDUCE = "issue_reduce"
DOCS_UPDATE = "docs_update"
TEST_IMPROVE = "test_improve"
DAILY_RUN = "daily_run"
CUSTOM = "custom"
class QuestStatus(StrEnum):
"""Status of a quest for an agent."""
NOT_STARTED = "not_started"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
CLAIMED = "claimed"
EXPIRED = "expired"
@dataclass
class QuestDefinition:
"""Definition of a quest from configuration."""
id: str
name: str
description: str
reward_tokens: int
quest_type: QuestType
enabled: bool
repeatable: bool
cooldown_hours: int
criteria: dict[str, Any]
notification_message: str
@classmethod
def from_dict(cls, data: dict[str, Any]) -> QuestDefinition:
"""Create a QuestDefinition from a dictionary."""
return cls(
id=data["id"],
name=data.get("name", "Unnamed Quest"),
description=data.get("description", ""),
reward_tokens=data.get("reward_tokens", 0),
quest_type=QuestType(data.get("type", "custom")),
enabled=data.get("enabled", True),
repeatable=data.get("repeatable", False),
cooldown_hours=data.get("cooldown_hours", 0),
criteria=data.get("criteria", {}),
notification_message=data.get(
"notification_message", "Quest Complete! You earned {tokens} tokens."
),
)
@dataclass
class QuestProgress:
"""Progress of a quest for a specific agent."""
quest_id: str
agent_id: str
status: QuestStatus
current_value: int = 0
target_value: int = 0
started_at: str = ""
completed_at: str = ""
claimed_at: str = ""
completion_count: int = 0
last_completed_at: str = ""
metadata: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"quest_id": self.quest_id,
"agent_id": self.agent_id,
"status": self.status.value,
"current_value": self.current_value,
"target_value": self.target_value,
"started_at": self.started_at,
"completed_at": self.completed_at,
"claimed_at": self.claimed_at,
"completion_count": self.completion_count,
"last_completed_at": self.last_completed_at,
"metadata": self.metadata,
}
# In-memory storage for quest progress
_quest_progress: dict[str, QuestProgress] = {}
_quest_definitions: dict[str, QuestDefinition] = {}
_quest_settings: dict[str, Any] = {}
def _get_progress_key(quest_id: str, agent_id: str) -> str:
"""Generate a unique key for quest progress."""
return f"{agent_id}:{quest_id}"
def load_quest_config() -> tuple[dict[str, QuestDefinition], dict[str, Any]]:
"""Load quest definitions from quests.yaml.
Returns:
Tuple of (quest definitions dict, settings dict)
"""
global _quest_definitions, _quest_settings
if not QUEST_CONFIG_PATH.exists():
logger.warning("Quest config not found at %s", QUEST_CONFIG_PATH)
return {}, {}
try:
raw = QUEST_CONFIG_PATH.read_text()
config = yaml.safe_load(raw)
if not isinstance(config, dict):
logger.warning("Invalid quest config format")
return {}, {}
# Load quest definitions
quests_data = config.get("quests", {})
definitions = {}
for quest_id, quest_data in quests_data.items():
quest_data["id"] = quest_id
try:
definition = QuestDefinition.from_dict(quest_data)
definitions[quest_id] = definition
except (ValueError, KeyError) as exc:
logger.warning("Failed to load quest %s: %s", quest_id, exc)
# Load settings
_quest_settings = config.get("settings", {})
_quest_definitions = definitions
logger.debug("Loaded %d quest definitions", len(definitions))
return definitions, _quest_settings
except (OSError, yaml.YAMLError) as exc:
logger.warning("Failed to load quest config: %s", exc)
return {}, {}
def get_quest_definitions() -> dict[str, QuestDefinition]:
"""Get all quest definitions, loading if necessary."""
global _quest_definitions
if not _quest_definitions:
_quest_definitions, _ = load_quest_config()
return _quest_definitions
def get_quest_definition(quest_id: str) -> QuestDefinition | None:
"""Get a specific quest definition by ID."""
definitions = get_quest_definitions()
return definitions.get(quest_id)
def get_active_quests() -> list[QuestDefinition]:
"""Get all enabled quest definitions."""
definitions = get_quest_definitions()
return [q for q in definitions.values() if q.enabled]
def get_quest_progress(quest_id: str, agent_id: str) -> QuestProgress | None:
"""Get progress for a specific quest and agent."""
key = _get_progress_key(quest_id, agent_id)
return _quest_progress.get(key)
def get_or_create_progress(quest_id: str, agent_id: str) -> QuestProgress:
"""Get existing progress or create new for quest/agent."""
key = _get_progress_key(quest_id, agent_id)
if key not in _quest_progress:
quest = get_quest_definition(quest_id)
if not quest:
raise ValueError(f"Quest {quest_id} not found")
target = _get_target_value(quest)
_quest_progress[key] = QuestProgress(
quest_id=quest_id,
agent_id=agent_id,
status=QuestStatus.NOT_STARTED,
current_value=0,
target_value=target,
started_at=datetime.now(UTC).isoformat(),
)
return _quest_progress[key]
def _get_target_value(quest: QuestDefinition) -> int:
"""Extract target value from quest criteria."""
criteria = quest.criteria
if quest.quest_type == QuestType.ISSUE_COUNT:
return criteria.get("target_count", 1)
elif quest.quest_type == QuestType.ISSUE_REDUCE:
return criteria.get("target_reduction", 1)
elif quest.quest_type == QuestType.DAILY_RUN:
return criteria.get("min_sessions", 1)
elif quest.quest_type == QuestType.DOCS_UPDATE:
return criteria.get("min_files_changed", 1)
elif quest.quest_type == QuestType.TEST_IMPROVE:
return criteria.get("min_new_tests", 1)
return 1
def update_quest_progress(
quest_id: str,
agent_id: str,
current_value: int,
metadata: dict[str, Any] | None = None,
) -> QuestProgress:
"""Update progress for a quest."""
progress = get_or_create_progress(quest_id, agent_id)
progress.current_value = current_value
if metadata:
progress.metadata.update(metadata)
# Check if quest is now complete
if progress.current_value >= progress.target_value:
if progress.status not in (QuestStatus.COMPLETED, QuestStatus.CLAIMED):
progress.status = QuestStatus.COMPLETED
progress.completed_at = datetime.now(UTC).isoformat()
logger.info("Quest %s completed for agent %s", quest_id, agent_id)
return progress
def _is_on_cooldown(progress: QuestProgress, quest: QuestDefinition) -> bool:
"""Check if a repeatable quest is on cooldown."""
if not quest.repeatable or not progress.last_completed_at:
return False
if quest.cooldown_hours <= 0:
return False
try:
last_completed = datetime.fromisoformat(progress.last_completed_at)
cooldown_end = last_completed + timedelta(hours=quest.cooldown_hours)
return datetime.now(UTC) < cooldown_end
except (ValueError, TypeError):
return False
def _apply_stress_multiplier(base_reward: int, quest_type: QuestType) -> tuple[int, float]:
"""Apply stress-based multiplier to quest reward.
Returns:
Tuple of (adjusted_reward, multiplier_used)
"""
try:
from timmy.stress_detector import apply_multiplier
adjusted = apply_multiplier(base_reward, quest_type.value)
return adjusted, adjusted / max(base_reward, 1)
except Exception as exc:
logger.debug("Failed to apply stress multiplier: %s", exc)
return base_reward, 1.0
def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
"""Claim the token reward for a completed quest.
Returns:
Reward info dict if successful, None if not claimable
"""
progress = get_quest_progress(quest_id, agent_id)
if not progress:
return None
quest = get_quest_definition(quest_id)
if not quest:
return None
# Check if quest is completed but not yet claimed
if progress.status != QuestStatus.COMPLETED:
return None
# Check cooldown for repeatable quests
if _is_on_cooldown(progress, quest):
return None
try:
# Apply stress-based multiplier
adjusted_reward, multiplier = _apply_stress_multiplier(
quest.reward_tokens, quest.quest_type
)
# Award tokens via ledger
from lightning.ledger import create_invoice_entry, mark_settled
# Create a mock invoice for the reward
invoice_entry = create_invoice_entry(
payment_hash=f"quest_{quest_id}_{agent_id}_{int(time.time())}",
amount_sats=adjusted_reward,
memo=f"Quest reward: {quest.name}",
source="quest_reward",
agent_id=agent_id,
)
# Mark as settled immediately (quest rewards are auto-settled)
mark_settled(invoice_entry.payment_hash, preimage=f"quest_{quest_id}")
# Update progress
progress.status = QuestStatus.CLAIMED
progress.claimed_at = datetime.now(UTC).isoformat()
progress.completion_count += 1
progress.last_completed_at = progress.claimed_at
# Reset for repeatable quests
if quest.repeatable:
progress.status = QuestStatus.NOT_STARTED
progress.current_value = 0
progress.completed_at = ""
progress.claimed_at = ""
# Build notification with multiplier info
notification = quest.notification_message.format(tokens=adjusted_reward)
if multiplier != 1.0:
pct = int((multiplier - 1.0) * 100)
if pct > 0:
notification += f" (+{pct}% stress bonus)"
else:
notification += f" ({pct}% stress adjustment)"
return {
"quest_id": quest_id,
"agent_id": agent_id,
"tokens_awarded": adjusted_reward,
"base_reward": quest.reward_tokens,
"multiplier": round(multiplier, 2),
"notification": notification,
"completion_count": progress.completion_count,
}
except Exception as exc:
logger.error("Failed to award quest reward: %s", exc)
return None
def check_issue_count_quest(
quest: QuestDefinition,
agent_id: str,
closed_issues: list[dict],
) -> QuestProgress | None:
"""Check progress for issue_count type quest."""
criteria = quest.criteria
target_labels = set(criteria.get("issue_labels", []))
# target_count is available in criteria but not used directly here
# Count matching issues
matching_count = 0
for issue in closed_issues:
issue_labels = {label.get("name", "") for label in issue.get("labels", [])}
if target_labels.issubset(issue_labels) or (not target_labels and issue_labels):
matching_count += 1
progress = update_quest_progress(
quest.id, agent_id, matching_count, {"matching_issues": matching_count}
)
return progress
def check_issue_reduce_quest(
quest: QuestDefinition,
agent_id: str,
previous_count: int,
current_count: int,
) -> QuestProgress | None:
"""Check progress for issue_reduce type quest."""
# target_reduction available in quest.criteria but we track actual reduction
reduction = max(0, previous_count - current_count)
progress = update_quest_progress(quest.id, agent_id, reduction, {"reduction": reduction})
return progress
def check_daily_run_quest(
quest: QuestDefinition,
agent_id: str,
sessions_completed: int,
) -> QuestProgress | None:
"""Check progress for daily_run type quest."""
# min_sessions available in quest.criteria but we track actual sessions
progress = update_quest_progress(
quest.id, agent_id, sessions_completed, {"sessions": sessions_completed}
)
return progress
def evaluate_quest_progress(
quest_id: str,
agent_id: str,
context: dict[str, Any],
) -> QuestProgress | None:
"""Evaluate quest progress based on quest type and context.
Args:
quest_id: The quest to evaluate
agent_id: The agent to evaluate for
context: Context data for evaluation (issues, metrics, etc.)
Returns:
Updated QuestProgress or None if evaluation failed
"""
quest = get_quest_definition(quest_id)
if not quest or not quest.enabled:
return None
progress = get_quest_progress(quest_id, agent_id)
# Check cooldown for repeatable quests
if progress and _is_on_cooldown(progress, quest):
return progress
try:
if quest.quest_type == QuestType.ISSUE_COUNT:
closed_issues = context.get("closed_issues", [])
return check_issue_count_quest(quest, agent_id, closed_issues)
elif quest.quest_type == QuestType.ISSUE_REDUCE:
prev_count = context.get("previous_issue_count", 0)
curr_count = context.get("current_issue_count", 0)
return check_issue_reduce_quest(quest, agent_id, prev_count, curr_count)
elif quest.quest_type == QuestType.DAILY_RUN:
sessions = context.get("sessions_completed", 0)
return check_daily_run_quest(quest, agent_id, sessions)
elif quest.quest_type == QuestType.CUSTOM:
# Custom quests require manual completion
return progress
else:
logger.debug("Quest type %s not yet implemented", quest.quest_type)
return progress
except Exception as exc:
logger.warning("Quest evaluation failed for %s: %s", quest_id, exc)
return progress
def auto_evaluate_all_quests(agent_id: str, context: dict[str, Any]) -> list[dict]:
"""Evaluate all active quests for an agent and award rewards.
Returns:
List of reward info for newly completed quests
"""
rewards = []
active_quests = get_active_quests()
for quest in active_quests:
progress = evaluate_quest_progress(quest.id, agent_id, context)
if progress and progress.status == QuestStatus.COMPLETED:
# Auto-claim the reward
reward = claim_quest_reward(quest.id, agent_id)
if reward:
rewards.append(reward)
return rewards
def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
"""Get complete quest status for an agent."""
definitions = get_quest_definitions()
quests_status = []
total_rewards = 0
completed_count = 0
# Get current stress mode for adjusted rewards display
try:
from timmy.stress_detector import get_current_stress_mode, get_multiplier
current_mode = get_current_stress_mode()
except Exception:
current_mode = None
for quest_id, quest in definitions.items():
progress = get_quest_progress(quest_id, agent_id)
if not progress:
progress = get_or_create_progress(quest_id, agent_id)
is_on_cooldown = _is_on_cooldown(progress, quest) if quest.repeatable else False
# Calculate adjusted reward with stress multiplier
adjusted_reward = quest.reward_tokens
multiplier = 1.0
if current_mode:
try:
multiplier = get_multiplier(quest.quest_type.value, current_mode)
adjusted_reward = int(quest.reward_tokens * multiplier)
except Exception:
pass
quest_info = {
"quest_id": quest_id,
"name": quest.name,
"description": quest.description,
"reward_tokens": quest.reward_tokens,
"adjusted_reward": adjusted_reward,
"multiplier": round(multiplier, 2),
"type": quest.quest_type.value,
"enabled": quest.enabled,
"repeatable": quest.repeatable,
"status": progress.status.value,
"current_value": progress.current_value,
"target_value": progress.target_value,
"completion_count": progress.completion_count,
"on_cooldown": is_on_cooldown,
"cooldown_hours_remaining": 0,
}
if is_on_cooldown and progress.last_completed_at:
try:
last = datetime.fromisoformat(progress.last_completed_at)
cooldown_end = last + timedelta(hours=quest.cooldown_hours)
hours_remaining = (cooldown_end - datetime.now(UTC)).total_seconds() / 3600
quest_info["cooldown_hours_remaining"] = round(max(0, hours_remaining), 1)
except (ValueError, TypeError):
pass
quests_status.append(quest_info)
total_rewards += progress.completion_count * quest.reward_tokens
completed_count += progress.completion_count
return {
"agent_id": agent_id,
"quests": quests_status,
"total_tokens_earned": total_rewards,
"total_quests_completed": completed_count,
"active_quests_count": len([q for q in quests_status if q["enabled"]]),
"stress_mode": current_mode.value if current_mode else None,
}
def reset_quest_progress(quest_id: str | None = None, agent_id: str | None = None) -> int:
"""Reset quest progress. Useful for testing.
Args:
quest_id: Specific quest to reset, or None for all
agent_id: Specific agent to reset, or None for all
Returns:
Number of progress entries reset
"""
global _quest_progress
count = 0
keys_to_reset = []
for key, _progress in _quest_progress.items():
key_agent, key_quest = key.split(":", 1)
if (quest_id is None or key_quest == quest_id) and (
agent_id is None or key_agent == agent_id
):
keys_to_reset.append(key)
for key in keys_to_reset:
del _quest_progress[key]
count += 1
return count
def get_quest_leaderboard() -> list[dict[str, Any]]:
"""Get a leaderboard of agents by quest completion."""
agent_stats: dict[str, dict[str, Any]] = {}
for _key, progress in _quest_progress.items():
agent_id = progress.agent_id
if agent_id not in agent_stats:
agent_stats[agent_id] = {
"agent_id": agent_id,
"total_completions": 0,
"total_tokens": 0,
"quests_completed": set(),
}
quest = get_quest_definition(progress.quest_id)
if quest:
agent_stats[agent_id]["total_completions"] += progress.completion_count
agent_stats[agent_id]["total_tokens"] += progress.completion_count * quest.reward_tokens
if progress.completion_count > 0:
agent_stats[agent_id]["quests_completed"].add(quest.id)
leaderboard = []
for stats in agent_stats.values():
leaderboard.append(
{
"agent_id": stats["agent_id"],
"total_completions": stats["total_completions"],
"total_tokens": stats["total_tokens"],
"unique_quests_completed": len(stats["quests_completed"]),
}
)
# Sort by total tokens (descending)
leaderboard.sort(key=lambda x: x["total_tokens"], reverse=True)
return leaderboard
# Initialize on module load
load_quest_config()
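A brief usage sketch of the module above; the agent id, issue payloads, and counts are invented for illustration and assume config/quests.yaml defines at least one issue_count quest.
# Illustrative usage of timmy.quest_system; payloads below are made up.
from timmy.quest_system import auto_evaluate_all_quests, get_agent_quests_status

context = {
    # Mirrors what check_daily_run_quests() assembles from Gitea and Daily Run metrics.
    "closed_issues": [
        {"number": 101, "labels": [{"name": "flaky-test"}]},
        {"number": 102, "labels": [{"name": "flaky-test"}]},
        {"number": 103, "labels": [{"name": "flaky-test"}]},
    ],
    "sessions_completed": 2,
    "previous_issue_count": 9,
    "current_issue_count": 7,
}

rewards = auto_evaluate_all_quests("agent-42", context)
for reward in rewards:
    print(reward["notification"], reward["tokens_awarded"])

status = get_agent_quests_status("agent-42")
print(status["total_tokens_earned"], status["stress_mode"])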

View File

@@ -0,0 +1,565 @@
"""System stress detection for adaptive token rewards.
Monitors system signals like flakiness, backlog growth, and CI failures
to determine the current stress mode. Token rewards are then adjusted
based on the stress mode to incentivize agents to focus on critical areas.
"""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from enum import StrEnum
from pathlib import Path
from typing import Any
import yaml
from config import settings
logger = logging.getLogger(__name__)
# Path to stress mode configuration
STRESS_CONFIG_PATH = Path(settings.repo_root) / "config" / "stress_modes.yaml"
class StressMode(StrEnum):
"""System stress modes.
- CALM: Normal operations, incentivize exploration and refactoring
- ELEVATED: Some stress signals detected, balance incentives
- HIGH: Critical stress, strongly incentivize bug fixes and stabilization
"""
CALM = "calm"
ELEVATED = "elevated"
HIGH = "high"
@dataclass
class StressSignal:
"""A single stress signal reading."""
name: str
value: float
threshold: float
weight: float
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
@property
def is_triggered(self) -> bool:
"""Whether this signal exceeds its threshold."""
return self.value >= self.threshold
@property
def contribution(self) -> float:
"""Calculate this signal's contribution to stress score."""
if not self.is_triggered:
return 0.0
# Contribution is the weighted ratio of value to threshold, capped at the weight
return min(1.0, self.value / max(self.threshold, 1e-9)) * self.weight
@dataclass
class StressSnapshot:
"""Complete stress assessment at a point in time."""
mode: StressMode
score: float
signals: list[StressSignal]
multipliers: dict[str, float]
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"mode": self.mode.value,
"score": round(self.score, 3),
"signals": [
{
"name": s.name,
"value": s.value,
"threshold": s.threshold,
"triggered": s.is_triggered,
"contribution": round(s.contribution, 3),
}
for s in self.signals
],
"multipliers": self.multipliers,
"timestamp": self.timestamp,
}
@dataclass
class StressThresholds:
"""Thresholds for entering/exiting stress modes."""
elevated_min: float = 0.3
high_min: float = 0.6
def get_mode_for_score(self, score: float) -> StressMode:
"""Determine stress mode based on score."""
if score >= self.high_min:
return StressMode.HIGH
elif score >= self.elevated_min:
return StressMode.ELEVATED
return StressMode.CALM
# In-memory storage for stress state
_current_snapshot: StressSnapshot | None = None
_last_check_time: datetime | None = None
_config_cache: dict[str, Any] | None = None
_config_mtime: float = 0.0
def _load_stress_config() -> dict[str, Any]:
"""Load stress mode configuration from YAML.
Returns:
Configuration dictionary with default fallbacks
"""
global _config_cache, _config_mtime
# Check if config file has been modified
if STRESS_CONFIG_PATH.exists():
mtime = STRESS_CONFIG_PATH.stat().st_mtime
if mtime != _config_mtime or _config_cache is None:
try:
raw = STRESS_CONFIG_PATH.read_text()
_config_cache = yaml.safe_load(raw) or {}
_config_mtime = mtime
logger.debug("Loaded stress config from %s", STRESS_CONFIG_PATH)
except (OSError, yaml.YAMLError) as exc:
logger.warning("Failed to load stress config: %s", exc)
_config_cache = {}
if _config_cache is None:
_config_cache = {}
return _config_cache
def get_default_config() -> dict[str, Any]:
"""Get default stress configuration."""
return {
"thresholds": {
"elevated_min": 0.3,
"high_min": 0.6,
},
"signals": {
"flaky_test_rate": {
"threshold": 0.15, # 15% flaky test rate
"weight": 0.3,
"description": "Percentage of tests that are flaky",
},
"p1_backlog_growth": {
"threshold": 5, # 5 new P1 issues
"weight": 0.25,
"description": "Net growth in P1 priority issues",
},
"ci_failure_rate": {
"threshold": 0.2, # 20% CI failure rate
"weight": 0.25,
"description": "Percentage of CI runs failing",
},
"open_bug_count": {
"threshold": 20, # 20 open bugs
"weight": 0.2,
"description": "Total open issues labeled as bugs",
},
},
"multipliers": {
StressMode.CALM.value: {
"test_improve": 1.0,
"docs_update": 1.2, # Calm periods good for docs
"issue_count": 1.0,
"issue_reduce": 1.0,
"daily_run": 1.0,
"custom": 1.0,
"exploration": 1.3, # Encourage exploration
"refactor": 1.2, # Encourage refactoring
},
StressMode.ELEVATED.value: {
"test_improve": 1.2, # Start emphasizing tests
"docs_update": 1.0,
"issue_count": 1.1,
"issue_reduce": 1.1,
"daily_run": 1.0,
"custom": 1.0,
"exploration": 1.0,
"refactor": 0.9, # Discourage risky refactors
},
StressMode.HIGH.value: {
"test_improve": 1.5, # Strongly incentivize testing
"docs_update": 0.8, # Deprioritize docs
"issue_count": 1.3, # Reward closing issues
"issue_reduce": 1.4, # Strongly reward reducing backlog
"daily_run": 1.1,
"custom": 1.0,
"exploration": 0.7, # Discourage exploration
"refactor": 0.6, # Discourage refactors during crisis
},
},
}
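# A config/stress_modes.yaml override mirrors this structure (illustrative values,
# not the shipped file); any key omitted there falls back to the defaults above:
#
#   thresholds:
#     elevated_min: 0.35
#     high_min: 0.65
#   signals:
#     flaky_test_rate:
#       threshold: 0.10
#       weight: 0.35
#   multipliers:
#     high:
#       test_improve: 1.6
#       refactor: 0.5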
def _get_config_value(key_path: str, default: Any = None) -> Any:
"""Get a value from config using dot notation path."""
config = _load_stress_config()
keys = key_path.split(".")
value = config
for key in keys:
if isinstance(value, dict):
value = value.get(key)
else:
return default
return value if value is not None else default
def _calculate_flaky_test_rate() -> float:
"""Calculate current flaky test rate from available data."""
try:
# Try to load from daily run metrics or test results
test_results_path = Path(settings.repo_root) / ".loop" / "test_results.jsonl"
if not test_results_path.exists():
return 0.0
# Count recent test runs and flaky results
now = datetime.now(UTC)
cutoff = now - timedelta(days=7)
total_runs = 0
flaky_runs = 0
if test_results_path.exists():
for line in test_results_path.read_text().strip().splitlines():
try:
entry = json.loads(line)
ts_str = entry.get("timestamp", "")
if not ts_str:
continue
ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
if ts >= cutoff:
total_runs += 1
if entry.get("is_flaky", False):
flaky_runs += 1
except (json.JSONDecodeError, ValueError):
continue
return flaky_runs / max(total_runs, 1)
except Exception as exc:
logger.debug("Failed to calculate flaky test rate: %s", exc)
return 0.0
def _calculate_p1_backlog_growth() -> float:
"""Calculate P1 issue backlog growth."""
try:
from dashboard.routes.daily_run import GiteaClient, _load_config
config = _load_config()
token = config.get("token")
client = GiteaClient(config, token)
if not client.is_available():
return 0.0
# Get current P1 issues
now = datetime.now(UTC)
cutoff_current = now - timedelta(days=7)
cutoff_previous = now - timedelta(days=14)
issues = client.get_paginated("issues", {"state": "all", "labels": "P1", "limit": 100})
current_count = 0
previous_count = 0
for issue in issues:
created_at = issue.get("created_at", "")
if not created_at:
continue
try:
created = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
if created >= cutoff_current:
current_count += 1
elif created >= cutoff_previous:
previous_count += 1
except (ValueError, TypeError):
continue
# Return net growth (positive means growing backlog)
return max(0, current_count - previous_count)
except Exception as exc:
logger.debug("Failed to calculate P1 backlog growth: %s", exc)
return 0.0
def _calculate_ci_failure_rate() -> float:
"""Calculate CI failure rate from recent runs."""
try:
# Try to get CI metrics from Gitea or local files
ci_results_path = Path(settings.repo_root) / ".loop" / "ci_results.jsonl"
if not ci_results_path.exists():
return 0.0
now = datetime.now(UTC)
cutoff = now - timedelta(days=7)
total_runs = 0
failed_runs = 0
for line in ci_results_path.read_text().strip().splitlines():
try:
entry = json.loads(line)
ts_str = entry.get("timestamp", "")
if not ts_str:
continue
ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
if ts >= cutoff:
total_runs += 1
if entry.get("status") != "success":
failed_runs += 1
except (json.JSONDecodeError, ValueError):
continue
return failed_runs / max(total_runs, 1)
except Exception as exc:
logger.debug("Failed to calculate CI failure rate: %s", exc)
return 0.0
def _calculate_open_bug_count() -> float:
"""Calculate current open bug count."""
try:
from dashboard.routes.daily_run import GiteaClient, _load_config
config = _load_config()
token = config.get("token")
client = GiteaClient(config, token)
if not client.is_available():
return 0.0
issues = client.get_paginated("issues", {"state": "open", "labels": "bug", "limit": 100})
return float(len(issues))
except Exception as exc:
logger.debug("Failed to calculate open bug count: %s", exc)
return 0.0
def _collect_stress_signals() -> list[StressSignal]:
"""Collect all stress signals from the system."""
config = _load_stress_config()
default_config = get_default_config()
signals_config = config.get("signals", default_config["signals"])
signals = []
# Define signal collectors
collectors = {
"flaky_test_rate": _calculate_flaky_test_rate,
"p1_backlog_growth": _calculate_p1_backlog_growth,
"ci_failure_rate": _calculate_ci_failure_rate,
"open_bug_count": _calculate_open_bug_count,
}
for signal_name, collector in collectors.items():
signal_cfg = signals_config.get(signal_name, {})
default_cfg = default_config["signals"].get(signal_name, {})
try:
value = collector()
threshold = signal_cfg.get("threshold", default_cfg.get("threshold", 1.0))
weight = signal_cfg.get("weight", default_cfg.get("weight", 0.25))
signals.append(
StressSignal(
name=signal_name,
value=value,
threshold=threshold,
weight=weight,
)
)
except Exception as exc:
logger.debug("Failed to collect signal %s: %s", signal_name, exc)
return signals
def _calculate_stress_score(signals: list[StressSignal]) -> float:
"""Calculate overall stress score from signals.
Score is weighted sum of triggered signal contributions,
normalized to 0-1 range.
"""
if not signals:
return 0.0
total_weight = sum(s.weight for s in signals)
if total_weight == 0:
return 0.0
triggered_contribution = sum(s.contribution for s in signals)
return min(1.0, triggered_contribution / total_weight)
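# Worked example with the default weights and thresholds above (illustrative readings):
#   flaky_test_rate   = 0.18 -> triggered (>= 0.15): min(1, 0.18 / 0.15) * 0.30 = 0.30
#   p1_backlog_growth = 6    -> triggered (>= 5):    min(1, 6 / 5)       * 0.25 = 0.25
#   ci_failure_rate   = 0.10 -> below the 0.20 threshold, contributes 0.00
#   open_bug_count    = 12   -> below the 20 threshold, contributes 0.00
#   score = (0.30 + 0.25) / 1.0 = 0.55 -> ELEVATED (0.3 <= score < 0.6)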
def _get_multipliers_for_mode(mode: StressMode) -> dict[str, float]:
"""Get token multipliers for a specific stress mode."""
config = _load_stress_config()
default_config = get_default_config()
multipliers = config.get("multipliers", default_config["multipliers"])
mode_multipliers = multipliers.get(mode.value, {})
default_mode_multipliers = default_config["multipliers"].get(mode.value, {})
# Merge with defaults
result = default_mode_multipliers.copy()
result.update(mode_multipliers)
return result
def detect_stress_mode(
force_refresh: bool = False,
min_check_interval_seconds: int = 60,
) -> StressSnapshot:
"""Detect current system stress mode.
Args:
force_refresh: Force a new check even if recently checked
min_check_interval_seconds: Minimum seconds between checks
Returns:
StressSnapshot with mode, score, signals, and multipliers
"""
global _current_snapshot, _last_check_time
now = datetime.now(UTC)
# Return cached snapshot if recent and not forced
if not force_refresh and _current_snapshot is not None and _last_check_time is not None:
elapsed = (now - _last_check_time).total_seconds()
if elapsed < min_check_interval_seconds:
return _current_snapshot
# Collect signals and calculate stress
signals = _collect_stress_signals()
score = _calculate_stress_score(signals)
# Determine mode from score
config = _load_stress_config()
default_config = get_default_config()
thresholds_cfg = config.get("thresholds", default_config["thresholds"])
thresholds = StressThresholds(
elevated_min=thresholds_cfg.get("elevated_min", 0.3),
high_min=thresholds_cfg.get("high_min", 0.6),
)
mode = thresholds.get_mode_for_score(score)
# Get multipliers for this mode
multipliers = _get_multipliers_for_mode(mode)
# Create snapshot
snapshot = StressSnapshot(
mode=mode,
score=score,
signals=signals,
multipliers=multipliers,
timestamp=now.isoformat(),
)
# Log mode changes against the previous snapshot before replacing it
previous = _current_snapshot
if previous is not None and previous.mode != mode:
logger.info(
"Stress mode changed: %s -> %s (score: %.2f)",
previous.mode.value,
mode.value,
score,
)
# Cache result
_current_snapshot = snapshot
_last_check_time = now
return snapshot
def get_current_stress_mode() -> StressMode:
"""Get current stress mode (uses cached or fresh detection)."""
snapshot = detect_stress_mode()
return snapshot.mode
def get_multiplier(quest_type: str, mode: StressMode | None = None) -> float:
"""Get token multiplier for a quest type.
Args:
quest_type: Type of quest (test_improve, issue_count, etc.)
mode: Specific mode to get multiplier for, or None for current
Returns:
Multiplier value (1.0 = normal, 1.5 = 50% bonus, etc.)
"""
if mode is None:
mode = get_current_stress_mode()
multipliers = _get_multipliers_for_mode(mode)
return multipliers.get(quest_type, 1.0)
def apply_multiplier(base_reward: int, quest_type: str) -> int:
"""Apply stress-based multiplier to a base reward.
Args:
base_reward: Base token reward amount
quest_type: Type of quest for multiplier lookup
Returns:
Adjusted reward amount (always >= 1)
"""
multiplier = get_multiplier(quest_type)
adjusted = int(base_reward * multiplier)
return max(1, adjusted)
def get_stress_summary() -> dict[str, Any]:
"""Get a human-readable summary of current stress state."""
snapshot = detect_stress_mode()
# Generate explanation
explanations = {
StressMode.CALM: "System is calm. Good time for exploration and refactoring.",
StressMode.ELEVATED: "Elevated stress detected. Focus on stability and tests.",
StressMode.HIGH: "HIGH STRESS MODE. Prioritize bug fixes and test hardening.",
}
triggered_signals = [s for s in snapshot.signals if s.is_triggered]
return {
"mode": snapshot.mode.value,
"score": round(snapshot.score, 3),
"explanation": explanations.get(snapshot.mode, "Unknown mode"),
"active_signals": [
{
"name": s.name,
"value": round(s.value, 3),
"threshold": s.threshold,
}
for s in triggered_signals
],
"current_multipliers": snapshot.multipliers,
"last_updated": snapshot.timestamp,
}
def reset_stress_state() -> None:
"""Reset stress state cache (useful for testing)."""
global _current_snapshot, _last_check_time, _config_cache, _config_mtime
_current_snapshot = None
_last_check_time = None
_config_cache = None
_config_mtime = 0.0
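A short sketch of reading the stress state and applying it to a reward, using only the functions defined above; the 120-token base reward is an arbitrary example.
# Illustrative usage of timmy.stress_detector.
from timmy.stress_detector import apply_multiplier, detect_stress_mode, get_stress_summary

snapshot = detect_stress_mode(force_refresh=True)
print(snapshot.mode.value, round(snapshot.score, 2))

# Scale a quest reward for the current mode, e.g. test_improve pays 1.5x under HIGH stress.
print("adjusted reward:", apply_multiplier(120, "test_improve"))

# Human-readable summary suitable for a dashboard panel or API response.
print(get_stress_summary()["explanation"])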

View File

@@ -0,0 +1,536 @@
"""Tests for the Golden Path generator."""
import json
from datetime import UTC, datetime
from unittest.mock import MagicMock, patch
from timmy_automations.daily_run.golden_path import (
TIME_ESTIMATES,
TYPE_PATTERNS,
GiteaClient,
GoldenPath,
PathItem,
build_golden_path,
classify_issue_type,
estimate_time,
extract_size,
generate_golden_path,
get_token,
group_issues_by_type,
load_config,
score_issue_for_path,
)
class TestLoadConfig:
"""Tests for configuration loading."""
def test_load_config_defaults(self):
"""Config should have sensible defaults."""
config = load_config()
assert "gitea_api" in config
assert "repo_slug" in config
assert "size_labels" in config
def test_load_config_env_override(self, monkeypatch):
"""Environment variables should override defaults."""
monkeypatch.setenv("TIMMY_GITEA_API", "http://custom:3000/api/v1")
monkeypatch.setenv("TIMMY_REPO_SLUG", "custom/repo")
monkeypatch.setenv("TIMMY_GITEA_TOKEN", "test-token")
config = load_config()
assert config["gitea_api"] == "http://custom:3000/api/v1"
assert config["repo_slug"] == "custom/repo"
assert config["token"] == "test-token"
class TestGetToken:
"""Tests for token retrieval."""
def test_get_token_from_config(self):
"""Token from config takes precedence."""
config = {"token": "config-token", "token_file": "~/.test"}
assert get_token(config) == "config-token"
@patch("pathlib.Path.exists")
@patch("pathlib.Path.read_text")
def test_get_token_from_file(self, mock_read, mock_exists):
"""Token can be read from file."""
mock_exists.return_value = True
mock_read.return_value = "file-token\n"
config = {"token_file": "~/.hermes/test_token"}
assert get_token(config) == "file-token"
def test_get_token_none(self):
"""Returns None if no token available."""
config = {"token_file": "/nonexistent/path"}
assert get_token(config) is None
class TestExtractSize:
"""Tests for size label extraction."""
def test_extract_size_xs(self):
"""Should extract XS size."""
labels = [{"name": "size:XS"}, {"name": "bug"}]
assert extract_size(labels) == "XS"
def test_extract_size_s(self):
"""Should extract S size."""
labels = [{"name": "bug"}, {"name": "size:S"}]
assert extract_size(labels) == "S"
def test_extract_size_m(self):
"""Should extract M size."""
labels = [{"name": "size:M"}]
assert extract_size(labels) == "M"
def test_extract_size_unknown(self):
"""Should return ? for unknown size."""
labels = [{"name": "bug"}, {"name": "feature"}]
assert extract_size(labels) == "?"
def test_extract_size_empty(self):
"""Should return ? for empty labels."""
assert extract_size([]) == "?"
class TestClassifyIssueType:
"""Tests for issue type classification."""
def test_classify_triage(self):
"""Should classify triage issues."""
issue = {
"title": "Triage new issues",
"labels": [{"name": "triage"}],
}
assert classify_issue_type(issue) == "triage"
def test_classify_test(self):
"""Should classify test issues."""
issue = {
"title": "Add unit tests for parser",
"labels": [{"name": "test"}],
}
assert classify_issue_type(issue) == "test"
def test_classify_fix(self):
"""Should classify fix issues."""
issue = {
"title": "Fix login bug",
"labels": [{"name": "bug"}],
}
assert classify_issue_type(issue) == "fix"
def test_classify_docs(self):
"""Should classify docs issues."""
issue = {
"title": "Update README",
"labels": [{"name": "docs"}],
}
assert classify_issue_type(issue) == "docs"
def test_classify_refactor(self):
"""Should classify refactor issues."""
issue = {
"title": "Refactor validation logic",
"labels": [{"name": "refactor"}],
}
assert classify_issue_type(issue) == "refactor"
def test_classify_default_to_fix(self):
"""Should default to fix for uncategorized."""
issue = {
"title": "Something vague",
"labels": [{"name": "question"}],
}
assert classify_issue_type(issue) == "fix"
def test_classify_title_priority(self):
"""Title patterns should contribute to classification."""
issue = {
"title": "Fix the broken parser",
"labels": [],
}
assert classify_issue_type(issue) == "fix"
class TestEstimateTime:
"""Tests for time estimation."""
def test_estimate_xs_fix(self):
"""XS fix should be 10 minutes."""
issue = {
"title": "Fix typo",
"labels": [{"name": "size:XS"}, {"name": "bug"}],
}
assert estimate_time(issue) == 10
def test_estimate_s_test(self):
"""S test should be 15 minutes."""
issue = {
"title": "Add test coverage",
"labels": [{"name": "size:S"}, {"name": "test"}],
}
assert estimate_time(issue) == 15
def test_estimate_m_fix(self):
"""M fix should be 25 minutes."""
issue = {
"title": "Fix complex bug",
"labels": [{"name": "size:M"}, {"name": "bug"}],
}
assert estimate_time(issue) == 25
def test_estimate_unknown_size(self):
"""Unknown size should fallback to S."""
issue = {
"title": "Some fix",
"labels": [{"name": "bug"}],
}
# Falls back to S/fix = 15
assert estimate_time(issue) == 15
class TestScoreIssueForPath:
"""Tests for issue scoring."""
def test_score_prefers_xs(self):
"""XS issues should score higher."""
xs = {"title": "Fix", "labels": [{"name": "size:XS"}]}
s = {"title": "Fix", "labels": [{"name": "size:S"}]}
m = {"title": "Fix", "labels": [{"name": "size:M"}]}
assert score_issue_for_path(xs) > score_issue_for_path(s)
assert score_issue_for_path(s) > score_issue_for_path(m)
def test_score_prefers_clear_types(self):
"""Issues with clear type labels score higher."""
# Bug label adds score, so with bug should be >= without bug
with_type = {
"title": "Fix bug",
"labels": [{"name": "size:S"}, {"name": "bug"}],
}
without_type = {
"title": "Something",
"labels": [{"name": "size:S"}],
}
assert score_issue_for_path(with_type) >= score_issue_for_path(without_type)
def test_score_accepts_criteria(self):
"""Issues with acceptance criteria score higher."""
with_criteria = {
"title": "Fix",
"labels": [{"name": "size:S"}],
"body": "## Acceptance Criteria\n- [ ] Fix it",
}
without_criteria = {
"title": "Fix",
"labels": [{"name": "size:S"}],
"body": "Just fix it",
}
assert score_issue_for_path(with_criteria) > score_issue_for_path(without_criteria)
class TestGroupIssuesByType:
"""Tests for issue grouping."""
def test_groups_by_type(self):
"""Issues should be grouped by their type."""
issues = [
{"title": "Fix bug", "labels": [{"name": "bug"}], "number": 1},
{"title": "Add test", "labels": [{"name": "test"}], "number": 2},
{"title": "Another fix", "labels": [{"name": "bug"}], "number": 3},
]
grouped = group_issues_by_type(issues)
assert len(grouped["fix"]) == 2
assert len(grouped["test"]) == 1
assert len(grouped["triage"]) == 0
def test_sorts_by_score(self):
"""Issues within groups should be sorted by score."""
issues = [
{"title": "Fix", "labels": [{"name": "size:M"}], "number": 1},
{"title": "Fix", "labels": [{"name": "size:XS"}], "number": 2},
{"title": "Fix", "labels": [{"name": "size:S"}], "number": 3},
]
grouped = group_issues_by_type(issues)
# XS should be first (highest score)
assert grouped["fix"][0]["number"] == 2
# M should be last (lowest score)
assert grouped["fix"][2]["number"] == 1
class TestBuildGoldenPath:
"""Tests for Golden Path building."""
def test_builds_path_with_all_types(self):
"""Path should include items from different types."""
grouped = {
"triage": [
{"title": "Triage", "labels": [{"name": "size:XS"}], "number": 1, "html_url": ""},
],
"fix": [
{"title": "Fix 1", "labels": [{"name": "size:S"}], "number": 2, "html_url": ""},
{"title": "Fix 2", "labels": [{"name": "size:XS"}], "number": 3, "html_url": ""},
],
"test": [
{"title": "Test", "labels": [{"name": "size:S"}], "number": 4, "html_url": ""},
],
"docs": [],
"refactor": [],
}
path = build_golden_path(grouped, target_minutes=45)
assert path.item_count >= 3
assert path.items[0].issue_type == "triage" # Warm-up
assert any(item.issue_type == "test" for item in path.items)
def test_respects_time_budget(self):
"""Path should stay within reasonable time budget."""
grouped = {
"triage": [
{"title": "Triage", "labels": [{"name": "size:S"}], "number": 1, "html_url": ""},
],
"fix": [
{"title": "Fix 1", "labels": [{"name": "size:S"}], "number": 2, "html_url": ""},
{"title": "Fix 2", "labels": [{"name": "size:S"}], "number": 3, "html_url": ""},
],
"test": [
{"title": "Test", "labels": [{"name": "size:S"}], "number": 4, "html_url": ""},
],
"docs": [],
"refactor": [],
}
path = build_golden_path(grouped, target_minutes=45)
# Should be in 30-60 minute range
assert 20 <= path.total_estimated_minutes <= 70
def test_no_duplicate_issues(self):
"""Path should not include the same issue twice."""
grouped = {
"triage": [],
"fix": [
{"title": "Fix", "labels": [{"name": "size:S"}], "number": 1, "html_url": ""},
],
"test": [],
"docs": [],
"refactor": [],
}
path = build_golden_path(grouped, target_minutes=45)
numbers = [item.number for item in path.items]
assert len(numbers) == len(set(numbers)) # No duplicates
def test_fallback_when_triage_missing(self):
"""Should use fallback when no triage issues available."""
grouped = {
"triage": [],
"fix": [
{"title": "Fix", "labels": [{"name": "size:XS"}], "number": 1, "html_url": ""},
],
"test": [
{"title": "Test", "labels": [{"name": "size:XS"}], "number": 2, "html_url": ""},
],
"docs": [],
"refactor": [],
}
path = build_golden_path(grouped, target_minutes=45)
assert path.item_count > 0
class TestGoldenPathDataclass:
"""Tests for the GoldenPath dataclass."""
def test_total_time_calculation(self):
"""Should sum item times correctly."""
path = GoldenPath(
generated_at=datetime.now(UTC).isoformat(),
target_minutes=45,
items=[
PathItem(1, "Test 1", "XS", "fix", 10, ""),
PathItem(2, "Test 2", "S", "test", 15, ""),
],
)
assert path.total_estimated_minutes == 25
def test_to_dict(self):
"""Should convert to dict correctly."""
path = GoldenPath(
generated_at="2024-01-01T00:00:00+00:00",
target_minutes=45,
items=[PathItem(1, "Test", "XS", "fix", 10, "http://test")],
)
data = path.to_dict()
assert data["target_minutes"] == 45
assert data["total_estimated_minutes"] == 10
assert data["item_count"] == 1
assert len(data["items"]) == 1
def test_to_json(self):
"""Should convert to JSON correctly."""
path = GoldenPath(
generated_at="2024-01-01T00:00:00+00:00",
target_minutes=45,
items=[],
)
json_str = path.to_json()
data = json.loads(json_str)
assert data["target_minutes"] == 45
class TestGiteaClient:
"""Tests for the GiteaClient."""
def test_client_initialization(self):
"""Client should initialize with config."""
config = {
"gitea_api": "http://test:3000/api/v1",
"repo_slug": "test/repo",
}
client = GiteaClient(config, "token123")
assert client.api_base == "http://test:3000/api/v1"
assert client.repo_slug == "test/repo"
assert client.token == "token123"
def test_headers_with_token(self):
"""Headers should include auth token."""
config = {"gitea_api": "http://test", "repo_slug": "test/repo"}
client = GiteaClient(config, "mytoken")
headers = client._headers()
assert headers["Authorization"] == "token mytoken"
assert headers["Accept"] == "application/json"
def test_headers_without_token(self):
"""Headers should work without token."""
config = {"gitea_api": "http://test", "repo_slug": "test/repo"}
client = GiteaClient(config, None)
headers = client._headers()
assert "Authorization" not in headers
assert headers["Accept"] == "application/json"
@patch("timmy_automations.daily_run.golden_path.urlopen")
def test_is_available_success(self, mock_urlopen):
"""Should detect API availability."""
mock_response = MagicMock()
mock_response.status = 200
mock_context = MagicMock()
mock_context.__enter__ = MagicMock(return_value=mock_response)
mock_context.__exit__ = MagicMock(return_value=False)
mock_urlopen.return_value = mock_context
config = {"gitea_api": "http://test", "repo_slug": "test/repo"}
client = GiteaClient(config, None)
assert client.is_available() is True
@patch("urllib.request.urlopen")
def test_is_available_failure(self, mock_urlopen):
"""Should handle API unavailability."""
from urllib.error import URLError
mock_urlopen.side_effect = URLError("Connection refused")
config = {"gitea_api": "http://test", "repo_slug": "test/repo"}
client = GiteaClient(config, None)
assert client.is_available() is False
class TestIntegration:
"""Integration-style tests."""
@patch("timmy_automations.daily_run.golden_path.GiteaClient")
def test_generate_golden_path_integration(self, mock_client_class):
"""End-to-end test with mocked Gitea."""
# Setup mock
mock_client = MagicMock()
mock_client.is_available.return_value = True
mock_client.get_paginated.return_value = [
{
"number": 1,
"title": "Triage issues",
"labels": [{"name": "size:XS"}, {"name": "triage"}],
"html_url": "http://test/1",
},
{
"number": 2,
"title": "Fix bug",
"labels": [{"name": "size:S"}, {"name": "bug"}],
"html_url": "http://test/2",
},
{
"number": 3,
"title": "Add tests",
"labels": [{"name": "size:S"}, {"name": "test"}],
"html_url": "http://test/3",
},
{
"number": 4,
"title": "Another fix",
"labels": [{"name": "size:XS"}, {"name": "bug"}],
"html_url": "http://test/4",
},
]
mock_client_class.return_value = mock_client
path = generate_golden_path(target_minutes=45)
assert path.item_count >= 3
assert all(item.url.startswith("http://test/") for item in path.items)
@patch("timmy_automations.daily_run.golden_path.GiteaClient")
def test_generate_when_unavailable(self, mock_client_class):
"""Should return empty path when Gitea unavailable."""
mock_client = MagicMock()
mock_client.is_available.return_value = False
mock_client_class.return_value = mock_client
path = generate_golden_path(target_minutes=45)
assert path.item_count == 0
assert path.items == []
class TestTypePatterns:
"""Tests for type pattern definitions."""
def test_type_patterns_structure(self):
"""Type patterns should have required keys."""
for _issue_type, patterns in TYPE_PATTERNS.items():
assert "labels" in patterns
assert "title" in patterns
assert isinstance(patterns["labels"], list)
assert isinstance(patterns["title"], list)
def test_time_estimates_structure(self):
"""Time estimates should have all sizes."""
for size in ["XS", "S", "M"]:
assert size in TIME_ESTIMATES
for issue_type in ["triage", "fix", "test", "docs", "refactor"]:
assert issue_type in TIME_ESTIMATES[size]
assert isinstance(TIME_ESTIMATES[size][issue_type], int)
assert TIME_ESTIMATES[size][issue_type] > 0
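For convenience, a small sketch of exercising the generator outside the test suite, using only names imported above; it assumes the same TIMMY_GITEA_* environment variables the config tests exercise.
# Illustrative: build a Golden Path locally and dump it as JSON.
from timmy_automations.daily_run.golden_path import generate_golden_path

path = generate_golden_path(target_minutes=45)
print(f"{path.item_count} items, ~{path.total_estimated_minutes} min planned")
print(path.to_json())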

View File

@@ -0,0 +1,489 @@
"""Unit tests for the quest system.
Tests quest definitions, progress tracking, completion detection,
and token rewards.
"""
from __future__ import annotations
import pytest
from timmy.quest_system import (
QuestDefinition,
QuestProgress,
QuestStatus,
QuestType,
_is_on_cooldown,
claim_quest_reward,
evaluate_quest_progress,
get_or_create_progress,
get_quest_definition,
get_quest_leaderboard,
load_quest_config,
reset_quest_progress,
update_quest_progress,
)
@pytest.fixture(autouse=True)
def clean_quest_state():
"""Reset quest progress between tests."""
reset_quest_progress()
yield
reset_quest_progress()
@pytest.fixture
def sample_issue_count_quest():
"""Create a sample issue_count quest definition."""
return QuestDefinition(
id="test_close_issues",
name="Test Issue Closer",
description="Close 3 test issues",
reward_tokens=100,
quest_type=QuestType.ISSUE_COUNT,
enabled=True,
repeatable=False,
cooldown_hours=0,
criteria={"target_count": 3, "issue_labels": ["test"]},
notification_message="Test quest complete! Earned {tokens} tokens.",
)
@pytest.fixture
def sample_daily_run_quest():
"""Create a sample daily_run quest definition."""
return QuestDefinition(
id="test_daily_run",
name="Test Daily Runner",
description="Complete 5 sessions",
reward_tokens=250,
quest_type=QuestType.DAILY_RUN,
enabled=True,
repeatable=True,
cooldown_hours=24,
criteria={"min_sessions": 5},
notification_message="Daily run quest complete! Earned {tokens} tokens.",
)
# ── Quest Definition Tests ───────────────────────────────────────────────
class TestQuestDefinition:
def test_from_dict_minimal(self):
data = {"id": "test_quest", "name": "Test Quest"}
quest = QuestDefinition.from_dict(data)
assert quest.id == "test_quest"
assert quest.name == "Test Quest"
assert quest.quest_type == QuestType.CUSTOM
assert quest.enabled is True
def test_from_dict_full(self):
data = {
"id": "full_quest",
"name": "Full Quest",
"description": "A test quest",
"reward_tokens": 500,
"type": "issue_count",
"enabled": False,
"repeatable": True,
"cooldown_hours": 12,
"criteria": {"target_count": 5},
"notification_message": "Done!",
}
quest = QuestDefinition.from_dict(data)
assert quest.id == "full_quest"
assert quest.reward_tokens == 500
assert quest.quest_type == QuestType.ISSUE_COUNT
assert quest.enabled is False
assert quest.repeatable is True
assert quest.cooldown_hours == 12
# ── Quest Progress Tests ─────────────────────────────────────────────────
class TestQuestProgress:
def test_progress_creation(self):
progress = QuestProgress(
quest_id="test_quest",
agent_id="test_agent",
status=QuestStatus.NOT_STARTED,
)
assert progress.quest_id == "test_quest"
assert progress.agent_id == "test_agent"
assert progress.current_value == 0
def test_progress_to_dict(self):
progress = QuestProgress(
quest_id="test_quest",
agent_id="test_agent",
status=QuestStatus.IN_PROGRESS,
current_value=2,
target_value=5,
)
data = progress.to_dict()
assert data["quest_id"] == "test_quest"
assert data["status"] == "in_progress"
assert data["current_value"] == 2
# ── Quest Loading Tests ──────────────────────────────────────────────────
class TestQuestLoading:
def test_load_quest_config(self):
definitions, settings = load_quest_config()
assert isinstance(definitions, dict)
assert isinstance(settings, dict)
def test_get_quest_definition_exists(self):
# Should return None for non-existent quest in fresh state
quest = get_quest_definition("nonexistent")
# The function returns from loaded config, which may have quests
# or be empty if config doesn't exist
assert quest is None or isinstance(quest, QuestDefinition)
def test_get_quest_definition_not_found(self):
quest = get_quest_definition("definitely_not_a_real_quest_12345")
assert quest is None
# ── Quest Progress Management Tests ─────────────────────────────────────
class TestQuestProgressManagement:
def test_get_or_create_progress_new(self):
# First create a quest definition
quest = QuestDefinition(
id="progress_test",
name="Progress Test",
description="Test quest",
reward_tokens=100,
quest_type=QuestType.ISSUE_COUNT,
enabled=True,
repeatable=False,
cooldown_hours=0,
criteria={"target_count": 3},
notification_message="Done!",
)
# Need to inject into the definitions dict
from timmy.quest_system import _quest_definitions
_quest_definitions["progress_test"] = quest
progress = get_or_create_progress("progress_test", "agent1")
assert progress.quest_id == "progress_test"
assert progress.agent_id == "agent1"
assert progress.status == QuestStatus.NOT_STARTED
assert progress.target_value == 3
del _quest_definitions["progress_test"]
def test_update_quest_progress(self):
quest = QuestDefinition(
id="update_test",
name="Update Test",
description="Test quest",
reward_tokens=100,
quest_type=QuestType.ISSUE_COUNT,
enabled=True,
repeatable=False,
cooldown_hours=0,
criteria={"target_count": 3},
notification_message="Done!",
)
from timmy.quest_system import _quest_definitions
_quest_definitions["update_test"] = quest
# Create initial progress
progress = get_or_create_progress("update_test", "agent1")
assert progress.current_value == 0
# Update progress
updated = update_quest_progress("update_test", "agent1", 2)
assert updated.current_value == 2
assert updated.status == QuestStatus.NOT_STARTED
# Complete the quest
completed = update_quest_progress("update_test", "agent1", 3)
assert completed.current_value == 3
assert completed.status == QuestStatus.COMPLETED
assert completed.completed_at != ""
del _quest_definitions["update_test"]
# ── Quest Evaluation Tests ───────────────────────────────────────────────
class TestQuestEvaluation:
def test_evaluate_issue_count_quest(self):
quest = QuestDefinition(
id="eval_test",
name="Eval Test",
description="Test quest",
reward_tokens=100,
quest_type=QuestType.ISSUE_COUNT,
enabled=True,
repeatable=False,
cooldown_hours=0,
criteria={"target_count": 2, "issue_labels": ["test"]},
notification_message="Done!",
)
from timmy.quest_system import _quest_definitions
_quest_definitions["eval_test"] = quest
# Simulate closed issues
closed_issues = [
{"id": 1, "labels": [{"name": "test"}]},
{"id": 2, "labels": [{"name": "test"}, {"name": "bug"}]},
{"id": 3, "labels": [{"name": "other"}]},
]
context = {"closed_issues": closed_issues}
progress = evaluate_quest_progress("eval_test", "agent1", context)
assert progress is not None
assert progress.current_value == 2 # Two issues with 'test' label
del _quest_definitions["eval_test"]
def test_evaluate_issue_reduce_quest(self):
quest = QuestDefinition(
id="reduce_test",
name="Reduce Test",
description="Test quest",
reward_tokens=200,
quest_type=QuestType.ISSUE_REDUCE,
enabled=True,
repeatable=False,
cooldown_hours=0,
criteria={"target_reduction": 2},
notification_message="Done!",
)
from timmy.quest_system import _quest_definitions
_quest_definitions["reduce_test"] = quest
context = {"previous_issue_count": 10, "current_issue_count": 7}
progress = evaluate_quest_progress("reduce_test", "agent1", context)
assert progress is not None
assert progress.current_value == 3 # Reduced by 3
del _quest_definitions["reduce_test"]
def test_evaluate_daily_run_quest(self):
quest = QuestDefinition(
id="daily_test",
name="Daily Test",
description="Test quest",
reward_tokens=250,
quest_type=QuestType.DAILY_RUN,
enabled=True,
repeatable=True,
cooldown_hours=24,
criteria={"min_sessions": 5},
notification_message="Done!",
)
from timmy.quest_system import _quest_definitions
_quest_definitions["daily_test"] = quest
context = {"sessions_completed": 5}
progress = evaluate_quest_progress("daily_test", "agent1", context)
assert progress is not None
assert progress.current_value == 5
assert progress.status == QuestStatus.COMPLETED
del _quest_definitions["daily_test"]
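# Context shapes consumed by evaluate_quest_progress, as exercised by the tests
# above (a summary of what these tests pass in, not an exhaustive contract):
#
#     issue_count  -> {"closed_issues": [{"id": ..., "labels": [{"name": ...}]}]}
#     issue_reduce -> {"previous_issue_count": N, "current_issue_count": M}
#     daily_run    -> {"sessions_completed": N}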
# ── Quest Cooldown Tests ─────────────────────────────────────────────────
class TestQuestCooldown:
def test_is_on_cooldown_no_cooldown(self):
quest = QuestDefinition(
id="cooldown_test",
name="Cooldown Test",
description="Test quest",
reward_tokens=100,
quest_type=QuestType.ISSUE_COUNT,
enabled=True,
repeatable=True,
cooldown_hours=24,
criteria={},
notification_message="Done!",
)
progress = QuestProgress(
quest_id="cooldown_test",
agent_id="agent1",
status=QuestStatus.CLAIMED,
)
# No last_completed_at means no cooldown
assert _is_on_cooldown(progress, quest) is False
# ── Quest Reward Tests ───────────────────────────────────────────────────
class TestQuestReward:
def test_claim_quest_reward_not_completed(self):
quest = QuestDefinition(
id="reward_test",
name="Reward Test",
description="Test quest",
reward_tokens=100,
quest_type=QuestType.ISSUE_COUNT,
enabled=True,
repeatable=False,
cooldown_hours=0,
criteria={"target_count": 3},
notification_message="Done!",
)
from timmy.quest_system import _quest_definitions, _quest_progress
_quest_definitions["reward_test"] = quest
# Create progress but don't complete
progress = get_or_create_progress("reward_test", "agent1")
_quest_progress["agent1:reward_test"] = progress
# Try to claim - should fail
reward = claim_quest_reward("reward_test", "agent1")
assert reward is None
del _quest_definitions["reward_test"]
# ── Leaderboard Tests ────────────────────────────────────────────────────
class TestQuestLeaderboard:
def test_get_quest_leaderboard_empty(self):
reset_quest_progress()
leaderboard = get_quest_leaderboard()
assert leaderboard == []
def test_get_quest_leaderboard_with_data(self):
# Create and complete a quest for two agents
quest = QuestDefinition(
id="leaderboard_test",
name="Leaderboard Test",
description="Test quest",
reward_tokens=100,
quest_type=QuestType.ISSUE_COUNT,
enabled=True,
repeatable=True,
cooldown_hours=0,
criteria={"target_count": 1},
notification_message="Done!",
)
from timmy.quest_system import _quest_definitions, _quest_progress
_quest_definitions["leaderboard_test"] = quest
# Create progress for agent1 with 2 completions
progress1 = QuestProgress(
quest_id="leaderboard_test",
agent_id="agent1",
status=QuestStatus.NOT_STARTED,
completion_count=2,
)
_quest_progress["agent1:leaderboard_test"] = progress1
# Create progress for agent2 with 1 completion
progress2 = QuestProgress(
quest_id="leaderboard_test",
agent_id="agent2",
status=QuestStatus.NOT_STARTED,
completion_count=1,
)
_quest_progress["agent2:leaderboard_test"] = progress2
leaderboard = get_quest_leaderboard()
assert len(leaderboard) == 2
# agent1 should be first (more tokens)
assert leaderboard[0]["agent_id"] == "agent1"
assert leaderboard[0]["total_tokens"] == 200
assert leaderboard[1]["agent_id"] == "agent2"
assert leaderboard[1]["total_tokens"] == 100
del _quest_definitions["leaderboard_test"]
# ── Quest Reset Tests ─────────────────────────────────────────────────────
class TestQuestReset:
def test_reset_quest_progress_all(self):
# Create some progress entries
progress1 = QuestProgress(
quest_id="quest1", agent_id="agent1", status=QuestStatus.NOT_STARTED
)
progress2 = QuestProgress(
quest_id="quest2", agent_id="agent2", status=QuestStatus.NOT_STARTED
)
from timmy.quest_system import _quest_progress
_quest_progress["agent1:quest1"] = progress1
_quest_progress["agent2:quest2"] = progress2
assert len(_quest_progress) == 2
count = reset_quest_progress()
assert count == 2
assert len(_quest_progress) == 0
def test_reset_quest_progress_specific_quest(self):
progress1 = QuestProgress(
quest_id="quest1", agent_id="agent1", status=QuestStatus.NOT_STARTED
)
progress2 = QuestProgress(
quest_id="quest2", agent_id="agent1", status=QuestStatus.NOT_STARTED
)
from timmy.quest_system import _quest_progress
_quest_progress["agent1:quest1"] = progress1
_quest_progress["agent1:quest2"] = progress2
count = reset_quest_progress(quest_id="quest1")
assert count == 1
assert "agent1:quest1" not in _quest_progress
assert "agent1:quest2" in _quest_progress
def test_reset_quest_progress_specific_agent(self):
progress1 = QuestProgress(
quest_id="quest1", agent_id="agent1", status=QuestStatus.NOT_STARTED
)
progress2 = QuestProgress(
quest_id="quest1", agent_id="agent2", status=QuestStatus.NOT_STARTED
)
from timmy.quest_system import _quest_progress
_quest_progress["agent1:quest1"] = progress1
_quest_progress["agent2:quest1"] = progress2
count = reset_quest_progress(agent_id="agent1")
assert count == 1
assert "agent1:quest1" not in _quest_progress
assert "agent2:quest1" in _quest_progress

View File

@@ -0,0 +1,294 @@
"""Unit tests for the stress detector module.
Tests stress signal calculation, mode detection, multipliers,
and integration with the quest system.
"""
from __future__ import annotations
import pytest
from timmy.stress_detector import (
StressMode,
StressSignal,
StressSnapshot,
StressThresholds,
_calculate_stress_score,
_get_multipliers_for_mode,
apply_multiplier,
get_default_config,
reset_stress_state,
)
@pytest.fixture(autouse=True)
def clean_stress_state():
"""Reset stress state between tests."""
reset_stress_state()
yield
reset_stress_state()
# ── Stress Mode Tests ──────────────────────────────────────────────────────
class TestStressMode:
def test_stress_mode_values(self):
"""StressMode enum has expected values."""
assert StressMode.CALM.value == "calm"
assert StressMode.ELEVATED.value == "elevated"
assert StressMode.HIGH.value == "high"
# ── Stress Signal Tests ────────────────────────────────────────────────────
class TestStressSignal:
def test_signal_not_triggered(self):
"""Signal with value below threshold is not triggered."""
signal = StressSignal(
name="test_signal",
value=5.0,
threshold=10.0,
weight=0.5,
)
assert not signal.is_triggered
assert signal.contribution == 0.0
def test_signal_triggered(self):
"""Signal with value at threshold is triggered."""
signal = StressSignal(
name="test_signal",
value=10.0,
threshold=10.0,
weight=0.5,
)
assert signal.is_triggered
assert signal.contribution == 0.5 # weight * min(1, value/threshold)
def test_signal_contribution_capped(self):
"""Signal contribution is capped at weight when value >> threshold."""
signal = StressSignal(
name="test_signal",
value=100.0,
threshold=10.0,
weight=0.5,
)
assert signal.is_triggered
assert signal.contribution == 0.5 # Capped at weight
    def test_signal_contribution_above_threshold(self):
        """A value above the threshold still contributes exactly the weight (ratio clamped to 1)."""
signal = StressSignal(
name="test_signal",
value=15.0,
threshold=10.0,
weight=0.5,
)
assert signal.is_triggered
# contribution = min(1, 15/10) * 0.5 = 0.5 (capped)
assert signal.contribution == 0.5
# ── Stress Thresholds Tests ────────────────────────────────────────────────
class TestStressThresholds:
def test_calm_mode(self):
"""Score below elevated_min returns CALM mode."""
thresholds = StressThresholds(elevated_min=0.3, high_min=0.6)
assert thresholds.get_mode_for_score(0.0) == StressMode.CALM
assert thresholds.get_mode_for_score(0.1) == StressMode.CALM
assert thresholds.get_mode_for_score(0.29) == StressMode.CALM
def test_elevated_mode(self):
"""Score between elevated_min and high_min returns ELEVATED mode."""
thresholds = StressThresholds(elevated_min=0.3, high_min=0.6)
assert thresholds.get_mode_for_score(0.3) == StressMode.ELEVATED
assert thresholds.get_mode_for_score(0.5) == StressMode.ELEVATED
assert thresholds.get_mode_for_score(0.59) == StressMode.ELEVATED
def test_high_mode(self):
"""Score at or above high_min returns HIGH mode."""
thresholds = StressThresholds(elevated_min=0.3, high_min=0.6)
assert thresholds.get_mode_for_score(0.6) == StressMode.HIGH
assert thresholds.get_mode_for_score(0.8) == StressMode.HIGH
assert thresholds.get_mode_for_score(1.0) == StressMode.HIGH
# ── Stress Score Calculation Tests ─────────────────────────────────────────
class TestStressScoreCalculation:
def test_empty_signals(self):
"""Empty signal list returns zero stress score."""
score = _calculate_stress_score([])
assert score == 0.0
def test_no_triggered_signals(self):
"""No triggered signals means zero stress score."""
signals = [
StressSignal(name="s1", value=1.0, threshold=10.0, weight=0.5),
StressSignal(name="s2", value=2.0, threshold=10.0, weight=0.5),
]
score = _calculate_stress_score(signals)
assert score == 0.0
def test_single_triggered_signal(self):
"""Single triggered signal contributes its weight."""
signals = [
StressSignal(name="s1", value=10.0, threshold=10.0, weight=0.5),
]
score = _calculate_stress_score(signals)
# contribution = 0.5, total_weight = 0.5, score = 0.5/0.5 = 1.0
assert score == 1.0
def test_mixed_signals(self):
"""Mix of triggered and non-triggered signals."""
signals = [
StressSignal(name="s1", value=10.0, threshold=10.0, weight=0.3),
StressSignal(name="s2", value=1.0, threshold=10.0, weight=0.3),
StressSignal(name="s3", value=10.0, threshold=10.0, weight=0.4),
]
score = _calculate_stress_score(signals)
# triggered contributions: 0.3 + 0.4 = 0.7
# total_weight: 0.3 + 0.3 + 0.4 = 1.0
# score = 0.7 / 1.0 = 0.7
assert score == 0.7
def test_score_capped_at_one(self):
"""Stress score is capped at 1.0."""
signals = [
StressSignal(name="s1", value=100.0, threshold=10.0, weight=1.0),
StressSignal(name="s2", value=100.0, threshold=10.0, weight=1.0),
]
score = _calculate_stress_score(signals)
assert score == 1.0 # Capped
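# A reference sketch of the scoring rule these tests encode (an illustration of
# the expected behaviour, not the implementation in timmy/stress_detector.py):
# each triggered signal contributes weight * min(1, value / threshold), the
# contributions are normalised by the total weight, and the score is capped at 1.0.
def _reference_stress_score(signals: list[StressSignal]) -> float:
    total_weight = sum(s.weight for s in signals)
    if total_weight == 0:
        return 0.0
    triggered = sum(s.contribution for s in signals)
    return min(1.0, triggered / total_weight)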
# ── Multiplier Tests ───────────────────────────────────────────────────────
class TestMultipliers:
def test_default_config_structure(self):
"""Default config has expected structure."""
config = get_default_config()
assert "thresholds" in config
assert "signals" in config
assert "multipliers" in config
def test_calm_mode_multipliers(self):
"""Calm mode has expected multipliers."""
multipliers = _get_multipliers_for_mode(StressMode.CALM)
assert multipliers["test_improve"] == 1.0
assert multipliers["docs_update"] == 1.2
assert multipliers["exploration"] == 1.3
assert multipliers["refactor"] == 1.2
def test_elevated_mode_multipliers(self):
"""Elevated mode has expected multipliers."""
multipliers = _get_multipliers_for_mode(StressMode.ELEVATED)
assert multipliers["test_improve"] == 1.2
assert multipliers["issue_reduce"] == 1.1
assert multipliers["refactor"] == 0.9
def test_high_mode_multipliers(self):
"""High stress mode has expected multipliers."""
multipliers = _get_multipliers_for_mode(StressMode.HIGH)
assert multipliers["test_improve"] == 1.5
assert multipliers["issue_reduce"] == 1.4
assert multipliers["exploration"] == 0.7
assert multipliers["refactor"] == 0.6
def test_multiplier_fallback_for_unknown_type(self):
"""Unknown quest types return default multiplier of 1.0."""
multipliers = _get_multipliers_for_mode(StressMode.CALM)
assert multipliers.get("unknown_type", 1.0) == 1.0
# ── Apply Multiplier Tests ─────────────────────────────────────────────────
class TestApplyMultiplier:
    def test_apply_multiplier_unknown_type(self):
        """Applying a multiplier to an unknown quest type still yields a positive reward."""
        # apply_multiplier reads the current stress mode internally and is hard
        # to mock here, so we pass an unknown quest type (expected to fall back
        # to a neutral multiplier) and only assert the floor behaviour.
        base = 100
        result = apply_multiplier(base, "unknown_type")
        assert result >= 1  # At least 1 token
def test_apply_multiplier_minimum_one(self):
"""Applied reward is at least 1 token."""
# Even with very low multiplier, result should be >= 1
result = apply_multiplier(1, "any_type")
assert result >= 1
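# A hedged sketch of the reward adjustment these tests allow for (only the
# >= 1 floor is asserted; the exact rounding lives in timmy/stress_detector.py,
# and multiplier_for below is shorthand, not a real helper):
#
#     adjusted = max(1, round(base_reward * multiplier_for(quest_type, mode)))
#     # e.g. 100 tokens * 1.5 (test_improve in HIGH mode) -> 150 tokens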
# ── Stress Snapshot Tests ──────────────────────────────────────────────────
class TestStressSnapshot:
def test_snapshot_to_dict(self):
"""Snapshot can be converted to dictionary."""
signals = [
StressSignal(name="test", value=10.0, threshold=5.0, weight=0.5),
]
snapshot = StressSnapshot(
mode=StressMode.ELEVATED,
score=0.5,
signals=signals,
multipliers={"test_improve": 1.2},
)
data = snapshot.to_dict()
assert data["mode"] == "elevated"
assert data["score"] == 0.5
assert len(data["signals"]) == 1
assert data["multipliers"]["test_improve"] == 1.2
# ── Integration Tests ──────────────────────────────────────────────────────
class TestStressDetectorIntegration:
def test_reset_stress_state(self):
"""Reset clears internal state."""
# Just verify reset doesn't error
reset_stress_state()
def test_default_config_contains_all_signals(self):
"""Default config defines all expected signals."""
config = get_default_config()
signals = config["signals"]
expected_signals = [
"flaky_test_rate",
"p1_backlog_growth",
"ci_failure_rate",
"open_bug_count",
]
for signal in expected_signals:
assert signal in signals
assert "threshold" in signals[signal]
assert "weight" in signals[signal]
def test_default_config_contains_all_modes(self):
"""Default config defines all stress modes."""
config = get_default_config()
multipliers = config["multipliers"]
assert "calm" in multipliers
assert "elevated" in multipliers
assert "high" in multipliers
    def test_signal_weights_sum_approximately_one(self):
        """Signal weights should approximately sum to 1.0."""
config = get_default_config()
signals = config["signals"]
total_weight = sum(s["weight"] for s in signals.values())
# Allow some flexibility but should be close to 1.0
assert 0.9 <= total_weight <= 1.1

View File

@@ -211,6 +211,23 @@
"agenda_time_minutes": 10
},
"outputs": []
},
{
"id": "golden_path",
"name": "Golden Path Generator",
"description": "Generates coherent 30-60 minute mini-sessions from real Gitea issues — triage, fixes, and tests",
"script": "timmy_automations/daily_run/golden_path.py",
"category": "daily_run",
"enabled": true,
"trigger": "manual",
"executable": "python3",
"config": {
"target_minutes": 45,
"size_labels": ["size:XS", "size:S", "size:M"],
"min_items": 3,
"max_items": 5
},
"outputs": []
}
]
}

View File

@@ -0,0 +1,583 @@
"""Golden Path generator — coherent 30-60 minute mini-sessions from real issues.
Fetches issues from Gitea and assembles them into ordered sequences forming
a coherent mini-session. Each Golden Path includes:
- One small triage cleanup
- Two micro-fixes (XS/S sized)
- One test-improvement task
All tasks are real issues from the Gitea repository, never synthetic.
Usage:
from timmy_automations.daily_run.golden_path import generate_golden_path
    path = generate_golden_path(target_minutes=45)
print(path.to_json())
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
# ── Configuration ─────────────────────────────────────────────────────────
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
CONFIG_PATH = Path(__file__).parent.parent / "config" / "daily_run.json"
DEFAULT_CONFIG = {
"gitea_api": "http://localhost:3000/api/v1",
"repo_slug": "rockachopa/Timmy-time-dashboard",
"token_file": "~/.hermes/gitea_token",
"size_labels": ["size:XS", "size:S", "size:M"],
}
# Time estimates (in minutes) by size and type
TIME_ESTIMATES: dict[str, dict[str, int]] = {
"XS": {"triage": 5, "fix": 10, "test": 10, "docs": 8, "refactor": 8},
"S": {"triage": 10, "fix": 15, "test": 15, "docs": 12, "refactor": 12},
"M": {"triage": 15, "fix": 25, "test": 25, "docs": 20, "refactor": 20},
}
# Issue type detection patterns
TYPE_PATTERNS: dict[str, dict[str, list[str]]] = {
"triage": {
"labels": ["triage", "cleanup", "organize", "sort", "categorize"],
"title": ["triage", "cleanup", "organize", "sort", "categorize", "clean up"],
},
"fix": {
"labels": ["bug", "fix", "error", "broken"],
"title": ["fix", "bug", "error", "broken", "repair", "correct"],
},
"test": {
"labels": ["test", "testing", "coverage", "pytest"],
"title": ["test", "coverage", "pytest", "unit test", "integration test"],
},
"docs": {
"labels": ["docs", "documentation", "readme", "docstring"],
"title": ["doc", "readme", "comment", "guide", "tutorial"],
},
"refactor": {
"labels": ["refactor", "cleanup", "debt", "maintainability"],
"title": ["refactor", "cleanup", "simplify", "extract", "reorganize"],
},
}
def load_config() -> dict:
"""Load configuration from config file with fallback to defaults."""
config = DEFAULT_CONFIG.copy()
if CONFIG_PATH.exists():
try:
file_config = json.loads(CONFIG_PATH.read_text())
if "orchestrator" in file_config:
config.update(file_config["orchestrator"])
except (json.JSONDecodeError, OSError) as exc:
print(f"[golden_path] Warning: Could not load config: {exc}", file=sys.stderr)
# Environment variable overrides
if os.environ.get("TIMMY_GITEA_API"):
config["gitea_api"] = os.environ.get("TIMMY_GITEA_API")
if os.environ.get("TIMMY_REPO_SLUG"):
config["repo_slug"] = os.environ.get("TIMMY_REPO_SLUG")
if os.environ.get("TIMMY_GITEA_TOKEN"):
config["token"] = os.environ.get("TIMMY_GITEA_TOKEN")
return config
def get_token(config: dict) -> str | None:
"""Get Gitea token from environment or file."""
if "token" in config:
return config["token"]
token_file = Path(config["token_file"]).expanduser()
if token_file.exists():
return token_file.read_text().strip()
return None
# ── Gitea API Client ──────────────────────────────────────────────────────
class GiteaClient:
"""Simple Gitea API client with graceful degradation."""
def __init__(self, config: dict, token: str | None):
self.api_base = config["gitea_api"].rstrip("/")
self.repo_slug = config["repo_slug"]
self.token = token
self._available: bool | None = None
def _headers(self) -> dict:
headers = {"Accept": "application/json"}
if self.token:
headers["Authorization"] = f"token {self.token}"
return headers
def _api_url(self, path: str) -> str:
return f"{self.api_base}/repos/{self.repo_slug}/{path}"
def is_available(self) -> bool:
"""Check if Gitea API is reachable."""
if self._available is not None:
return self._available
try:
req = Request(
f"{self.api_base}/version",
headers=self._headers(),
method="GET",
)
with urlopen(req, timeout=5) as resp:
self._available = resp.status == 200
return self._available
except (HTTPError, URLError, TimeoutError):
self._available = False
return False
def get(self, path: str, params: dict | None = None) -> list | dict:
"""Make a GET request to the Gitea API."""
url = self._api_url(path)
if params:
query = "&".join(f"{k}={v}" for k, v in params.items())
url = f"{url}?{query}"
req = Request(url, headers=self._headers(), method="GET")
with urlopen(req, timeout=15) as resp:
return json.loads(resp.read())
def get_paginated(self, path: str, params: dict | None = None) -> list:
"""Fetch all pages of a paginated endpoint."""
all_items = []
page = 1
limit = 50
while True:
page_params = {"limit": limit, "page": page}
if params:
page_params.update(params)
batch = self.get(path, page_params)
if not batch:
break
all_items.extend(batch)
if len(batch) < limit:
break
page += 1
return all_items
# ── Issue Classification ──────────────────────────────────────────────────
def extract_size(labels: list[dict]) -> str:
"""Extract size label from issue labels."""
for label in labels:
name = label.get("name", "")
if name.startswith("size:"):
return name.replace("size:", "").upper()
return "?"
def classify_issue_type(issue: dict) -> str:
"""Classify an issue into a type based on labels and title."""
labels = [l.get("name", "").lower() for l in issue.get("labels", [])]
title = issue.get("title", "").lower()
scores: dict[str, int] = {}
for issue_type, patterns in TYPE_PATTERNS.items():
score = 0
# Check labels
for pattern in patterns["labels"]:
if any(pattern in label for label in labels):
score += 2
# Check title
for pattern in patterns["title"]:
if pattern in title:
score += 1
scores[issue_type] = score
# Return the type with highest score, or "fix" as default
if scores:
best_type = max(scores, key=lambda k: scores[k])
if scores[best_type] > 0:
return best_type
return "fix" # Default to fix for uncategorized issues
def estimate_time(issue: dict) -> int:
"""Estimate time in minutes for an issue based on size and type."""
size = extract_size(issue.get("labels", []))
issue_type = classify_issue_type(issue)
# Default to fix time estimates if type not found
type_map = issue_type if issue_type in TIME_ESTIMATES.get(size, {}) else "fix"
return TIME_ESTIMATES.get(size, TIME_ESTIMATES["S"]).get(type_map, 15)
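# Worked example: a "size:S" issue classified as "test" maps to
# TIME_ESTIMATES["S"]["test"] = 15 minutes; an issue with no size label falls
# back to the "S" row and the "fix" column, which is also 15 minutes.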
def score_issue_for_path(issue: dict) -> int:
"""Score an issue for Golden Path suitability (higher = better fit)."""
score = 0
labels = [l.get("name", "").lower() for l in issue.get("labels", [])]
issue_type = classify_issue_type(issue)
# Prefer smaller sizes for predictability
if "size:xs" in labels:
score += 10
elif "size:s" in labels:
score += 7
elif "size:m" in labels:
score += 3
# Prefer issues with clear type labels
if issue_type in ["triage", "test", "fix"]:
score += 3
# Prefer issues with acceptance criteria or good description
body = issue.get("body", "")
if body:
if "## acceptance criteria" in body.lower() or "acceptance criteria" in body.lower():
score += 3
if len(body) > 200:
score += 1
# Prefer issues with recent activity
updated_at = issue.get("updated_at", "")
if updated_at:
try:
updated = datetime.fromisoformat(updated_at.replace("Z", "+00:00"))
days_old = (datetime.now(timezone.utc) - updated).days
if days_old < 7:
score += 2
elif days_old < 30:
score += 1
except (ValueError, TypeError):
pass
return score
# ── Golden Path Generation ────────────────────────────────────────────────
@dataclass
class PathItem:
"""A single item in a Golden Path."""
number: int
title: str
size: str
issue_type: str
estimated_minutes: int
url: str
def to_dict(self) -> dict:
return {
"number": self.number,
"title": self.title,
"size": self.size,
"type": self.issue_type,
"estimated_minutes": self.estimated_minutes,
"url": self.url,
}
@dataclass
class GoldenPath:
"""A complete Golden Path sequence."""
generated_at: str
target_minutes: int
items: list[PathItem] = field(default_factory=list)
@property
def total_estimated_minutes(self) -> int:
return sum(item.estimated_minutes for item in self.items)
@property
def item_count(self) -> int:
return len(self.items)
def to_dict(self) -> dict:
return {
"generated_at": self.generated_at,
"target_minutes": self.target_minutes,
"total_estimated_minutes": self.total_estimated_minutes,
"item_count": self.item_count,
"items": [item.to_dict() for item in self.items],
}
def to_json(self, indent: int = 2) -> str:
return json.dumps(self.to_dict(), indent=indent)
def fetch_eligible_issues(client: GiteaClient, config: dict) -> list[dict]:
"""Fetch open issues eligible for Golden Paths."""
size_labels = config.get("size_labels", ["size:XS", "size:S", "size:M"])
try:
# Fetch all open issues
issues = client.get_paginated("issues", {"state": "open", "sort": "updated"})
except (HTTPError, URLError) as exc:
print(f"[golden_path] Warning: Failed to fetch issues: {exc}", file=sys.stderr)
return []
# Filter by size labels if specified
if size_labels:
filtered = []
size_names = {s.lower() for s in size_labels}
for issue in issues:
issue_labels = {l.get("name", "").lower() for l in issue.get("labels", [])}
if issue_labels & size_names:
filtered.append(issue)
issues = filtered
return issues
def group_issues_by_type(issues: list[dict]) -> dict[str, list[dict]]:
"""Group issues by their classified type, sorted by score."""
groups: dict[str, list[dict]] = {
"triage": [],
"fix": [],
"test": [],
"docs": [],
"refactor": [],
}
for issue in issues:
issue_type = classify_issue_type(issue)
if issue_type in groups:
groups[issue_type].append(issue)
# Sort each group by score (highest first)
for issue_type in groups:
groups[issue_type] = sorted(
groups[issue_type],
key=lambda i: score_issue_for_path(i),
reverse=True,
)
return groups
def build_golden_path(
grouped_issues: dict[str, list[dict]],
target_minutes: int = 45,
) -> GoldenPath:
"""Build a Golden Path from grouped issues.
The path follows a coherent sequence:
1. One small triage cleanup (warm-up)
2. One micro-fix (momentum building)
3. One test-improvement (quality focus)
4. One more micro-fix or docs (closure)
"""
path = GoldenPath(
generated_at=datetime.now(timezone.utc).isoformat(),
target_minutes=target_minutes,
)
used_issue_numbers: set[int] = set()
def add_best_item(issues: list[dict], max_minutes: int | None = None) -> bool:
"""Add the best available issue of a type to the path."""
for issue in issues:
number = issue.get("number", 0)
if number in used_issue_numbers:
continue
est_time = estimate_time(issue)
if max_minutes and est_time > max_minutes:
continue
used_issue_numbers.add(number)
path.items.append(
PathItem(
number=number,
title=issue.get("title", "Untitled"),
size=extract_size(issue.get("labels", [])),
issue_type=classify_issue_type(issue),
estimated_minutes=est_time,
url=issue.get("html_url", ""),
)
)
return True
return False
# Phase 1: Warm-up with triage (5-10 min)
if grouped_issues["triage"]:
add_best_item(grouped_issues["triage"], max_minutes=15)
else:
# Fallback: use smallest available issue
all_issues = (
grouped_issues["fix"]
+ grouped_issues["docs"]
+ grouped_issues["refactor"]
)
all_issues.sort(key=lambda i: score_issue_for_path(i), reverse=True)
add_best_item(all_issues, max_minutes=10)
# Phase 2: First micro-fix (10-15 min)
if grouped_issues["fix"]:
add_best_item(grouped_issues["fix"], max_minutes=20)
else:
# Fallback to refactor
add_best_item(grouped_issues["refactor"], max_minutes=15)
# Phase 3: Test improvement (10-15 min)
if grouped_issues["test"]:
add_best_item(grouped_issues["test"], max_minutes=20)
else:
# If no test issues, add another fix
add_best_item(grouped_issues["fix"], max_minutes=15)
# Phase 4: Closure fix or docs (10-15 min)
# Try to fill remaining time
remaining_budget = target_minutes - path.total_estimated_minutes
if remaining_budget >= 10:
# Prefer fix, then docs
if not add_best_item(grouped_issues["fix"], max_minutes=remaining_budget):
if not add_best_item(grouped_issues["docs"], max_minutes=remaining_budget):
add_best_item(grouped_issues["refactor"], max_minutes=remaining_budget)
return path
def generate_golden_path(
target_minutes: int = 45,
config: dict | None = None,
) -> GoldenPath:
"""Generate a Golden Path for the specified time budget.
Args:
target_minutes: Target session length (30-60 recommended)
config: Optional config override
Returns:
A GoldenPath with ordered items from real Gitea issues
"""
cfg = config or load_config()
token = get_token(cfg)
client = GiteaClient(cfg, token)
if not client.is_available():
        # Gitea unreachable: return an empty path (callers treat "no items" as failure)
return GoldenPath(
generated_at=datetime.now(timezone.utc).isoformat(),
target_minutes=target_minutes,
items=[],
)
issues = fetch_eligible_issues(client, cfg)
grouped = group_issues_by_type(issues)
return build_golden_path(grouped, target_minutes)
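# A minimal programmatic sketch (assumes a reachable Gitea instance; the config
# override keys mirror DEFAULT_CONFIG above and the repo slug is hypothetical):
#
#     cfg = load_config()
#     cfg["repo_slug"] = "someorg/somerepo"
#     path = generate_golden_path(target_minutes=30, config=cfg)
#     print(path.to_json() if path.items else "No eligible issues found")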
# ── Output Formatting ─────────────────────────────────────────────────────
def print_golden_path(path: GoldenPath) -> None:
"""Print a formatted Golden Path to stdout."""
print("=" * 60)
print("🌟 GOLDEN PATH")
print("=" * 60)
print(f"Generated: {path.generated_at}")
print(f"Target: {path.target_minutes} minutes")
print(f"Estimated: {path.total_estimated_minutes} minutes")
print()
if not path.items:
print("No eligible issues found for a Golden Path.")
print()
print("To create Golden Paths, ensure issues have:")
print(" - Size labels: size:XS, size:S, or size:M")
print(" - Type labels: bug, test, triage, docs, refactor")
print()
return
for i, item in enumerate(path.items, 1):
type_emoji = {
"triage": "🧹",
"fix": "🔧",
"test": "🧪",
"docs": "📚",
"refactor": "♻️",
}.get(item.issue_type, "📋")
print(f"{i}. {type_emoji} #{item.number} [{item.size}] ({item.estimated_minutes}m)")
print(f" Title: {item.title}")
print(f" Type: {item.issue_type.upper()}")
if item.url:
print(f" URL: {item.url}")
print()
print("-" * 60)
print("Instructions:")
print(" 1. Start with the triage item to warm up")
print(" 2. Progress through fixes to build momentum")
print(" 3. Use the test item for quality focus")
print(" 4. Check off items as you complete them")
print()
# ── CLI ───────────────────────────────────────────────────────────────────
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(
description="Golden Path generator — coherent 30-60 minute mini-sessions",
)
p.add_argument(
"--minutes",
"-m",
type=int,
default=45,
help="Target session length in minutes (default: 45)",
)
p.add_argument(
"--json",
"-j",
action="store_true",
help="Output as JSON instead of formatted text",
)
return p.parse_args()
def main() -> int:
args = parse_args()
    # Clamp the target to the supported 30-60 minute range
    target = max(30, min(60, args.minutes))
    if target != args.minutes:
        print(
            f"[golden_path] Warning: Clamped {args.minutes}m to {target}m (allowed range is 30-60 minutes)",
            file=sys.stderr,
        )
path = generate_golden_path(target_minutes=target)
if args.json:
print(path.to_json())
else:
print_golden_path(path)
return 0 if path.items else 1
if __name__ == "__main__":
sys.exit(main())