forked from Rockachopa/Timmy-time-dashboard
Compare commits
6 Commits
kimi/issue
...
kimi/issue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d3eb5c31bf | ||
| 6dd48685e7 | |||
| a95cf806c8 | |||
| 19367d6e41 | |||
| 7e983fcdb3 | |||
| 46f89d59db |
178
config/quests.yaml
Normal file
178
config/quests.yaml
Normal file
@@ -0,0 +1,178 @@
|
||||
# ── Token Quest System Configuration ─────────────────────────────────────────
|
||||
#
|
||||
# Quests are special objectives that agents (and humans) can complete for
|
||||
# bonus tokens. Each quest has:
|
||||
# - id: Unique identifier
|
||||
# - name: Display name
|
||||
# - description: What the quest requires
|
||||
# - reward_tokens: Number of tokens awarded on completion
|
||||
# - criteria: Detection rules for completion
|
||||
# - enabled: Whether this quest is active
|
||||
# - repeatable: Whether this quest can be completed multiple times
|
||||
# - cooldown_hours: Minimum hours between completions (if repeatable)
|
||||
#
|
||||
# Quest Types:
|
||||
# - issue_count: Complete when N issues matching criteria are closed
|
||||
# - issue_reduce: Complete when open issue count drops by N
|
||||
# - docs_update: Complete when documentation files are updated
|
||||
# - test_improve: Complete when test coverage/cases improve
|
||||
# - daily_run: Complete Daily Run session objectives
|
||||
# - custom: Special quests with manual completion
|
||||
#
|
||||
# ── Active Quests ─────────────────────────────────────────────────────────────
|
||||
|
||||
quests:
|
||||
# ── Daily Run & Test Improvement Quests ───────────────────────────────────
|
||||
|
||||
close_flaky_tests:
|
||||
id: close_flaky_tests
|
||||
name: Flaky Test Hunter
|
||||
description: Close 3 issues labeled "flaky-test"
|
||||
reward_tokens: 150
|
||||
type: issue_count
|
||||
enabled: true
|
||||
repeatable: true
|
||||
cooldown_hours: 24
|
||||
criteria:
|
||||
issue_labels:
|
||||
- flaky-test
|
||||
target_count: 3
|
||||
issue_state: closed
|
||||
lookback_days: 7
|
||||
notification_message: "Quest Complete! You closed 3 flaky-test issues and earned {tokens} tokens."
|
||||
|
||||
reduce_p1_issues:
|
||||
id: reduce_p1_issues
|
||||
name: Priority Firefighter
|
||||
description: Reduce open P1 Daily Run issues by 2
|
||||
reward_tokens: 200
|
||||
type: issue_reduce
|
||||
enabled: true
|
||||
repeatable: true
|
||||
cooldown_hours: 48
|
||||
criteria:
|
||||
issue_labels:
|
||||
- layer:triage
|
||||
- P1
|
||||
target_reduction: 2
|
||||
lookback_days: 3
|
||||
notification_message: "Quest Complete! You reduced P1 issues by 2 and earned {tokens} tokens."
|
||||
|
||||
improve_test_coverage:
|
||||
id: improve_test_coverage
|
||||
name: Coverage Champion
|
||||
description: Improve test coverage by 5% or add 10 new test cases
|
||||
reward_tokens: 300
|
||||
type: test_improve
|
||||
enabled: true
|
||||
repeatable: false
|
||||
criteria:
|
||||
coverage_increase_percent: 5
|
||||
min_new_tests: 10
|
||||
notification_message: "Quest Complete! You improved test coverage and earned {tokens} tokens."
|
||||
|
||||
complete_daily_run_session:
|
||||
id: complete_daily_run_session
|
||||
name: Daily Runner
|
||||
description: Successfully complete 5 Daily Run sessions in a week
|
||||
reward_tokens: 250
|
||||
type: daily_run
|
||||
enabled: true
|
||||
repeatable: true
|
||||
cooldown_hours: 168 # 1 week
|
||||
criteria:
|
||||
min_sessions: 5
|
||||
lookback_days: 7
|
||||
notification_message: "Quest Complete! You completed 5 Daily Run sessions and earned {tokens} tokens."
|
||||
|
||||
# ── Documentation & Maintenance Quests ────────────────────────────────────
|
||||
|
||||
improve_automation_docs:
|
||||
id: improve_automation_docs
|
||||
name: Documentation Hero
|
||||
description: Improve documentation for automations (update 3+ doc files)
|
||||
reward_tokens: 100
|
||||
type: docs_update
|
||||
enabled: true
|
||||
repeatable: true
|
||||
cooldown_hours: 72
|
||||
criteria:
|
||||
file_patterns:
|
||||
- "docs/**/*.md"
|
||||
- "**/README.md"
|
||||
- "timmy_automations/**/*.md"
|
||||
min_files_changed: 3
|
||||
lookback_days: 7
|
||||
notification_message: "Quest Complete! You improved automation docs and earned {tokens} tokens."
|
||||
|
||||
close_micro_fixes:
|
||||
id: close_micro_fixes
|
||||
name: Micro Fix Master
|
||||
description: Close 5 issues labeled "layer:micro-fix"
|
||||
reward_tokens: 125
|
||||
type: issue_count
|
||||
enabled: true
|
||||
repeatable: true
|
||||
cooldown_hours: 24
|
||||
criteria:
|
||||
issue_labels:
|
||||
- layer:micro-fix
|
||||
target_count: 5
|
||||
issue_state: closed
|
||||
lookback_days: 7
|
||||
notification_message: "Quest Complete! You closed 5 micro-fix issues and earned {tokens} tokens."
|
||||
|
||||
# ── Special Achievements ──────────────────────────────────────────────────
|
||||
|
||||
first_contribution:
|
||||
id: first_contribution
|
||||
name: First Steps
|
||||
description: Make your first contribution (close any issue)
|
||||
reward_tokens: 50
|
||||
type: issue_count
|
||||
enabled: true
|
||||
repeatable: false
|
||||
criteria:
|
||||
target_count: 1
|
||||
issue_state: closed
|
||||
lookback_days: 30
|
||||
notification_message: "Welcome! You completed your first contribution and earned {tokens} tokens."
|
||||
|
||||
bug_squasher:
|
||||
id: bug_squasher
|
||||
name: Bug Squasher
|
||||
description: Close 10 issues labeled "bug"
|
||||
reward_tokens: 500
|
||||
type: issue_count
|
||||
enabled: true
|
||||
repeatable: true
|
||||
cooldown_hours: 168 # 1 week
|
||||
criteria:
|
||||
issue_labels:
|
||||
- bug
|
||||
target_count: 10
|
||||
issue_state: closed
|
||||
lookback_days: 7
|
||||
notification_message: "Quest Complete! You squashed 10 bugs and earned {tokens} tokens."
|
||||
|
||||
# ── Quest System Settings ───────────────────────────────────────────────────
|
||||
|
||||
settings:
|
||||
# Enable/disable quest notifications
|
||||
notifications_enabled: true
|
||||
|
||||
# Maximum number of concurrent active quests per agent
|
||||
max_concurrent_quests: 5
|
||||
|
||||
# Auto-detect quest completions on Daily Run metrics update
|
||||
auto_detect_on_daily_run: true
|
||||
|
||||
# Gitea issue labels that indicate quest-related work
|
||||
quest_work_labels:
|
||||
- layer:triage
|
||||
- layer:micro-fix
|
||||
- layer:tests
|
||||
- layer:economy
|
||||
- flaky-test
|
||||
- bug
|
||||
- documentation
|
||||
912
docs/research/openclaw-architecture-deployment-guide.md
Normal file
912
docs/research/openclaw-architecture-deployment-guide.md
Normal file
@@ -0,0 +1,912 @@
|
||||
# OpenClaw Architecture, Deployment Modes, and Ollama Integration
|
||||
|
||||
## Research Report for Timmy Time Dashboard Project
|
||||
|
||||
**Issue:** #721 — [Kimi Research] OpenClaw architecture, deployment modes, and Ollama integration
|
||||
**Date:** 2026-03-21
|
||||
**Author:** Kimi (Moonshot AI)
|
||||
**Status:** Complete
|
||||
|
||||
---
|
||||
|
||||
## Executive Summary
|
||||
|
||||
OpenClaw is an open-source AI agent framework that bridges messaging platforms (WhatsApp, Telegram, Slack, Discord, iMessage) to AI coding agents through a centralized gateway. Originally known as Clawdbot and Moltbot, it was rebranded to OpenClaw in early 2026. This report provides a comprehensive analysis of OpenClaw's architecture, deployment options, Ollama integration capabilities, and suitability for deployment on resource-constrained VPS environments like the Hermes DigitalOcean droplet (2GB RAM / 1 vCPU).
|
||||
|
||||
**Key Finding:** Running OpenClaw with local LLMs on a 2GB RAM VPS is **not recommended**. The absolute minimum for a text-only agent with external API models is 4GB RAM. For local model inference via Ollama, 8-16GB RAM is the practical minimum. A hybrid approach using OpenRouter as the primary provider with Ollama as fallback is the most viable configuration for small VPS deployments.
|
||||
|
||||
---
|
||||
|
||||
## 1. Architecture Overview
|
||||
|
||||
### 1.1 Core Components
|
||||
|
||||
OpenClaw follows a **hub-and-spoke (轴辐式)** architecture optimized for multi-agent task execution:
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────────────┐
|
||||
│ OPENCLAW ARCHITECTURE │
|
||||
├─────────────────────────────────────────────────────────────────────────┤
|
||||
│ │
|
||||
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
|
||||
│ │ WhatsApp │ │ Telegram │ │ Discord │ │
|
||||
│ │ Channel │ │ Channel │ │ Channel │ │
|
||||
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
|
||||
│ │ │ │ │
|
||||
│ └────────────────────┼────────────────────┘ │
|
||||
│ ▼ │
|
||||
│ ┌──────────────────┐ │
|
||||
│ │ Gateway │◄─────── WebSocket/API │
|
||||
│ │ (Port 18789) │ Control Plane │
|
||||
│ └────────┬─────────┘ │
|
||||
│ │ │
|
||||
│ ┌──────────────┼──────────────┐ │
|
||||
│ ▼ ▼ ▼ │
|
||||
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
|
||||
│ │ Agent A │ │ Agent B │ │ Pi Agent│ │
|
||||
│ │ (main) │ │ (coder) │ │(delegate)│ │
|
||||
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
|
||||
│ │ │ │ │
|
||||
│ └──────────────┼──────────────┘ │
|
||||
│ ▼ │
|
||||
│ ┌────────────────────────┐ │
|
||||
│ │ LLM Router │ │
|
||||
│ │ (Primary/Fallback) │ │
|
||||
│ └───────────┬────────────┘ │
|
||||
│ │ │
|
||||
│ ┌─────────────────┼─────────────────┐ │
|
||||
│ ▼ ▼ ▼ │
|
||||
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
|
||||
│ │ Ollama │ │ OpenAI │ │Anthropic│ │
|
||||
│ │(local) │ │(cloud) │ │(cloud) │ │
|
||||
│ └─────────┘ └─────────┘ └─────────┘ │
|
||||
│ │ ┌─────┐ │
|
||||
│ └────────────────────────────────────────────────────►│ MCP │ │
|
||||
│ │Tools│ │
|
||||
│ └─────┘ │
|
||||
│ │
|
||||
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
|
||||
│ │ Memory │ │ Skills │ │ Workspace │ │
|
||||
│ │ (SOUL.md) │ │ (SKILL.md) │ │ (sessions) │ │
|
||||
│ └──────────────┘ └──────────────┘ └──────────────┘ │
|
||||
│ │
|
||||
└─────────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
### 1.2 Component Deep Dive
|
||||
|
||||
| Component | Purpose | Configuration File |
|
||||
|-----------|---------|-------------------|
|
||||
| **Gateway** | Central control plane, WebSocket/API server, session management | `gateway` section in `openclaw.json` |
|
||||
| **Pi Agent** | Core agent runner, "指挥中心" - schedules LLM calls, tool execution, error handling | `agents` section in `openclaw.json` |
|
||||
| **Channels** | Messaging platform integrations (Telegram, WhatsApp, Slack, Discord, iMessage) | `channels` section in `openclaw.json` |
|
||||
| **SOUL.md** | Agent persona definition - personality, communication style, behavioral guidelines | `~/.openclaw/workspace/SOUL.md` |
|
||||
| **AGENTS.md** | Multi-agent configuration, routing rules, agent specialization definitions | `~/.openclaw/workspace/AGENTS.md` |
|
||||
| **Workspace** | File system for agent state, session data, temporary files | `~/.openclaw/workspace/` |
|
||||
| **Skills** | Bundled tools, prompts, configurations that teach agents specific tasks | `~/.openclaw/workspace/skills/` |
|
||||
| **Sessions** | Conversation history, context persistence between interactions | `~/.openclaw/agents/<agent>/sessions/` |
|
||||
| **MCP Tools** | Model Context Protocol integration for external tool access | Via `mcporter` or native MCP |
|
||||
|
||||
### 1.3 Agent Runner Execution Flow
|
||||
|
||||
According to OpenClaw documentation, a complete agent run follows these stages:
|
||||
|
||||
1. **Queuing** - Session-level queue (serializes same-session requests) → Global queue (controls total concurrency)
|
||||
2. **Preparation** - Parse workspace, provider/model, thinking level parameters
|
||||
3. **Plugin Loading** - Load relevant skills based on task context
|
||||
4. **Memory Retrieval** - Fetch relevant context from SOUL.md and conversation history
|
||||
5. **LLM Inference** - Send prompt to configured provider with tool definitions
|
||||
6. **Tool Execution** - Execute any tool calls returned by the LLM
|
||||
7. **Response Generation** - Format and return final response to the channel
|
||||
8. **Memory Storage** - Persist conversation and results to session storage
|
||||
|
||||
---
|
||||
|
||||
## 2. Deployment Modes
|
||||
|
||||
### 2.1 Comparison Matrix
|
||||
|
||||
| Deployment Mode | Best For | Setup Complexity | Resource Overhead | Stability |
|
||||
|----------------|----------|------------------|-------------------|-----------|
|
||||
| **npm global** | Development, quick testing | Low | Minimal (~200MB) | Moderate |
|
||||
| **Docker** | Production, isolation, reproducibility | Medium | Higher (~2.5GB base image) | High |
|
||||
| **Docker Compose** | Multi-service stacks, complex setups | Medium-High | Higher | High |
|
||||
| **Bare metal/systemd** | Maximum performance, dedicated hardware | High | Minimal | Moderate |
|
||||
|
||||
### 2.2 NPM Global Installation (Recommended for Quick Start)
|
||||
|
||||
```bash
|
||||
# One-line installer
|
||||
curl -fsSL https://openclaw.ai/install.sh | bash
|
||||
|
||||
# Or manual npm install
|
||||
npm install -g openclaw
|
||||
|
||||
# Initialize configuration
|
||||
openclaw onboard
|
||||
|
||||
# Start gateway
|
||||
openclaw gateway
|
||||
```
|
||||
|
||||
**Pros:**
|
||||
- Fastest setup (~30 seconds)
|
||||
- Direct access to host resources
|
||||
- Easy updates via `npm update -g openclaw`
|
||||
|
||||
**Cons:**
|
||||
- Node.js 22+ dependency required
|
||||
- No process isolation
|
||||
- Manual dependency management
|
||||
|
||||
### 2.3 Docker Deployment (Recommended for Production)
|
||||
|
||||
```bash
|
||||
# Pull and run
|
||||
docker pull openclaw/openclaw:latest
|
||||
docker run -d \
|
||||
--name openclaw \
|
||||
-p 127.0.0.1:18789:18789 \
|
||||
-v ~/.openclaw:/root/.openclaw \
|
||||
-e ANTHROPIC_API_KEY=sk-ant-... \
|
||||
openclaw/openclaw:latest
|
||||
|
||||
# Or with Docker Compose
|
||||
docker compose -f compose.yml --env-file .env up -d --build
|
||||
```
|
||||
|
||||
**Docker Compose Configuration (production-ready):**
|
||||
|
||||
```yaml
|
||||
version: '3.8'
|
||||
services:
|
||||
openclaw:
|
||||
image: openclaw/openclaw:latest
|
||||
container_name: openclaw
|
||||
restart: unless-stopped
|
||||
ports:
|
||||
- "127.0.0.1:18789:18789" # Never expose to 0.0.0.0
|
||||
volumes:
|
||||
- ./openclaw-data:/root/.openclaw
|
||||
- ./workspace:/root/.openclaw/workspace
|
||||
environment:
|
||||
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
|
||||
- OPENROUTER_API_KEY=${OPENROUTER_API_KEY}
|
||||
- OLLAMA_API_KEY=ollama-local
|
||||
networks:
|
||||
- openclaw-net
|
||||
# Resource limits for small VPS
|
||||
deploy:
|
||||
resources:
|
||||
limits:
|
||||
cpus: '1.5'
|
||||
memory: 3G
|
||||
reservations:
|
||||
cpus: '0.5'
|
||||
memory: 1G
|
||||
|
||||
networks:
|
||||
openclaw-net:
|
||||
driver: bridge
|
||||
```
|
||||
|
||||
### 2.4 Bare Metal / Systemd Installation
|
||||
|
||||
For running as a system service on Linux:
|
||||
|
||||
```bash
|
||||
# Create systemd service
|
||||
sudo tee /etc/systemd/system/openclaw.service > /dev/null <<EOF
|
||||
[Unit]
|
||||
Description=OpenClaw Gateway
|
||||
After=network.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
User=openclaw
|
||||
Group=openclaw
|
||||
WorkingDirectory=/home/openclaw
|
||||
Environment="PATH=/usr/local/bin:/usr/bin:/bin"
|
||||
Environment="NODE_ENV=production"
|
||||
Environment="ANTHROPIC_API_KEY=sk-ant-..."
|
||||
ExecStart=/usr/local/bin/openclaw gateway
|
||||
Restart=always
|
||||
RestartSec=10
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
EOF
|
||||
|
||||
sudo systemctl daemon-reload
|
||||
sudo systemctl enable openclaw
|
||||
sudo systemctl start openclaw
|
||||
```
|
||||
|
||||
### 2.5 Recommended Deployment for 2GB RAM VPS
|
||||
|
||||
**⚠️ Critical Finding:** OpenClaw's official minimum is 4GB RAM. On a 2GB VPS:
|
||||
|
||||
1. **Do NOT run local LLMs** - Use external API providers exclusively
|
||||
2. **Use npm installation** - Docker overhead is too heavy
|
||||
3. **Disable browser automation** - Chromium requires 2-4GB alone
|
||||
4. **Enable swap** - Critical for preventing OOM kills
|
||||
5. **Use OpenRouter** - Cheap/free tier models reduce costs
|
||||
|
||||
**Setup script for 2GB VPS:**
|
||||
|
||||
```bash
|
||||
#!/bin/bash
|
||||
# openclaw-minimal-vps.sh
|
||||
# Setup for 2GB RAM VPS - EXTERNAL API ONLY
|
||||
|
||||
# Create 4GB swap
|
||||
sudo fallocate -l 4G /swapfile
|
||||
sudo chmod 600 /swapfile
|
||||
sudo mkswap /swapfile
|
||||
sudo swapon /swapfile
|
||||
echo '/swapfile none swap sw 0 0' | sudo tee -a /etc/fstab
|
||||
|
||||
# Install Node.js 22
|
||||
curl -fsSL https://deb.nodesource.com/setup_22.x | sudo bash -
|
||||
sudo apt-get install -y nodejs
|
||||
|
||||
# Install OpenClaw
|
||||
npm install -g openclaw
|
||||
|
||||
# Configure for minimal resource usage
|
||||
mkdir -p ~/.openclaw
|
||||
cat > ~/.openclaw/openclaw.json <<'EOF'
|
||||
{
|
||||
"gateway": {
|
||||
"bind": "127.0.0.1",
|
||||
"port": 18789,
|
||||
"mode": "local"
|
||||
},
|
||||
"agents": {
|
||||
"defaults": {
|
||||
"model": {
|
||||
"primary": "openrouter/google/gemma-3-4b-it:free",
|
||||
"fallbacks": [
|
||||
"openrouter/meta/llama-3.1-8b-instruct:free"
|
||||
]
|
||||
},
|
||||
"maxIterations": 15,
|
||||
"timeout": 120
|
||||
}
|
||||
},
|
||||
"channels": {
|
||||
"telegram": {
|
||||
"enabled": true,
|
||||
"dmPolicy": "pairing"
|
||||
}
|
||||
}
|
||||
}
|
||||
EOF
|
||||
|
||||
# Set OpenRouter API key
|
||||
export OPENROUTER_API_KEY="sk-or-v1-..."
|
||||
|
||||
# Start gateway
|
||||
openclaw gateway &
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 3. Ollama Integration
|
||||
|
||||
### 3.1 Architecture
|
||||
|
||||
OpenClaw integrates with Ollama through its native `/api/chat` endpoint, supporting both streaming responses and tool calling simultaneously:
|
||||
|
||||
```
|
||||
┌──────────────┐ HTTP/JSON ┌──────────────┐ GGUF/CPU/GPU ┌──────────┐
|
||||
│ OpenClaw │◄───────────────────►│ Ollama │◄────────────────────►│ Local │
|
||||
│ Gateway │ /api/chat │ Server │ Model inference │ LLM │
|
||||
│ │ Port 11434 │ Port 11434 │ │ │
|
||||
└──────────────┘ └──────────────┘ └──────────┘
|
||||
```
|
||||
|
||||
### 3.2 Configuration
|
||||
|
||||
**Basic Ollama Setup:**
|
||||
|
||||
```bash
|
||||
# Install Ollama
|
||||
curl -fsSL https://ollama.com/install.sh | sh
|
||||
|
||||
# Start server
|
||||
ollama serve
|
||||
|
||||
# Pull a tool-capable model
|
||||
ollama pull qwen2.5-coder:7b
|
||||
ollama pull llama3.1:8b
|
||||
|
||||
# Configure OpenClaw
|
||||
export OLLAMA_API_KEY="ollama-local" # Any non-empty string works
|
||||
```
|
||||
|
||||
**OpenClaw Configuration for Ollama:**
|
||||
|
||||
```json
|
||||
{
|
||||
"models": {
|
||||
"providers": {
|
||||
"ollama": {
|
||||
"baseUrl": "http://localhost:11434",
|
||||
"apiKey": "ollama-local",
|
||||
"api": "ollama",
|
||||
"models": [
|
||||
{
|
||||
"id": "qwen2.5-coder:7b",
|
||||
"name": "Qwen 2.5 Coder 7B",
|
||||
"contextWindow": 32768,
|
||||
"maxTokens": 8192,
|
||||
"cost": { "input": 0, "output": 0 }
|
||||
},
|
||||
{
|
||||
"id": "llama3.1:8b",
|
||||
"name": "Llama 3.1 8B",
|
||||
"contextWindow": 128000,
|
||||
"maxTokens": 8192,
|
||||
"cost": { "input": 0, "output": 0 }
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"agents": {
|
||||
"defaults": {
|
||||
"model": {
|
||||
"primary": "ollama/qwen2.5-coder:7b",
|
||||
"fallbacks": ["ollama/llama3.1:8b"]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 3.3 Context Window Requirements
|
||||
|
||||
**⚠️ Critical Requirement:** OpenClaw requires a minimum **64K token context window** for reliable multi-step task execution.
|
||||
|
||||
| Model | Parameters | Context Window | Tool Support | OpenClaw Compatible |
|
||||
|-------|-----------|----------------|--------------|---------------------|
|
||||
| **llama3.1** | 8B | 128K | ✅ Yes | ✅ Yes |
|
||||
| **qwen2.5-coder** | 7B | 32K | ✅ Yes | ⚠️ Below minimum |
|
||||
| **qwen2.5-coder** | 32B | 128K | ✅ Yes | ✅ Yes |
|
||||
| **gpt-oss** | 20B | 128K | ✅ Yes | ✅ Yes |
|
||||
| **glm-4.7-flash** | - | 128K | ✅ Yes | ✅ Yes |
|
||||
| **deepseek-coder-v2** | 33B | 128K | ✅ Yes | ✅ Yes |
|
||||
| **mistral-small3.1** | - | 128K | ✅ Yes | ✅ Yes |
|
||||
|
||||
**Context Window Configuration:**
|
||||
|
||||
For models that don't report context window via Ollama's API:
|
||||
|
||||
```bash
|
||||
# Create custom Modelfile with extended context
|
||||
cat > ~/qwen-custom.modelfile <<EOF
|
||||
FROM qwen2.5-coder:7b
|
||||
PARAMETER num_ctx 65536
|
||||
PARAMETER temperature 0.7
|
||||
EOF
|
||||
|
||||
# Create custom model
|
||||
ollama create qwen2.5-coder-64k -f ~/qwen-custom.modelfile
|
||||
```
|
||||
|
||||
### 3.4 Models for Small VPS (≤8B Parameters)
|
||||
|
||||
For resource-constrained environments (2-4GB RAM):
|
||||
|
||||
| Model | Quantization | RAM Required | VRAM Required | Performance |
|
||||
|-------|-------------|--------------|---------------|-------------|
|
||||
| **Llama 3.1 8B** | Q4_K_M | ~5GB | ~6GB | Good |
|
||||
| **Llama 3.2 3B** | Q4_K_M | ~2.5GB | ~3GB | Basic |
|
||||
| **Qwen 2.5 7B** | Q4_K_M | ~5GB | ~6GB | Good |
|
||||
| **Qwen 2.5 3B** | Q4_K_M | ~2.5GB | ~3GB | Basic |
|
||||
| **DeepSeek 7B** | Q4_K_M | ~5GB | ~6GB | Good |
|
||||
| **Phi-4 4B** | Q4_K_M | ~3GB | ~4GB | Moderate |
|
||||
|
||||
**⚠️ Verdict for 2GB VPS:** Running local LLMs is **NOT viable**. Use external APIs only.
|
||||
|
||||
---
|
||||
|
||||
## 4. OpenRouter Integration (Fallback Strategy)
|
||||
|
||||
### 4.1 Overview
|
||||
|
||||
OpenRouter provides a unified API gateway to multiple LLM providers, enabling:
|
||||
- Single API key access to 200+ models
|
||||
- Automatic failover between providers
|
||||
- Free tier models for cost-conscious deployments
|
||||
- Unified billing and usage tracking
|
||||
|
||||
### 4.2 Configuration
|
||||
|
||||
**Environment Variable Setup:**
|
||||
|
||||
```bash
|
||||
export OPENROUTER_API_KEY="sk-or-v1-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
```
|
||||
|
||||
**OpenClaw Configuration:**
|
||||
|
||||
```json
|
||||
{
|
||||
"models": {
|
||||
"providers": {
|
||||
"openrouter": {
|
||||
"apiKey": "${OPENROUTER_API_KEY}",
|
||||
"baseUrl": "https://openrouter.ai/api/v1"
|
||||
}
|
||||
}
|
||||
},
|
||||
"agents": {
|
||||
"defaults": {
|
||||
"model": {
|
||||
"primary": "openrouter/anthropic/claude-sonnet-4-6",
|
||||
"fallbacks": [
|
||||
"openrouter/google/gemini-3.1-pro",
|
||||
"openrouter/meta/llama-3.3-70b-instruct",
|
||||
"openrouter/google/gemma-3-4b-it:free"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 4.3 Recommended Free/Cheap Models on OpenRouter
|
||||
|
||||
For cost-conscious VPS deployments:
|
||||
|
||||
| Model | Cost | Context | Best For |
|
||||
|-------|------|---------|----------|
|
||||
| **google/gemma-3-4b-it:free** | Free | 128K | General tasks, simple automation |
|
||||
| **meta/llama-3.1-8b-instruct:free** | Free | 128K | General tasks, longer contexts |
|
||||
| **deepseek/deepseek-chat-v3.2** | $0.53/M | 64K | Code generation, reasoning |
|
||||
| **xiaomi/mimo-v2-flash** | $0.40/M | 128K | Fast responses, basic tasks |
|
||||
| **qwen/qwen3-coder-next** | $1.20/M | 128K | Code-focused tasks |
|
||||
|
||||
### 4.4 Hybrid Configuration (Recommended for Timmy)
|
||||
|
||||
A production-ready configuration for the Hermes VPS:
|
||||
|
||||
```json
|
||||
{
|
||||
"models": {
|
||||
"providers": {
|
||||
"openrouter": {
|
||||
"apiKey": "${OPENROUTER_API_KEY}",
|
||||
"models": [
|
||||
{
|
||||
"id": "google/gemma-3-4b-it:free",
|
||||
"name": "Gemma 3 4B (Free)",
|
||||
"contextWindow": 131072,
|
||||
"maxTokens": 8192,
|
||||
"cost": { "input": 0, "output": 0 }
|
||||
},
|
||||
{
|
||||
"id": "deepseek/deepseek-chat-v3.2",
|
||||
"name": "DeepSeek V3.2",
|
||||
"contextWindow": 64000,
|
||||
"maxTokens": 8192,
|
||||
"cost": { "input": 0.00053, "output": 0.00053 }
|
||||
}
|
||||
]
|
||||
},
|
||||
"ollama": {
|
||||
"baseUrl": "http://localhost:11434",
|
||||
"apiKey": "ollama-local",
|
||||
"models": [
|
||||
{
|
||||
"id": "llama3.2:3b",
|
||||
"name": "Llama 3.2 3B (Local Fallback)",
|
||||
"contextWindow": 128000,
|
||||
"maxTokens": 4096,
|
||||
"cost": { "input": 0, "output": 0 }
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"agents": {
|
||||
"defaults": {
|
||||
"model": {
|
||||
"primary": "openrouter/google/gemma-3-4b-it:free",
|
||||
"fallbacks": [
|
||||
"openrouter/deepseek/deepseek-chat-v3.2",
|
||||
"ollama/llama3.2:3b"
|
||||
]
|
||||
},
|
||||
"maxIterations": 10,
|
||||
"timeout": 90
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 5. Hardware Constraints & VPS Viability
|
||||
|
||||
### 5.1 System Requirements Summary
|
||||
|
||||
| Component | Minimum | Recommended | Notes |
|
||||
|-----------|---------|-------------|-------|
|
||||
| **CPU** | 2 vCPU | 4 vCPU | Dedicated preferred over shared |
|
||||
| **RAM** | 4 GB | 8 GB | 2GB causes OOM with external APIs |
|
||||
| **Storage** | 40 GB SSD | 80 GB NVMe | Docker images are ~10-15GB |
|
||||
| **Network** | 100 Mbps | 1 Gbps | For API calls and model downloads |
|
||||
| **OS** | Ubuntu 22.04/Debian 12 | Ubuntu 24.04 LTS | Linux required for production |
|
||||
|
||||
### 5.2 2GB RAM VPS Analysis
|
||||
|
||||
**Can it work?** Yes, with severe limitations:
|
||||
|
||||
✅ **What works:**
|
||||
- Text-only agents with external API providers
|
||||
- Single Telegram/Discord channel
|
||||
- Basic file operations and shell commands
|
||||
- No browser automation
|
||||
|
||||
❌ **What doesn't work:**
|
||||
- Local LLM inference via Ollama
|
||||
- Browser automation (Chromium needs 2-4GB)
|
||||
- Multiple concurrent channels
|
||||
- Python environment-heavy skills
|
||||
|
||||
**Required mitigations for 2GB VPS:**
|
||||
|
||||
```bash
|
||||
# 1. Create substantial swap
|
||||
sudo fallocate -l 4G /swapfile
|
||||
sudo chmod 600 /swapfile
|
||||
sudo mkswap /swapfile
|
||||
sudo swapon /swapfile
|
||||
|
||||
# 2. Configure swappiness
|
||||
echo 'vm.swappiness=60' | sudo tee -a /etc/sysctl.conf
|
||||
sudo sysctl -p
|
||||
|
||||
# 3. Limit Node.js memory
|
||||
export NODE_OPTIONS="--max-old-space-size=1536"
|
||||
|
||||
# 4. Use external APIs only - NO OLLAMA
|
||||
# 5. Disable browser skills
|
||||
# 6. Set conservative concurrency limits
|
||||
```
|
||||
|
||||
### 5.3 4-bit Quantization Viability
|
||||
|
||||
**Qwen 2.5 7B Q4_K_M on 2GB VPS:**
|
||||
- Model size: ~4.5GB
|
||||
- RAM required at runtime: ~5-6GB
|
||||
- **Verdict:** Will cause immediate OOM on 2GB VPS
|
||||
- **Even with 4GB VPS:** Marginal, heavy swap usage, poor performance
|
||||
|
||||
**Viable models for 4GB VPS with Ollama:**
|
||||
- Llama 3.2 3B Q4_K_M (~2.5GB RAM)
|
||||
- Qwen 2.5 3B Q4_K_M (~2.5GB RAM)
|
||||
- Phi-4 4B Q4_K_M (~3GB RAM)
|
||||
|
||||
---
|
||||
|
||||
## 6. Security Configuration
|
||||
|
||||
### 6.1 Network Ports
|
||||
|
||||
| Port | Purpose | Exposure |
|
||||
|------|---------|----------|
|
||||
| **18789/tcp** | OpenClaw Gateway (WebSocket/HTTP) | **NEVER expose to internet** |
|
||||
| **11434/tcp** | Ollama API (if running locally) | Localhost only |
|
||||
| **22/tcp** | SSH | Restrict to known IPs |
|
||||
|
||||
**⚠️ CRITICAL:** Never expose port 18789 to the public internet. Use Tailscale or SSH tunnels for remote access.
|
||||
|
||||
### 6.2 Tailscale Integration
|
||||
|
||||
Tailscale provides zero-configuration VPN mesh for secure remote access:
|
||||
|
||||
```bash
|
||||
# Install Tailscale
|
||||
curl -fsSL https://tailscale.com/install.sh | sh
|
||||
sudo tailscale up
|
||||
|
||||
# Get Tailscale IP
|
||||
tailscale ip
|
||||
# Returns: 100.x.y.z
|
||||
|
||||
# Configure OpenClaw to bind to Tailscale
|
||||
cat > ~/.openclaw/openclaw.json <<EOF
|
||||
{
|
||||
"gateway": {
|
||||
"bind": "tailnet",
|
||||
"port": 18789
|
||||
},
|
||||
"tailscale": {
|
||||
"mode": "on",
|
||||
"resetOnExit": false
|
||||
}
|
||||
}
|
||||
EOF
|
||||
```
|
||||
|
||||
**Tailscale vs SSH Tunnel:**
|
||||
|
||||
| Feature | Tailscale | SSH Tunnel |
|
||||
|---------|-----------|------------|
|
||||
| Setup | Very easy | Moderate |
|
||||
| Persistence | Automatic | Requires autossh |
|
||||
| Multiple devices | Built-in | One tunnel per connection |
|
||||
| NAT traversal | Works | Requires exposed SSH |
|
||||
| Access control | Tailscale ACL | SSH keys |
|
||||
|
||||
### 6.3 Firewall Configuration (UFW)
|
||||
|
||||
```bash
|
||||
# Default deny
|
||||
sudo ufw default deny incoming
|
||||
sudo ufw default allow outgoing
|
||||
|
||||
# Allow SSH
|
||||
sudo ufw allow 22/tcp
|
||||
|
||||
# Allow Tailscale only (if using)
|
||||
sudo ufw allow in on tailscale0 to any port 18789
|
||||
|
||||
# Block public access to OpenClaw
|
||||
# (bind is 127.0.0.1, so this is defense in depth)
|
||||
|
||||
sudo ufw enable
|
||||
```
|
||||
|
||||
### 6.4 Authentication Configuration
|
||||
|
||||
```json
|
||||
{
|
||||
"gateway": {
|
||||
"bind": "127.0.0.1",
|
||||
"port": 18789,
|
||||
"auth": {
|
||||
"mode": "token",
|
||||
"token": "your-64-char-hex-token-here"
|
||||
},
|
||||
"controlUi": {
|
||||
"allowedOrigins": [
|
||||
"http://localhost:18789",
|
||||
"https://your-domain.tailnet-name.ts.net"
|
||||
],
|
||||
"allowInsecureAuth": false,
|
||||
"dangerouslyDisableDeviceAuth": false
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Generate secure token:**
|
||||
|
||||
```bash
|
||||
openssl rand -hex 32
|
||||
```
|
||||
|
||||
### 6.5 Sandboxing Considerations
|
||||
|
||||
OpenClaw executes arbitrary shell commands and file operations by default. For production:
|
||||
|
||||
1. **Run as non-root user:**
|
||||
```bash
|
||||
sudo useradd -r -s /bin/false openclaw
|
||||
sudo mkdir -p /home/openclaw/.openclaw
|
||||
sudo chown -R openclaw:openclaw /home/openclaw
|
||||
```
|
||||
|
||||
2. **Use Docker for isolation:**
|
||||
```bash
|
||||
docker run --security-opt=no-new-privileges \
|
||||
--cap-drop=ALL \
|
||||
--read-only \
|
||||
--tmpfs /tmp:noexec,nosuid,size=100m \
|
||||
openclaw/openclaw:latest
|
||||
```
|
||||
|
||||
3. **Enable dmPolicy for channels:**
|
||||
```json
|
||||
{
|
||||
"channels": {
|
||||
"telegram": {
|
||||
"dmPolicy": "pairing" // Require one-time code for new contacts
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 7. MCP (Model Context Protocol) Tools
|
||||
|
||||
### 7.1 Overview
|
||||
|
||||
MCP is an open standard created by Anthropic (donated to Linux Foundation in Dec 2025) that lets AI applications connect to external tools through a universal interface. Think of it as "USB-C for AI."
|
||||
|
||||
### 7.2 MCP vs OpenClaw Skills
|
||||
|
||||
| Aspect | MCP | OpenClaw Skills |
|
||||
|--------|-----|-----------------|
|
||||
| **Protocol** | Standardized (Anthropic) | OpenClaw-specific |
|
||||
| **Isolation** | Process-isolated | Runs in agent context |
|
||||
| **Security** | Higher (sandboxed) | Lower (full system access) |
|
||||
| **Discovery** | Automatic via protocol | Manual via SKILL.md |
|
||||
| **Ecosystem** | 10,000+ servers | 5400+ skills |
|
||||
|
||||
**Note:** OpenClaw currently has limited native MCP support. Use `mcporter` tool for MCP integration.
|
||||
|
||||
### 7.3 Using MCPorter (MCP Bridge)
|
||||
|
||||
```bash
|
||||
# Install mcporter
|
||||
clawhub install mcporter
|
||||
|
||||
# Configure MCP server
|
||||
mcporter config add github \
|
||||
--url "https://api.github.com/mcp" \
|
||||
--token "ghp_..."
|
||||
|
||||
# List available tools
|
||||
mcporter list
|
||||
|
||||
# Call MCP tool
|
||||
mcporter call github.list_repos --owner "rockachopa"
|
||||
```
|
||||
|
||||
### 7.4 Popular MCP Servers
|
||||
|
||||
| Server | Purpose | Integration |
|
||||
|--------|---------|-------------|
|
||||
| **GitHub** | Repo management, PRs, issues | `mcp-github` |
|
||||
| **Slack** | Messaging, channel management | `mcp-slack` |
|
||||
| **PostgreSQL** | Database queries | `mcp-postgres` |
|
||||
| **Filesystem** | File operations (sandboxed) | `mcp-filesystem` |
|
||||
| **Brave Search** | Web search | `mcp-brave` |
|
||||
|
||||
---
|
||||
|
||||
## 8. Recommendations for Timmy Time Dashboard
|
||||
|
||||
### 8.1 Deployment Strategy for Hermes VPS (2GB RAM)
|
||||
|
||||
Given the hardware constraints, here's the recommended approach:
|
||||
|
||||
**Option A: External API Only (Recommended)**
|
||||
```
|
||||
┌─────────────────────────────────────────┐
|
||||
│ Hermes VPS (2GB RAM) │
|
||||
│ ┌─────────────────────────────────┐ │
|
||||
│ │ OpenClaw Gateway │ │
|
||||
│ │ (npm global install) │ │
|
||||
│ └─────────────┬───────────────────┘ │
|
||||
│ │ │
|
||||
│ ▼ │
|
||||
│ ┌─────────────────────────────────┐ │
|
||||
│ │ OpenRouter API (Free Tier) │ │
|
||||
│ │ google/gemma-3-4b-it:free │ │
|
||||
│ └─────────────────────────────────┘ │
|
||||
│ │
|
||||
│ NO OLLAMA - insufficient RAM │
|
||||
└─────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
**Option B: Hybrid with External Ollama**
|
||||
```
|
||||
┌──────────────────────┐ ┌──────────────────────────┐
|
||||
│ Hermes VPS (2GB) │ │ Separate Ollama Host │
|
||||
│ ┌────────────────┐ │ │ ┌────────────────────┐ │
|
||||
│ │ OpenClaw │ │◄────►│ │ Ollama Server │ │
|
||||
│ │ (external API) │ │ │ │ (8GB+ RAM required)│ │
|
||||
│ └────────────────┘ │ │ └────────────────────┘ │
|
||||
└──────────────────────┘ └──────────────────────────┘
|
||||
```
|
||||
|
||||
### 8.2 Configuration Summary
|
||||
|
||||
```json
|
||||
{
|
||||
"gateway": {
|
||||
"bind": "127.0.0.1",
|
||||
"port": 18789,
|
||||
"auth": {
|
||||
"mode": "token",
|
||||
"token": "GENERATE_WITH_OPENSSL_RAND"
|
||||
}
|
||||
},
|
||||
"models": {
|
||||
"providers": {
|
||||
"openrouter": {
|
||||
"apiKey": "${OPENROUTER_API_KEY}",
|
||||
"models": [
|
||||
{
|
||||
"id": "google/gemma-3-4b-it:free",
|
||||
"contextWindow": 131072,
|
||||
"maxTokens": 4096
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"agents": {
|
||||
"defaults": {
|
||||
"model": {
|
||||
"primary": "openrouter/google/gemma-3-4b-it:free"
|
||||
},
|
||||
"maxIterations": 10,
|
||||
"timeout": 90,
|
||||
"maxConcurrent": 2
|
||||
}
|
||||
},
|
||||
"channels": {
|
||||
"telegram": {
|
||||
"enabled": true,
|
||||
"dmPolicy": "pairing"
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 8.3 Migration Path (Future)
|
||||
|
||||
When upgrading to a larger VPS (4-8GB RAM):
|
||||
|
||||
1. **Phase 1:** Enable Ollama with Llama 3.2 3B as fallback
|
||||
2. **Phase 2:** Add browser automation skills (requires 4GB+ RAM)
|
||||
3. **Phase 3:** Enable multi-agent routing with specialized agents
|
||||
4. **Phase 4:** Add MCP server integration for external tools
|
||||
|
||||
---
|
||||
|
||||
## 9. References
|
||||
|
||||
1. OpenClaw Official Documentation: https://docs.openclaw.ai
|
||||
2. Ollama Integration Guide: https://docs.ollama.com/integrations/openclaw
|
||||
3. OpenRouter Documentation: https://openrouter.ai/docs
|
||||
4. MCP Specification: https://modelcontextprotocol.io
|
||||
5. OpenClaw Community Discord: https://discord.gg/openclaw
|
||||
6. GitHub Repository: https://github.com/openclaw/openclaw
|
||||
|
||||
---
|
||||
|
||||
## 10. Appendix: Quick Command Reference
|
||||
|
||||
```bash
|
||||
# Installation
|
||||
curl -fsSL https://openclaw.ai/install.sh | bash
|
||||
|
||||
# Configuration
|
||||
openclaw onboard # Interactive setup
|
||||
openclaw configure # Edit config
|
||||
openclaw config set <key> <value> # Set specific value
|
||||
|
||||
# Gateway management
|
||||
openclaw gateway # Start gateway
|
||||
openclaw gateway --verbose # Start with logs
|
||||
openclaw gateway status # Check status
|
||||
openclaw gateway restart # Restart gateway
|
||||
openclaw gateway stop # Stop gateway
|
||||
|
||||
# Model management
|
||||
openclaw models list # List available models
|
||||
openclaw models set <model> # Set default model
|
||||
openclaw models status # Check model status
|
||||
|
||||
# Diagnostics
|
||||
openclaw doctor # System health check
|
||||
openclaw doctor --repair # Auto-fix issues
|
||||
openclaw security audit # Security check
|
||||
|
||||
# Dashboard
|
||||
openclaw dashboard # Open web UI
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
*End of Research Report*
|
||||
@@ -330,6 +330,13 @@ class Settings(BaseSettings):
|
||||
autoresearch_max_iterations: int = 100
|
||||
autoresearch_metric: str = "val_bpb" # metric to optimise (lower = better)
|
||||
|
||||
# ── Weekly Narrative Summary ───────────────────────────────────────
|
||||
# Generates a human-readable weekly summary of development activity.
|
||||
# Disabling this will stop the weekly narrative generation.
|
||||
weekly_narrative_enabled: bool = True
|
||||
weekly_narrative_lookback_days: int = 7
|
||||
weekly_narrative_output_dir: str = ".loop"
|
||||
|
||||
# ── Local Hands (Shell + Git) ──────────────────────────────────────
|
||||
# Enable local shell/git execution hands.
|
||||
hands_shell_enabled: bool = True
|
||||
|
||||
@@ -32,6 +32,7 @@ from dashboard.routes.briefing import router as briefing_router
|
||||
from dashboard.routes.calm import router as calm_router
|
||||
from dashboard.routes.chat_api import router as chat_api_router
|
||||
from dashboard.routes.chat_api_v1 import router as chat_api_v1_router
|
||||
from dashboard.routes.daily_run import router as daily_run_router
|
||||
from dashboard.routes.db_explorer import router as db_explorer_router
|
||||
from dashboard.routes.discord import router as discord_router
|
||||
from dashboard.routes.experiments import router as experiments_router
|
||||
@@ -42,6 +43,7 @@ from dashboard.routes.memory import router as memory_router
|
||||
from dashboard.routes.mobile import router as mobile_router
|
||||
from dashboard.routes.models import api_router as models_api_router
|
||||
from dashboard.routes.models import router as models_router
|
||||
from dashboard.routes.quests import router as quests_router
|
||||
from dashboard.routes.spark import router as spark_router
|
||||
from dashboard.routes.system import router as system_router
|
||||
from dashboard.routes.tasks import router as tasks_router
|
||||
@@ -625,6 +627,8 @@ app.include_router(db_explorer_router)
|
||||
app.include_router(world_router)
|
||||
app.include_router(matrix_router)
|
||||
app.include_router(tower_router)
|
||||
app.include_router(daily_run_router)
|
||||
app.include_router(quests_router)
|
||||
|
||||
|
||||
@app.websocket("/ws")
|
||||
|
||||
435
src/dashboard/routes/daily_run.py
Normal file
435
src/dashboard/routes/daily_run.py
Normal file
@@ -0,0 +1,435 @@
|
||||
"""Daily Run metrics routes — dashboard card for triage and session metrics."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from pathlib import Path
|
||||
from urllib.error import HTTPError, URLError
|
||||
from urllib.request import Request as UrlRequest
|
||||
from urllib.request import urlopen
|
||||
|
||||
from fastapi import APIRouter, Request
|
||||
from fastapi.responses import HTMLResponse, JSONResponse
|
||||
|
||||
from config import settings
|
||||
from dashboard.templating import templates
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(tags=["daily-run"])
|
||||
|
||||
REPO_ROOT = Path(settings.repo_root)
|
||||
CONFIG_PATH = REPO_ROOT / "timmy_automations" / "config" / "daily_run.json"
|
||||
|
||||
DEFAULT_CONFIG = {
|
||||
"gitea_api": "http://localhost:3000/api/v1",
|
||||
"repo_slug": "rockachopa/Timmy-time-dashboard",
|
||||
"token_file": "~/.hermes/gitea_token",
|
||||
"layer_labels_prefix": "layer:",
|
||||
}
|
||||
|
||||
LAYER_LABELS = ["layer:triage", "layer:micro-fix", "layer:tests", "layer:economy"]
|
||||
|
||||
|
||||
def _load_config() -> dict:
|
||||
"""Load configuration from config file with fallback to defaults."""
|
||||
config = DEFAULT_CONFIG.copy()
|
||||
if CONFIG_PATH.exists():
|
||||
try:
|
||||
file_config = json.loads(CONFIG_PATH.read_text())
|
||||
if "orchestrator" in file_config:
|
||||
config.update(file_config["orchestrator"])
|
||||
except (json.JSONDecodeError, OSError) as exc:
|
||||
logger.debug("Could not load daily_run config: %s", exc)
|
||||
|
||||
# Environment variable overrides
|
||||
if os.environ.get("TIMMY_GITEA_API"):
|
||||
config["gitea_api"] = os.environ.get("TIMMY_GITEA_API")
|
||||
if os.environ.get("TIMMY_REPO_SLUG"):
|
||||
config["repo_slug"] = os.environ.get("TIMMY_REPO_SLUG")
|
||||
if os.environ.get("TIMMY_GITEA_TOKEN"):
|
||||
config["token"] = os.environ.get("TIMMY_GITEA_TOKEN")
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def _get_token(config: dict) -> str | None:
|
||||
"""Get Gitea token from environment or file."""
|
||||
if "token" in config:
|
||||
return config["token"]
|
||||
|
||||
token_file = Path(config["token_file"]).expanduser()
|
||||
if token_file.exists():
|
||||
return token_file.read_text().strip()
|
||||
|
||||
return None
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
"""Simple Gitea API client with graceful degradation."""
|
||||
|
||||
def __init__(self, config: dict, token: str | None):
|
||||
self.api_base = config["gitea_api"].rstrip("/")
|
||||
self.repo_slug = config["repo_slug"]
|
||||
self.token = token
|
||||
self._available: bool | None = None
|
||||
|
||||
def _headers(self) -> dict:
|
||||
headers = {"Accept": "application/json"}
|
||||
if self.token:
|
||||
headers["Authorization"] = f"token {self.token}"
|
||||
return headers
|
||||
|
||||
def _api_url(self, path: str) -> str:
|
||||
return f"{self.api_base}/repos/{self.repo_slug}/{path}"
|
||||
|
||||
def is_available(self) -> bool:
|
||||
"""Check if Gitea API is reachable."""
|
||||
if self._available is not None:
|
||||
return self._available
|
||||
|
||||
try:
|
||||
req = UrlRequest(
|
||||
f"{self.api_base}/version",
|
||||
headers=self._headers(),
|
||||
method="GET",
|
||||
)
|
||||
with urlopen(req, timeout=5) as resp:
|
||||
self._available = resp.status == 200
|
||||
return self._available
|
||||
except (HTTPError, URLError, TimeoutError):
|
||||
self._available = False
|
||||
return False
|
||||
|
||||
def get_paginated(self, path: str, params: dict | None = None) -> list:
|
||||
"""Fetch all pages of a paginated endpoint."""
|
||||
all_items = []
|
||||
page = 1
|
||||
limit = 50
|
||||
|
||||
while True:
|
||||
url = self._api_url(path)
|
||||
query_parts = [f"limit={limit}", f"page={page}"]
|
||||
if params:
|
||||
for key, val in params.items():
|
||||
query_parts.append(f"{key}={val}")
|
||||
url = f"{url}?{'&'.join(query_parts)}"
|
||||
|
||||
req = UrlRequest(url, headers=self._headers(), method="GET")
|
||||
with urlopen(req, timeout=15) as resp:
|
||||
batch = json.loads(resp.read())
|
||||
|
||||
if not batch:
|
||||
break
|
||||
|
||||
all_items.extend(batch)
|
||||
if len(batch) < limit:
|
||||
break
|
||||
page += 1
|
||||
|
||||
return all_items
|
||||
|
||||
|
||||
@dataclass
|
||||
class LayerMetrics:
|
||||
"""Metrics for a single layer."""
|
||||
|
||||
name: str
|
||||
label: str
|
||||
current_count: int
|
||||
previous_count: int
|
||||
|
||||
@property
|
||||
def trend(self) -> str:
|
||||
"""Return trend indicator."""
|
||||
if self.previous_count == 0:
|
||||
return "→" if self.current_count == 0 else "↑"
|
||||
diff = self.current_count - self.previous_count
|
||||
pct = (diff / self.previous_count) * 100
|
||||
if pct > 20:
|
||||
return "↑↑"
|
||||
elif pct > 5:
|
||||
return "↑"
|
||||
elif pct < -20:
|
||||
return "↓↓"
|
||||
elif pct < -5:
|
||||
return "↓"
|
||||
return "→"
|
||||
|
||||
@property
|
||||
def trend_color(self) -> str:
|
||||
"""Return color for trend (CSS variable name)."""
|
||||
trend = self.trend
|
||||
if trend in ("↑↑", "↑"):
|
||||
return "var(--green)" # More work = positive
|
||||
elif trend in ("↓↓", "↓"):
|
||||
return "var(--amber)" # Less work = caution
|
||||
return "var(--text-dim)"
|
||||
|
||||
|
||||
@dataclass
|
||||
class DailyRunMetrics:
|
||||
"""Complete Daily Run metrics."""
|
||||
|
||||
sessions_completed: int
|
||||
sessions_previous: int
|
||||
layers: list[LayerMetrics]
|
||||
total_touched_current: int
|
||||
total_touched_previous: int
|
||||
lookback_days: int
|
||||
generated_at: str
|
||||
|
||||
@property
|
||||
def sessions_trend(self) -> str:
|
||||
"""Return sessions trend indicator."""
|
||||
if self.sessions_previous == 0:
|
||||
return "→" if self.sessions_completed == 0 else "↑"
|
||||
diff = self.sessions_completed - self.sessions_previous
|
||||
pct = (diff / self.sessions_previous) * 100
|
||||
if pct > 20:
|
||||
return "↑↑"
|
||||
elif pct > 5:
|
||||
return "↑"
|
||||
elif pct < -20:
|
||||
return "↓↓"
|
||||
elif pct < -5:
|
||||
return "↓"
|
||||
return "→"
|
||||
|
||||
@property
|
||||
def sessions_trend_color(self) -> str:
|
||||
"""Return color for sessions trend."""
|
||||
trend = self.sessions_trend
|
||||
if trend in ("↑↑", "↑"):
|
||||
return "var(--green)"
|
||||
elif trend in ("↓↓", "↓"):
|
||||
return "var(--amber)"
|
||||
return "var(--text-dim)"
|
||||
|
||||
|
||||
def _extract_layer(labels: list[dict]) -> str | None:
|
||||
"""Extract layer label from issue labels."""
|
||||
for label in labels:
|
||||
name = label.get("name", "")
|
||||
if name.startswith("layer:"):
|
||||
return name.replace("layer:", "")
|
||||
return None
|
||||
|
||||
|
||||
def _load_cycle_data(days: int = 14) -> dict:
|
||||
"""Load cycle retrospective data for session counting."""
|
||||
retro_file = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
|
||||
if not retro_file.exists():
|
||||
return {"current": 0, "previous": 0}
|
||||
|
||||
try:
|
||||
entries = []
|
||||
for line in retro_file.read_text().strip().splitlines():
|
||||
try:
|
||||
entries.append(json.loads(line))
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
now = datetime.now(UTC)
|
||||
current_cutoff = now - timedelta(days=days)
|
||||
previous_cutoff = now - timedelta(days=days * 2)
|
||||
|
||||
current_count = 0
|
||||
previous_count = 0
|
||||
|
||||
for entry in entries:
|
||||
ts_str = entry.get("timestamp", "")
|
||||
if not ts_str:
|
||||
continue
|
||||
try:
|
||||
ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
|
||||
if ts >= current_cutoff:
|
||||
if entry.get("success", False):
|
||||
current_count += 1
|
||||
elif ts >= previous_cutoff:
|
||||
if entry.get("success", False):
|
||||
previous_count += 1
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
|
||||
return {"current": current_count, "previous": previous_count}
|
||||
except (OSError, ValueError) as exc:
|
||||
logger.debug("Failed to load cycle data: %s", exc)
|
||||
return {"current": 0, "previous": 0}
|
||||
|
||||
|
||||
def _fetch_layer_metrics(
|
||||
client: GiteaClient, lookback_days: int = 7
|
||||
) -> tuple[list[LayerMetrics], int, int]:
|
||||
"""Fetch metrics for each layer from Gitea issues."""
|
||||
now = datetime.now(UTC)
|
||||
current_cutoff = now - timedelta(days=lookback_days)
|
||||
previous_cutoff = now - timedelta(days=lookback_days * 2)
|
||||
|
||||
layers = []
|
||||
total_current = 0
|
||||
total_previous = 0
|
||||
|
||||
for layer_label in LAYER_LABELS:
|
||||
layer_name = layer_label.replace("layer:", "")
|
||||
try:
|
||||
# Fetch all issues with this layer label (both open and closed)
|
||||
issues = client.get_paginated(
|
||||
"issues",
|
||||
{"state": "all", "labels": layer_label, "limit": 100},
|
||||
)
|
||||
|
||||
current_count = 0
|
||||
previous_count = 0
|
||||
|
||||
for issue in issues:
|
||||
updated_at = issue.get("updated_at", "")
|
||||
if not updated_at:
|
||||
continue
|
||||
try:
|
||||
updated = datetime.fromisoformat(updated_at.replace("Z", "+00:00"))
|
||||
if updated >= current_cutoff:
|
||||
current_count += 1
|
||||
elif updated >= previous_cutoff:
|
||||
previous_count += 1
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
|
||||
layers.append(
|
||||
LayerMetrics(
|
||||
name=layer_name,
|
||||
label=layer_label,
|
||||
current_count=current_count,
|
||||
previous_count=previous_count,
|
||||
)
|
||||
)
|
||||
total_current += current_count
|
||||
total_previous += previous_count
|
||||
|
||||
except (HTTPError, URLError) as exc:
|
||||
logger.debug("Failed to fetch issues for %s: %s", layer_label, exc)
|
||||
layers.append(
|
||||
LayerMetrics(
|
||||
name=layer_name,
|
||||
label=layer_label,
|
||||
current_count=0,
|
||||
previous_count=0,
|
||||
)
|
||||
)
|
||||
|
||||
return layers, total_current, total_previous
|
||||
|
||||
|
||||
def _get_metrics(lookback_days: int = 7) -> DailyRunMetrics | None:
|
||||
"""Get Daily Run metrics from Gitea API."""
|
||||
config = _load_config()
|
||||
token = _get_token(config)
|
||||
client = GiteaClient(config, token)
|
||||
|
||||
if not client.is_available():
|
||||
logger.debug("Gitea API not available for Daily Run metrics")
|
||||
return None
|
||||
|
||||
try:
|
||||
# Get layer metrics from issues
|
||||
layers, total_current, total_previous = _fetch_layer_metrics(client, lookback_days)
|
||||
|
||||
# Get session data from cycle retrospectives
|
||||
cycle_data = _load_cycle_data(days=lookback_days)
|
||||
|
||||
return DailyRunMetrics(
|
||||
sessions_completed=cycle_data["current"],
|
||||
sessions_previous=cycle_data["previous"],
|
||||
layers=layers,
|
||||
total_touched_current=total_current,
|
||||
total_touched_previous=total_previous,
|
||||
lookback_days=lookback_days,
|
||||
generated_at=datetime.now(UTC).isoformat(),
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.debug("Error fetching Daily Run metrics: %s", exc)
|
||||
return None
|
||||
|
||||
|
||||
@router.get("/daily-run/metrics", response_class=JSONResponse)
|
||||
async def daily_run_metrics_api(lookback_days: int = 7):
|
||||
"""Return Daily Run metrics as JSON API."""
|
||||
metrics = _get_metrics(lookback_days)
|
||||
if not metrics:
|
||||
return JSONResponse(
|
||||
{"error": "Gitea API unavailable", "status": "unavailable"},
|
||||
status_code=503,
|
||||
)
|
||||
|
||||
# Check for quest completions based on Daily Run metrics
|
||||
quest_rewards = []
|
||||
try:
|
||||
from dashboard.routes.quests import check_daily_run_quests
|
||||
|
||||
quest_rewards = await check_daily_run_quests(agent_id="system")
|
||||
except Exception as exc:
|
||||
logger.debug("Quest checking failed: %s", exc)
|
||||
|
||||
return JSONResponse(
|
||||
{
|
||||
"status": "ok",
|
||||
"lookback_days": metrics.lookback_days,
|
||||
"sessions": {
|
||||
"completed": metrics.sessions_completed,
|
||||
"previous": metrics.sessions_previous,
|
||||
"trend": metrics.sessions_trend,
|
||||
},
|
||||
"layers": [
|
||||
{
|
||||
"name": layer.name,
|
||||
"label": layer.label,
|
||||
"current": layer.current_count,
|
||||
"previous": layer.previous_count,
|
||||
"trend": layer.trend,
|
||||
}
|
||||
for layer in metrics.layers
|
||||
],
|
||||
"totals": {
|
||||
"current": metrics.total_touched_current,
|
||||
"previous": metrics.total_touched_previous,
|
||||
},
|
||||
"generated_at": metrics.generated_at,
|
||||
"quest_rewards": quest_rewards,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@router.get("/daily-run/panel", response_class=HTMLResponse)
|
||||
async def daily_run_panel(request: Request, lookback_days: int = 7):
|
||||
"""Return Daily Run metrics panel HTML for HTMX polling."""
|
||||
metrics = _get_metrics(lookback_days)
|
||||
|
||||
# Build Gitea URLs for filtered issue lists
|
||||
config = _load_config()
|
||||
repo_slug = config.get("repo_slug", "rockachopa/Timmy-time-dashboard")
|
||||
gitea_base = config.get("gitea_api", "http://localhost:3000/api/v1").replace("/api/v1", "")
|
||||
|
||||
# Logbook URL (link to issues with any layer label)
|
||||
layer_labels = ",".join(LAYER_LABELS)
|
||||
logbook_url = f"{gitea_base}/{repo_slug}/issues?labels={layer_labels}&state=all"
|
||||
|
||||
# Layer-specific URLs
|
||||
layer_urls = {
|
||||
layer: f"{gitea_base}/{repo_slug}/issues?labels=layer:{layer}&state=all"
|
||||
for layer in ["triage", "micro-fix", "tests", "economy"]
|
||||
}
|
||||
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
"partials/daily_run_panel.html",
|
||||
{
|
||||
"metrics": metrics,
|
||||
"logbook_url": logbook_url,
|
||||
"layer_urls": layer_urls,
|
||||
"gitea_available": metrics is not None,
|
||||
},
|
||||
)
|
||||
377
src/dashboard/routes/quests.py
Normal file
377
src/dashboard/routes/quests.py
Normal file
@@ -0,0 +1,377 @@
|
||||
"""Quest system routes for agent token rewards.
|
||||
|
||||
Provides API endpoints for:
|
||||
- Listing quests and their status
|
||||
- Claiming quest rewards
|
||||
- Getting quest leaderboard
|
||||
- Quest progress tracking
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from fastapi import APIRouter, Request
|
||||
from fastapi.responses import HTMLResponse, JSONResponse
|
||||
from pydantic import BaseModel
|
||||
|
||||
from dashboard.templating import templates
|
||||
from timmy.quest_system import (
|
||||
QuestStatus,
|
||||
auto_evaluate_all_quests,
|
||||
claim_quest_reward,
|
||||
evaluate_quest_progress,
|
||||
get_active_quests,
|
||||
get_agent_quests_status,
|
||||
get_quest_definition,
|
||||
get_quest_leaderboard,
|
||||
load_quest_config,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/quests", tags=["quests"])
|
||||
|
||||
|
||||
class ClaimQuestRequest(BaseModel):
|
||||
"""Request to claim a quest reward."""
|
||||
|
||||
agent_id: str
|
||||
quest_id: str
|
||||
|
||||
|
||||
class EvaluateQuestRequest(BaseModel):
|
||||
"""Request to manually evaluate quest progress."""
|
||||
|
||||
agent_id: str
|
||||
quest_id: str
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# API Endpoints
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@router.get("/api/definitions")
|
||||
async def get_quest_definitions_api() -> JSONResponse:
|
||||
"""Get all quest definitions.
|
||||
|
||||
Returns:
|
||||
JSON list of all quest definitions with their criteria.
|
||||
"""
|
||||
definitions = get_active_quests()
|
||||
return JSONResponse(
|
||||
{
|
||||
"quests": [
|
||||
{
|
||||
"id": q.id,
|
||||
"name": q.name,
|
||||
"description": q.description,
|
||||
"reward_tokens": q.reward_tokens,
|
||||
"type": q.quest_type.value,
|
||||
"repeatable": q.repeatable,
|
||||
"cooldown_hours": q.cooldown_hours,
|
||||
"criteria": q.criteria,
|
||||
}
|
||||
for q in definitions
|
||||
]
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@router.get("/api/status/{agent_id}")
|
||||
async def get_agent_quest_status(agent_id: str) -> JSONResponse:
|
||||
"""Get quest status for a specific agent.
|
||||
|
||||
Returns:
|
||||
Complete quest status including progress, completion counts,
|
||||
and tokens earned.
|
||||
"""
|
||||
status = get_agent_quests_status(agent_id)
|
||||
return JSONResponse(status)
|
||||
|
||||
|
||||
@router.post("/api/claim")
|
||||
async def claim_quest_reward_api(request: ClaimQuestRequest) -> JSONResponse:
|
||||
"""Claim a quest reward for an agent.
|
||||
|
||||
The quest must be completed but not yet claimed.
|
||||
"""
|
||||
reward = claim_quest_reward(request.quest_id, request.agent_id)
|
||||
|
||||
if not reward:
|
||||
return JSONResponse(
|
||||
{
|
||||
"success": False,
|
||||
"error": "Quest not completed, already claimed, or on cooldown",
|
||||
},
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
return JSONResponse(
|
||||
{
|
||||
"success": True,
|
||||
"reward": reward,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@router.post("/api/evaluate")
|
||||
async def evaluate_quest_api(request: EvaluateQuestRequest) -> JSONResponse:
|
||||
"""Manually evaluate quest progress with provided context.
|
||||
|
||||
This is useful for testing or when the quest completion
|
||||
needs to be triggered manually.
|
||||
"""
|
||||
quest = get_quest_definition(request.quest_id)
|
||||
if not quest:
|
||||
return JSONResponse(
|
||||
{"success": False, "error": "Quest not found"},
|
||||
status_code=404,
|
||||
)
|
||||
|
||||
# Build evaluation context based on quest type
|
||||
context = await _build_evaluation_context(quest)
|
||||
|
||||
progress = evaluate_quest_progress(request.quest_id, request.agent_id, context)
|
||||
|
||||
if not progress:
|
||||
return JSONResponse(
|
||||
{"success": False, "error": "Failed to evaluate quest"},
|
||||
status_code=500,
|
||||
)
|
||||
|
||||
# Auto-claim if completed
|
||||
reward = None
|
||||
if progress.status == QuestStatus.COMPLETED:
|
||||
reward = claim_quest_reward(request.quest_id, request.agent_id)
|
||||
|
||||
return JSONResponse(
|
||||
{
|
||||
"success": True,
|
||||
"progress": progress.to_dict(),
|
||||
"reward": reward,
|
||||
"completed": progress.status == QuestStatus.COMPLETED,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@router.get("/api/leaderboard")
|
||||
async def get_leaderboard_api() -> JSONResponse:
|
||||
"""Get the quest completion leaderboard.
|
||||
|
||||
Returns agents sorted by total tokens earned.
|
||||
"""
|
||||
leaderboard = get_quest_leaderboard()
|
||||
return JSONResponse(
|
||||
{
|
||||
"leaderboard": leaderboard,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@router.post("/api/reload")
|
||||
async def reload_quest_config_api() -> JSONResponse:
|
||||
"""Reload quest configuration from quests.yaml.
|
||||
|
||||
Useful for applying quest changes without restarting.
|
||||
"""
|
||||
definitions, quest_settings = load_quest_config()
|
||||
return JSONResponse(
|
||||
{
|
||||
"success": True,
|
||||
"quests_loaded": len(definitions),
|
||||
"settings": quest_settings,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dashboard UI Endpoints
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@router.get("", response_class=HTMLResponse)
|
||||
async def quests_dashboard(request: Request) -> HTMLResponse:
|
||||
"""Main quests dashboard page."""
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
"quests.html",
|
||||
{"agent_id": "current_user"},
|
||||
)
|
||||
|
||||
|
||||
@router.get("/panel/{agent_id}", response_class=HTMLResponse)
|
||||
async def quests_panel(request: Request, agent_id: str) -> HTMLResponse:
|
||||
"""Quest panel for HTMX partial updates."""
|
||||
status = get_agent_quests_status(agent_id)
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
"partials/quests_panel.html",
|
||||
{
|
||||
"agent_id": agent_id,
|
||||
"quests": status["quests"],
|
||||
"total_tokens": status["total_tokens_earned"],
|
||||
"completed_count": status["total_quests_completed"],
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Internal Functions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def _build_evaluation_context(quest) -> dict[str, Any]:
|
||||
"""Build evaluation context for a quest based on its type."""
|
||||
context: dict[str, Any] = {}
|
||||
|
||||
if quest.quest_type.value == "issue_count":
|
||||
# Fetch closed issues with relevant labels
|
||||
context["closed_issues"] = await _fetch_closed_issues(
|
||||
quest.criteria.get("issue_labels", [])
|
||||
)
|
||||
|
||||
elif quest.quest_type.value == "issue_reduce":
|
||||
# Fetch current and previous issue counts
|
||||
labels = quest.criteria.get("issue_labels", [])
|
||||
context["current_issue_count"] = await _fetch_open_issue_count(labels)
|
||||
context["previous_issue_count"] = await _fetch_previous_issue_count(
|
||||
labels, quest.criteria.get("lookback_days", 7)
|
||||
)
|
||||
|
||||
elif quest.quest_type.value == "daily_run":
|
||||
# Fetch Daily Run metrics
|
||||
metrics = await _fetch_daily_run_metrics()
|
||||
context["sessions_completed"] = metrics.get("sessions_completed", 0)
|
||||
|
||||
return context
|
||||
|
||||
|
||||
async def _fetch_closed_issues(labels: list[str]) -> list[dict]:
|
||||
"""Fetch closed issues matching the given labels."""
|
||||
try:
|
||||
from dashboard.routes.daily_run import GiteaClient, _load_config
|
||||
|
||||
config = _load_config()
|
||||
token = _get_gitea_token(config)
|
||||
client = GiteaClient(config, token)
|
||||
|
||||
if not client.is_available():
|
||||
return []
|
||||
|
||||
# Build label filter
|
||||
label_filter = ",".join(labels) if labels else ""
|
||||
|
||||
issues = client.get_paginated(
|
||||
"issues",
|
||||
{"state": "closed", "labels": label_filter, "limit": 100},
|
||||
)
|
||||
|
||||
return issues
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to fetch closed issues: %s", exc)
|
||||
return []
|
||||
|
||||
|
||||
async def _fetch_open_issue_count(labels: list[str]) -> int:
|
||||
"""Fetch count of open issues with given labels."""
|
||||
try:
|
||||
from dashboard.routes.daily_run import GiteaClient, _load_config
|
||||
|
||||
config = _load_config()
|
||||
token = _get_gitea_token(config)
|
||||
client = GiteaClient(config, token)
|
||||
|
||||
if not client.is_available():
|
||||
return 0
|
||||
|
||||
label_filter = ",".join(labels) if labels else ""
|
||||
|
||||
issues = client.get_paginated(
|
||||
"issues",
|
||||
{"state": "open", "labels": label_filter, "limit": 100},
|
||||
)
|
||||
|
||||
return len(issues)
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to fetch open issue count: %s", exc)
|
||||
return 0
|
||||
|
||||
|
||||
async def _fetch_previous_issue_count(labels: list[str], lookback_days: int) -> int:
|
||||
"""Fetch previous issue count (simplified - uses current for now)."""
|
||||
# This is a simplified implementation
|
||||
# In production, you'd query historical data
|
||||
return await _fetch_open_issue_count(labels)
|
||||
|
||||
|
||||
async def _fetch_daily_run_metrics() -> dict[str, Any]:
|
||||
"""Fetch Daily Run metrics."""
|
||||
try:
|
||||
from dashboard.routes.daily_run import _get_metrics
|
||||
|
||||
metrics = _get_metrics(lookback_days=7)
|
||||
if metrics:
|
||||
return {
|
||||
"sessions_completed": metrics.sessions_completed,
|
||||
"sessions_previous": metrics.sessions_previous,
|
||||
}
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to fetch Daily Run metrics: %s", exc)
|
||||
|
||||
return {"sessions_completed": 0, "sessions_previous": 0}
|
||||
|
||||
|
||||
def _get_gitea_token(config: dict) -> str | None:
|
||||
"""Get Gitea token from config."""
|
||||
if "token" in config:
|
||||
return config["token"]
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
token_file = Path(config.get("token_file", "~/.hermes/gitea_token")).expanduser()
|
||||
if token_file.exists():
|
||||
return token_file.read_text().strip()
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Daily Run Integration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def check_daily_run_quests(agent_id: str = "system") -> list[dict]:
|
||||
"""Check and award Daily Run related quests.
|
||||
|
||||
Called by the Daily Run system when metrics are updated.
|
||||
|
||||
Returns:
|
||||
List of rewards awarded
|
||||
"""
|
||||
# Check if auto-detect is enabled
|
||||
_, quest_settings = load_quest_config()
|
||||
if not quest_settings.get("auto_detect_on_daily_run", True):
|
||||
return []
|
||||
|
||||
# Build context from Daily Run metrics
|
||||
metrics = await _fetch_daily_run_metrics()
|
||||
context = {
|
||||
"sessions_completed": metrics.get("sessions_completed", 0),
|
||||
"sessions_previous": metrics.get("sessions_previous", 0),
|
||||
}
|
||||
|
||||
# Add closed issues for issue_count quests
|
||||
active_quests = get_active_quests()
|
||||
for quest in active_quests:
|
||||
if quest.quest_type.value == "issue_count":
|
||||
labels = quest.criteria.get("issue_labels", [])
|
||||
context["closed_issues"] = await _fetch_closed_issues(labels)
|
||||
break # Only need to fetch once
|
||||
|
||||
# Evaluate all quests
|
||||
rewards = auto_evaluate_all_quests(agent_id, context)
|
||||
|
||||
return rewards
|
||||
@@ -21,6 +21,11 @@
|
||||
</div>
|
||||
{% endcall %}
|
||||
|
||||
<!-- Daily Run Metrics (HTMX polled) -->
|
||||
{% call panel("DAILY RUN", hx_get="/daily-run/panel", hx_trigger="every 60s") %}
|
||||
<div class="mc-loading-placeholder">LOADING...</div>
|
||||
{% endcall %}
|
||||
|
||||
</div>
|
||||
|
||||
<!-- Main panel — swappable via HTMX; defaults to Timmy on load -->
|
||||
|
||||
54
src/dashboard/templates/partials/daily_run_panel.html
Normal file
54
src/dashboard/templates/partials/daily_run_panel.html
Normal file
@@ -0,0 +1,54 @@
|
||||
<div class="card-header mc-panel-header">// DAILY RUN METRICS</div>
|
||||
<div class="card-body p-3">
|
||||
{% if not gitea_available %}
|
||||
<div class="mc-muted" style="font-size: 0.85rem; padding: 8px 0;">
|
||||
<span style="color: var(--amber);">⚠</span> Gitea API unavailable
|
||||
</div>
|
||||
{% else %}
|
||||
{% set m = metrics %}
|
||||
|
||||
<!-- Sessions summary -->
|
||||
<div class="dr-section" style="margin-bottom: 16px;">
|
||||
<div class="dr-row" style="display: flex; justify-content: space-between; align-items: center; margin-bottom: 8px;">
|
||||
<span class="dr-label" style="font-size: 0.85rem; color: var(--text-dim);">Sessions ({{ m.lookback_days }}d)</span>
|
||||
<a href="{{ logbook_url }}" target="_blank" class="dr-link" style="font-size: 0.75rem; color: var(--green); text-decoration: none;">
|
||||
Logbook →
|
||||
</a>
|
||||
</div>
|
||||
<div class="dr-stat" style="display: flex; align-items: baseline; gap: 8px;">
|
||||
<span class="dr-value" style="font-size: 1.5rem; font-weight: 600; color: var(--text-bright);">{{ m.sessions_completed }}</span>
|
||||
<span class="dr-trend" style="font-size: 0.9rem; color: {{ m.sessions_trend_color }};">{{ m.sessions_trend }}</span>
|
||||
<span class="dr-prev" style="font-size: 0.75rem; color: var(--text-dim);">vs {{ m.sessions_previous }} prev</span>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- Layer breakdown -->
|
||||
<div class="dr-section">
|
||||
<div class="dr-label" style="font-size: 0.85rem; color: var(--text-dim); margin-bottom: 8px;">Issues by Layer</div>
|
||||
<div class="dr-layers" style="display: flex; flex-direction: column; gap: 6px;">
|
||||
{% for layer in m.layers %}
|
||||
<div class="dr-layer-row" style="display: flex; justify-content: space-between; align-items: center;">
|
||||
<a href="{{ layer_urls[layer.name] }}" target="_blank" class="dr-layer-name" style="font-size: 0.8rem; color: var(--text); text-decoration: none; text-transform: capitalize;">
|
||||
{{ layer.name.replace('-', ' ') }}
|
||||
</a>
|
||||
<div class="dr-layer-stat" style="display: flex; align-items: center; gap: 6px;">
|
||||
<span class="dr-layer-value" style="font-size: 0.9rem; font-weight: 500; color: var(--text-bright);">{{ layer.current_count }}</span>
|
||||
<span class="dr-layer-trend" style="font-size: 0.75rem; color: {{ layer.trend_color }}; width: 18px; text-align: center;">{{ layer.trend }}</span>
|
||||
</div>
|
||||
</div>
|
||||
{% endfor %}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- Total touched -->
|
||||
<div class="dr-section" style="margin-top: 12px; padding-top: 12px; border-top: 1px solid var(--border);">
|
||||
<div class="dr-row" style="display: flex; justify-content: space-between; align-items: center;">
|
||||
<span class="dr-label" style="font-size: 0.8rem; color: var(--text-dim);">Total Issues Touched</span>
|
||||
<div class="dr-total-stat" style="display: flex; align-items: center; gap: 6px;">
|
||||
<span class="dr-total-value" style="font-size: 1rem; font-weight: 600; color: var(--text-bright);">{{ m.total_touched_current }}</span>
|
||||
<span class="dr-total-prev" style="font-size: 0.7rem; color: var(--text-dim);">/ {{ m.total_touched_previous }} prev</span>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
</div>
|
||||
80
src/dashboard/templates/partials/quests_panel.html
Normal file
80
src/dashboard/templates/partials/quests_panel.html
Normal file
@@ -0,0 +1,80 @@
|
||||
{% from "macros.html" import panel %}
|
||||
|
||||
<div class="quests-summary mb-4">
|
||||
<div class="row">
|
||||
<div class="col-md-4">
|
||||
<div class="stat-card">
|
||||
<div class="stat-value">{{ total_tokens }}</div>
|
||||
<div class="stat-label">Tokens Earned</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="col-md-4">
|
||||
<div class="stat-card">
|
||||
<div class="stat-value">{{ completed_count }}</div>
|
||||
<div class="stat-label">Quests Completed</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="col-md-4">
|
||||
<div class="stat-card">
|
||||
<div class="stat-value">{{ quests|selectattr('enabled', 'equalto', true)|list|length }}</div>
|
||||
<div class="stat-label">Active Quests</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="quests-list">
|
||||
{% for quest in quests %}
|
||||
{% if quest.enabled %}
|
||||
<div class="quest-card quest-status-{{ quest.status }}">
|
||||
<div class="quest-header">
|
||||
<h5 class="quest-name">{{ quest.name }}</h5>
|
||||
<span class="quest-reward">+{{ quest.reward_tokens }} ⚡</span>
|
||||
</div>
|
||||
<p class="quest-description">{{ quest.description }}</p>
|
||||
|
||||
<div class="quest-progress">
|
||||
{% if quest.status == 'completed' %}
|
||||
<div class="progress">
|
||||
<div class="progress-bar bg-success" style="width: 100%"></div>
|
||||
</div>
|
||||
<span class="quest-status-badge completed">Completed</span>
|
||||
{% elif quest.status == 'claimed' %}
|
||||
<div class="progress">
|
||||
<div class="progress-bar bg-success" style="width: 100%"></div>
|
||||
</div>
|
||||
<span class="quest-status-badge claimed">Reward Claimed</span>
|
||||
{% elif quest.on_cooldown %}
|
||||
<div class="progress">
|
||||
<div class="progress-bar bg-secondary" style="width: 100%"></div>
|
||||
</div>
|
||||
<span class="quest-status-badge cooldown">
|
||||
Cooldown: {{ quest.cooldown_hours_remaining }}h remaining
|
||||
</span>
|
||||
{% else %}
|
||||
<div class="progress">
|
||||
<div class="progress-bar" style="width: {{ (quest.current_value / quest.target_value * 100)|int }}%"></div>
|
||||
</div>
|
||||
<span class="quest-progress-text">{{ quest.current_value }} / {{ quest.target_value }}</span>
|
||||
{% endif %}
|
||||
</div>
|
||||
|
||||
<div class="quest-meta">
|
||||
<span class="quest-type">{{ quest.type }}</span>
|
||||
{% if quest.repeatable %}
|
||||
<span class="quest-repeatable">↻ Repeatable</span>
|
||||
{% endif %}
|
||||
{% if quest.completion_count > 0 %}
|
||||
<span class="quest-completions">Completed {{ quest.completion_count }} time{% if quest.completion_count != 1 %}s{% endif %}</span>
|
||||
{% endif %}
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
</div>
|
||||
|
||||
{% if not quests|selectattr('enabled', 'equalto', true)|list|length %}
|
||||
<div class="alert alert-info">
|
||||
No active quests available. Check back later or contact an administrator.
|
||||
</div>
|
||||
{% endif %}
|
||||
50
src/dashboard/templates/quests.html
Normal file
50
src/dashboard/templates/quests.html
Normal file
@@ -0,0 +1,50 @@
|
||||
{% extends "base.html" %}
|
||||
|
||||
{% block title %}Quests — Mission Control{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<div class="container-fluid">
|
||||
<div class="row">
|
||||
<div class="col-12">
|
||||
<h1 class="mc-title">Token Quests</h1>
|
||||
<p class="mc-subtitle">Complete quests to earn bonus tokens</p>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="row mt-4">
|
||||
<div class="col-md-8">
|
||||
<div id="quests-panel" hx-get="/quests/panel/{{ agent_id }}" hx-trigger="load, every 30s">
|
||||
<div class="mc-loading">Loading quests...</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="col-md-4">
|
||||
<div class="card mc-panel">
|
||||
<div class="card-header">
|
||||
<h5 class="mb-0">Leaderboard</h5>
|
||||
</div>
|
||||
<div class="card-body">
|
||||
<div id="leaderboard" hx-get="/quests/api/leaderboard" hx-trigger="load, every 60s">
|
||||
<div class="mc-loading">Loading leaderboard...</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="card mc-panel mt-4">
|
||||
<div class="card-header">
|
||||
<h5 class="mb-0">About Quests</h5>
|
||||
</div>
|
||||
<div class="card-body">
|
||||
<p class="mb-2">Quests are special objectives that reward tokens upon completion.</p>
|
||||
<ul class="mc-list mb-0">
|
||||
<li>Complete Daily Run sessions</li>
|
||||
<li>Close flaky-test issues</li>
|
||||
<li>Reduce P1 issue backlog</li>
|
||||
<li>Improve documentation</li>
|
||||
</ul>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
{% endblock %}
|
||||
581
src/timmy/quest_system.py
Normal file
581
src/timmy/quest_system.py
Normal file
@@ -0,0 +1,581 @@
|
||||
"""Token Quest System for agent rewards.
|
||||
|
||||
Provides quest definitions, progress tracking, completion detection,
|
||||
and token awards for agent accomplishments.
|
||||
|
||||
Quests are defined in config/quests.yaml and loaded at runtime.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from enum import StrEnum
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Path to quest configuration
|
||||
QUEST_CONFIG_PATH = Path(settings.repo_root) / "config" / "quests.yaml"
|
||||
|
||||
|
||||
class QuestType(StrEnum):
|
||||
"""Types of quests supported by the system."""
|
||||
|
||||
ISSUE_COUNT = "issue_count"
|
||||
ISSUE_REDUCE = "issue_reduce"
|
||||
DOCS_UPDATE = "docs_update"
|
||||
TEST_IMPROVE = "test_improve"
|
||||
DAILY_RUN = "daily_run"
|
||||
CUSTOM = "custom"
|
||||
|
||||
|
||||
class QuestStatus(StrEnum):
|
||||
"""Status of a quest for an agent."""
|
||||
|
||||
NOT_STARTED = "not_started"
|
||||
IN_PROGRESS = "in_progress"
|
||||
COMPLETED = "completed"
|
||||
CLAIMED = "claimed"
|
||||
EXPIRED = "expired"
|
||||
|
||||
|
||||
@dataclass
|
||||
class QuestDefinition:
|
||||
"""Definition of a quest from configuration."""
|
||||
|
||||
id: str
|
||||
name: str
|
||||
description: str
|
||||
reward_tokens: int
|
||||
quest_type: QuestType
|
||||
enabled: bool
|
||||
repeatable: bool
|
||||
cooldown_hours: int
|
||||
criteria: dict[str, Any]
|
||||
notification_message: str
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict[str, Any]) -> QuestDefinition:
|
||||
"""Create a QuestDefinition from a dictionary."""
|
||||
return cls(
|
||||
id=data["id"],
|
||||
name=data.get("name", "Unnamed Quest"),
|
||||
description=data.get("description", ""),
|
||||
reward_tokens=data.get("reward_tokens", 0),
|
||||
quest_type=QuestType(data.get("type", "custom")),
|
||||
enabled=data.get("enabled", True),
|
||||
repeatable=data.get("repeatable", False),
|
||||
cooldown_hours=data.get("cooldown_hours", 0),
|
||||
criteria=data.get("criteria", {}),
|
||||
notification_message=data.get(
|
||||
"notification_message", "Quest Complete! You earned {tokens} tokens."
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class QuestProgress:
|
||||
"""Progress of a quest for a specific agent."""
|
||||
|
||||
quest_id: str
|
||||
agent_id: str
|
||||
status: QuestStatus
|
||||
current_value: int = 0
|
||||
target_value: int = 0
|
||||
started_at: str = ""
|
||||
completed_at: str = ""
|
||||
claimed_at: str = ""
|
||||
completion_count: int = 0
|
||||
last_completed_at: str = ""
|
||||
metadata: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Convert to dictionary for serialization."""
|
||||
return {
|
||||
"quest_id": self.quest_id,
|
||||
"agent_id": self.agent_id,
|
||||
"status": self.status.value,
|
||||
"current_value": self.current_value,
|
||||
"target_value": self.target_value,
|
||||
"started_at": self.started_at,
|
||||
"completed_at": self.completed_at,
|
||||
"claimed_at": self.claimed_at,
|
||||
"completion_count": self.completion_count,
|
||||
"last_completed_at": self.last_completed_at,
|
||||
"metadata": self.metadata,
|
||||
}
|
||||
|
||||
|
||||
# In-memory storage for quest progress
|
||||
_quest_progress: dict[str, QuestProgress] = {}
|
||||
_quest_definitions: dict[str, QuestDefinition] = {}
|
||||
_quest_settings: dict[str, Any] = {}
|
||||
|
||||
|
||||
def _get_progress_key(quest_id: str, agent_id: str) -> str:
|
||||
"""Generate a unique key for quest progress."""
|
||||
return f"{agent_id}:{quest_id}"
|
||||
|
||||
|
||||
def load_quest_config() -> tuple[dict[str, QuestDefinition], dict[str, Any]]:
|
||||
"""Load quest definitions from quests.yaml.
|
||||
|
||||
Returns:
|
||||
Tuple of (quest definitions dict, settings dict)
|
||||
"""
|
||||
global _quest_definitions, _quest_settings
|
||||
|
||||
if not QUEST_CONFIG_PATH.exists():
|
||||
logger.warning("Quest config not found at %s", QUEST_CONFIG_PATH)
|
||||
return {}, {}
|
||||
|
||||
try:
|
||||
raw = QUEST_CONFIG_PATH.read_text()
|
||||
config = yaml.safe_load(raw)
|
||||
|
||||
if not isinstance(config, dict):
|
||||
logger.warning("Invalid quest config format")
|
||||
return {}, {}
|
||||
|
||||
# Load quest definitions
|
||||
quests_data = config.get("quests", {})
|
||||
definitions = {}
|
||||
for quest_id, quest_data in quests_data.items():
|
||||
quest_data["id"] = quest_id
|
||||
try:
|
||||
definition = QuestDefinition.from_dict(quest_data)
|
||||
definitions[quest_id] = definition
|
||||
except (ValueError, KeyError) as exc:
|
||||
logger.warning("Failed to load quest %s: %s", quest_id, exc)
|
||||
|
||||
# Load settings
|
||||
_quest_settings = config.get("settings", {})
|
||||
_quest_definitions = definitions
|
||||
|
||||
logger.debug("Loaded %d quest definitions", len(definitions))
|
||||
return definitions, _quest_settings
|
||||
|
||||
except (OSError, yaml.YAMLError) as exc:
|
||||
logger.warning("Failed to load quest config: %s", exc)
|
||||
return {}, {}
|
||||
|
||||
|
||||
def get_quest_definitions() -> dict[str, QuestDefinition]:
|
||||
"""Get all quest definitions, loading if necessary."""
|
||||
global _quest_definitions
|
||||
if not _quest_definitions:
|
||||
_quest_definitions, _ = load_quest_config()
|
||||
return _quest_definitions
|
||||
|
||||
|
||||
def get_quest_definition(quest_id: str) -> QuestDefinition | None:
|
||||
"""Get a specific quest definition by ID."""
|
||||
definitions = get_quest_definitions()
|
||||
return definitions.get(quest_id)
|
||||
|
||||
|
||||
def get_active_quests() -> list[QuestDefinition]:
|
||||
"""Get all enabled quest definitions."""
|
||||
definitions = get_quest_definitions()
|
||||
return [q for q in definitions.values() if q.enabled]
|
||||
|
||||
|
||||
def get_quest_progress(quest_id: str, agent_id: str) -> QuestProgress | None:
|
||||
"""Get progress for a specific quest and agent."""
|
||||
key = _get_progress_key(quest_id, agent_id)
|
||||
return _quest_progress.get(key)
|
||||
|
||||
|
||||
def get_or_create_progress(quest_id: str, agent_id: str) -> QuestProgress:
|
||||
"""Get existing progress or create new for quest/agent."""
|
||||
key = _get_progress_key(quest_id, agent_id)
|
||||
if key not in _quest_progress:
|
||||
quest = get_quest_definition(quest_id)
|
||||
if not quest:
|
||||
raise ValueError(f"Quest {quest_id} not found")
|
||||
|
||||
target = _get_target_value(quest)
|
||||
_quest_progress[key] = QuestProgress(
|
||||
quest_id=quest_id,
|
||||
agent_id=agent_id,
|
||||
status=QuestStatus.NOT_STARTED,
|
||||
current_value=0,
|
||||
target_value=target,
|
||||
started_at=datetime.now(UTC).isoformat(),
|
||||
)
|
||||
return _quest_progress[key]
|
||||
|
||||
|
||||
def _get_target_value(quest: QuestDefinition) -> int:
|
||||
"""Extract target value from quest criteria."""
|
||||
criteria = quest.criteria
|
||||
if quest.quest_type == QuestType.ISSUE_COUNT:
|
||||
return criteria.get("target_count", 1)
|
||||
elif quest.quest_type == QuestType.ISSUE_REDUCE:
|
||||
return criteria.get("target_reduction", 1)
|
||||
elif quest.quest_type == QuestType.DAILY_RUN:
|
||||
return criteria.get("min_sessions", 1)
|
||||
elif quest.quest_type == QuestType.DOCS_UPDATE:
|
||||
return criteria.get("min_files_changed", 1)
|
||||
elif quest.quest_type == QuestType.TEST_IMPROVE:
|
||||
return criteria.get("min_new_tests", 1)
|
||||
return 1
|
||||
|
||||
|
||||
def update_quest_progress(
|
||||
quest_id: str,
|
||||
agent_id: str,
|
||||
current_value: int,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
) -> QuestProgress:
|
||||
"""Update progress for a quest."""
|
||||
progress = get_or_create_progress(quest_id, agent_id)
|
||||
progress.current_value = current_value
|
||||
|
||||
if metadata:
|
||||
progress.metadata.update(metadata)
|
||||
|
||||
# Check if quest is now complete
|
||||
if progress.current_value >= progress.target_value:
|
||||
if progress.status not in (QuestStatus.COMPLETED, QuestStatus.CLAIMED):
|
||||
progress.status = QuestStatus.COMPLETED
|
||||
progress.completed_at = datetime.now(UTC).isoformat()
|
||||
logger.info("Quest %s completed for agent %s", quest_id, agent_id)
|
||||
|
||||
return progress
|
||||
|
||||
|
||||
def _is_on_cooldown(progress: QuestProgress, quest: QuestDefinition) -> bool:
|
||||
"""Check if a repeatable quest is on cooldown."""
|
||||
if not quest.repeatable or not progress.last_completed_at:
|
||||
return False
|
||||
|
||||
if quest.cooldown_hours <= 0:
|
||||
return False
|
||||
|
||||
try:
|
||||
last_completed = datetime.fromisoformat(progress.last_completed_at)
|
||||
cooldown_end = last_completed + timedelta(hours=quest.cooldown_hours)
|
||||
return datetime.now(UTC) < cooldown_end
|
||||
except (ValueError, TypeError):
|
||||
return False
|
||||
|
||||
|
||||
def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
|
||||
"""Claim the token reward for a completed quest.
|
||||
|
||||
Returns:
|
||||
Reward info dict if successful, None if not claimable
|
||||
"""
|
||||
progress = get_quest_progress(quest_id, agent_id)
|
||||
if not progress:
|
||||
return None
|
||||
|
||||
quest = get_quest_definition(quest_id)
|
||||
if not quest:
|
||||
return None
|
||||
|
||||
# Check if quest is completed but not yet claimed
|
||||
if progress.status != QuestStatus.COMPLETED:
|
||||
return None
|
||||
|
||||
# Check cooldown for repeatable quests
|
||||
if _is_on_cooldown(progress, quest):
|
||||
return None
|
||||
|
||||
try:
|
||||
# Award tokens via ledger
|
||||
from lightning.ledger import create_invoice_entry, mark_settled
|
||||
|
||||
# Create a mock invoice for the reward
|
||||
invoice_entry = create_invoice_entry(
|
||||
payment_hash=f"quest_{quest_id}_{agent_id}_{int(time.time())}",
|
||||
amount_sats=quest.reward_tokens,
|
||||
memo=f"Quest reward: {quest.name}",
|
||||
source="quest_reward",
|
||||
agent_id=agent_id,
|
||||
)
|
||||
|
||||
# Mark as settled immediately (quest rewards are auto-settled)
|
||||
mark_settled(invoice_entry.payment_hash, preimage=f"quest_{quest_id}")
|
||||
|
||||
# Update progress
|
||||
progress.status = QuestStatus.CLAIMED
|
||||
progress.claimed_at = datetime.now(UTC).isoformat()
|
||||
progress.completion_count += 1
|
||||
progress.last_completed_at = progress.claimed_at
|
||||
|
||||
# Reset for repeatable quests
|
||||
if quest.repeatable:
|
||||
progress.status = QuestStatus.NOT_STARTED
|
||||
progress.current_value = 0
|
||||
progress.completed_at = ""
|
||||
progress.claimed_at = ""
|
||||
|
||||
notification = quest.notification_message.format(tokens=quest.reward_tokens)
|
||||
|
||||
return {
|
||||
"quest_id": quest_id,
|
||||
"agent_id": agent_id,
|
||||
"tokens_awarded": quest.reward_tokens,
|
||||
"notification": notification,
|
||||
"completion_count": progress.completion_count,
|
||||
}
|
||||
|
||||
except Exception as exc:
|
||||
logger.error("Failed to award quest reward: %s", exc)
|
||||
return None
|
||||
|
||||
|
||||
def check_issue_count_quest(
|
||||
quest: QuestDefinition,
|
||||
agent_id: str,
|
||||
closed_issues: list[dict],
|
||||
) -> QuestProgress | None:
|
||||
"""Check progress for issue_count type quest."""
|
||||
criteria = quest.criteria
|
||||
target_labels = set(criteria.get("issue_labels", []))
|
||||
# target_count is available in criteria but not used directly here
|
||||
|
||||
# Count matching issues
|
||||
matching_count = 0
|
||||
for issue in closed_issues:
|
||||
issue_labels = {label.get("name", "") for label in issue.get("labels", [])}
|
||||
if target_labels.issubset(issue_labels) or (not target_labels and issue_labels):
|
||||
matching_count += 1
|
||||
|
||||
progress = update_quest_progress(
|
||||
quest.id, agent_id, matching_count, {"matching_issues": matching_count}
|
||||
)
|
||||
|
||||
return progress
|
||||
|
||||
|
||||
def check_issue_reduce_quest(
|
||||
quest: QuestDefinition,
|
||||
agent_id: str,
|
||||
previous_count: int,
|
||||
current_count: int,
|
||||
) -> QuestProgress | None:
|
||||
"""Check progress for issue_reduce type quest."""
|
||||
# target_reduction available in quest.criteria but we track actual reduction
|
||||
reduction = max(0, previous_count - current_count)
|
||||
|
||||
progress = update_quest_progress(quest.id, agent_id, reduction, {"reduction": reduction})
|
||||
|
||||
return progress
|
||||
|
||||
|
||||
def check_daily_run_quest(
|
||||
quest: QuestDefinition,
|
||||
agent_id: str,
|
||||
sessions_completed: int,
|
||||
) -> QuestProgress | None:
|
||||
"""Check progress for daily_run type quest."""
|
||||
# min_sessions available in quest.criteria but we track actual sessions
|
||||
progress = update_quest_progress(
|
||||
quest.id, agent_id, sessions_completed, {"sessions": sessions_completed}
|
||||
)
|
||||
|
||||
return progress
|
||||
|
||||
|
||||
def evaluate_quest_progress(
|
||||
quest_id: str,
|
||||
agent_id: str,
|
||||
context: dict[str, Any],
|
||||
) -> QuestProgress | None:
|
||||
"""Evaluate quest progress based on quest type and context.
|
||||
|
||||
Args:
|
||||
quest_id: The quest to evaluate
|
||||
agent_id: The agent to evaluate for
|
||||
context: Context data for evaluation (issues, metrics, etc.)
|
||||
|
||||
Returns:
|
||||
Updated QuestProgress or None if evaluation failed
|
||||
"""
|
||||
quest = get_quest_definition(quest_id)
|
||||
if not quest or not quest.enabled:
|
||||
return None
|
||||
|
||||
progress = get_quest_progress(quest_id, agent_id)
|
||||
|
||||
# Check cooldown for repeatable quests
|
||||
if progress and _is_on_cooldown(progress, quest):
|
||||
return progress
|
||||
|
||||
try:
|
||||
if quest.quest_type == QuestType.ISSUE_COUNT:
|
||||
closed_issues = context.get("closed_issues", [])
|
||||
return check_issue_count_quest(quest, agent_id, closed_issues)
|
||||
|
||||
elif quest.quest_type == QuestType.ISSUE_REDUCE:
|
||||
prev_count = context.get("previous_issue_count", 0)
|
||||
curr_count = context.get("current_issue_count", 0)
|
||||
return check_issue_reduce_quest(quest, agent_id, prev_count, curr_count)
|
||||
|
||||
elif quest.quest_type == QuestType.DAILY_RUN:
|
||||
sessions = context.get("sessions_completed", 0)
|
||||
return check_daily_run_quest(quest, agent_id, sessions)
|
||||
|
||||
elif quest.quest_type == QuestType.CUSTOM:
|
||||
# Custom quests require manual completion
|
||||
return progress
|
||||
|
||||
else:
|
||||
logger.debug("Quest type %s not yet implemented", quest.quest_type)
|
||||
return progress
|
||||
|
||||
except Exception as exc:
|
||||
logger.warning("Quest evaluation failed for %s: %s", quest_id, exc)
|
||||
return progress
|
||||
|
||||
|
||||
def auto_evaluate_all_quests(agent_id: str, context: dict[str, Any]) -> list[dict]:
|
||||
"""Evaluate all active quests for an agent and award rewards.
|
||||
|
||||
Returns:
|
||||
List of reward info for newly completed quests
|
||||
"""
|
||||
rewards = []
|
||||
active_quests = get_active_quests()
|
||||
|
||||
for quest in active_quests:
|
||||
progress = evaluate_quest_progress(quest.id, agent_id, context)
|
||||
if progress and progress.status == QuestStatus.COMPLETED:
|
||||
# Auto-claim the reward
|
||||
reward = claim_quest_reward(quest.id, agent_id)
|
||||
if reward:
|
||||
rewards.append(reward)
|
||||
|
||||
return rewards
|
||||
|
||||
|
||||
def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
|
||||
"""Get complete quest status for an agent."""
|
||||
definitions = get_quest_definitions()
|
||||
quests_status = []
|
||||
total_rewards = 0
|
||||
completed_count = 0
|
||||
|
||||
for quest_id, quest in definitions.items():
|
||||
progress = get_quest_progress(quest_id, agent_id)
|
||||
if not progress:
|
||||
progress = get_or_create_progress(quest_id, agent_id)
|
||||
|
||||
is_on_cooldown = _is_on_cooldown(progress, quest) if quest.repeatable else False
|
||||
|
||||
quest_info = {
|
||||
"quest_id": quest_id,
|
||||
"name": quest.name,
|
||||
"description": quest.description,
|
||||
"reward_tokens": quest.reward_tokens,
|
||||
"type": quest.quest_type.value,
|
||||
"enabled": quest.enabled,
|
||||
"repeatable": quest.repeatable,
|
||||
"status": progress.status.value,
|
||||
"current_value": progress.current_value,
|
||||
"target_value": progress.target_value,
|
||||
"completion_count": progress.completion_count,
|
||||
"on_cooldown": is_on_cooldown,
|
||||
"cooldown_hours_remaining": 0,
|
||||
}
|
||||
|
||||
if is_on_cooldown and progress.last_completed_at:
|
||||
try:
|
||||
last = datetime.fromisoformat(progress.last_completed_at)
|
||||
cooldown_end = last + timedelta(hours=quest.cooldown_hours)
|
||||
hours_remaining = (cooldown_end - datetime.now(UTC)).total_seconds() / 3600
|
||||
quest_info["cooldown_hours_remaining"] = round(max(0, hours_remaining), 1)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
|
||||
quests_status.append(quest_info)
|
||||
total_rewards += progress.completion_count * quest.reward_tokens
|
||||
completed_count += progress.completion_count
|
||||
|
||||
return {
|
||||
"agent_id": agent_id,
|
||||
"quests": quests_status,
|
||||
"total_tokens_earned": total_rewards,
|
||||
"total_quests_completed": completed_count,
|
||||
"active_quests_count": len([q for q in quests_status if q["enabled"]]),
|
||||
}
|
||||
|
||||
|
||||
def reset_quest_progress(quest_id: str | None = None, agent_id: str | None = None) -> int:
|
||||
"""Reset quest progress. Useful for testing.
|
||||
|
||||
Args:
|
||||
quest_id: Specific quest to reset, or None for all
|
||||
agent_id: Specific agent to reset, or None for all
|
||||
|
||||
Returns:
|
||||
Number of progress entries reset
|
||||
"""
|
||||
global _quest_progress
|
||||
count = 0
|
||||
|
||||
keys_to_reset = []
|
||||
for key, _progress in _quest_progress.items():
|
||||
key_agent, key_quest = key.split(":", 1)
|
||||
if (quest_id is None or key_quest == quest_id) and (
|
||||
agent_id is None or key_agent == agent_id
|
||||
):
|
||||
keys_to_reset.append(key)
|
||||
|
||||
for key in keys_to_reset:
|
||||
del _quest_progress[key]
|
||||
count += 1
|
||||
|
||||
return count
|
||||
|
||||
|
||||
def get_quest_leaderboard() -> list[dict[str, Any]]:
|
||||
"""Get a leaderboard of agents by quest completion."""
|
||||
agent_stats: dict[str, dict[str, Any]] = {}
|
||||
|
||||
for _key, progress in _quest_progress.items():
|
||||
agent_id = progress.agent_id
|
||||
if agent_id not in agent_stats:
|
||||
agent_stats[agent_id] = {
|
||||
"agent_id": agent_id,
|
||||
"total_completions": 0,
|
||||
"total_tokens": 0,
|
||||
"quests_completed": set(),
|
||||
}
|
||||
|
||||
quest = get_quest_definition(progress.quest_id)
|
||||
if quest:
|
||||
agent_stats[agent_id]["total_completions"] += progress.completion_count
|
||||
agent_stats[agent_id]["total_tokens"] += progress.completion_count * quest.reward_tokens
|
||||
if progress.completion_count > 0:
|
||||
agent_stats[agent_id]["quests_completed"].add(quest.id)
|
||||
|
||||
leaderboard = []
|
||||
for stats in agent_stats.values():
|
||||
leaderboard.append(
|
||||
{
|
||||
"agent_id": stats["agent_id"],
|
||||
"total_completions": stats["total_completions"],
|
||||
"total_tokens": stats["total_tokens"],
|
||||
"unique_quests_completed": len(stats["quests_completed"]),
|
||||
}
|
||||
)
|
||||
|
||||
# Sort by total tokens (descending)
|
||||
leaderboard.sort(key=lambda x: x["total_tokens"], reverse=True)
|
||||
return leaderboard
|
||||
|
||||
|
||||
# Initialize on module load
|
||||
load_quest_config()
|
||||
536
tests/timmy/test_golden_path.py
Normal file
536
tests/timmy/test_golden_path.py
Normal file
@@ -0,0 +1,536 @@
|
||||
"""Tests for the Golden Path generator."""
|
||||
|
||||
import json
|
||||
from datetime import UTC, datetime
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from timmy_automations.daily_run.golden_path import (
|
||||
TIME_ESTIMATES,
|
||||
TYPE_PATTERNS,
|
||||
GiteaClient,
|
||||
GoldenPath,
|
||||
PathItem,
|
||||
build_golden_path,
|
||||
classify_issue_type,
|
||||
estimate_time,
|
||||
extract_size,
|
||||
generate_golden_path,
|
||||
get_token,
|
||||
group_issues_by_type,
|
||||
load_config,
|
||||
score_issue_for_path,
|
||||
)
|
||||
|
||||
|
||||
class TestLoadConfig:
|
||||
"""Tests for configuration loading."""
|
||||
|
||||
def test_load_config_defaults(self):
|
||||
"""Config should have sensible defaults."""
|
||||
config = load_config()
|
||||
assert "gitea_api" in config
|
||||
assert "repo_slug" in config
|
||||
assert "size_labels" in config
|
||||
|
||||
def test_load_config_env_override(self, monkeypatch):
|
||||
"""Environment variables should override defaults."""
|
||||
monkeypatch.setenv("TIMMY_GITEA_API", "http://custom:3000/api/v1")
|
||||
monkeypatch.setenv("TIMMY_REPO_SLUG", "custom/repo")
|
||||
monkeypatch.setenv("TIMMY_GITEA_TOKEN", "test-token")
|
||||
|
||||
config = load_config()
|
||||
assert config["gitea_api"] == "http://custom:3000/api/v1"
|
||||
assert config["repo_slug"] == "custom/repo"
|
||||
assert config["token"] == "test-token"
|
||||
|
||||
|
||||
class TestGetToken:
|
||||
"""Tests for token retrieval."""
|
||||
|
||||
def test_get_token_from_config(self):
|
||||
"""Token from config takes precedence."""
|
||||
config = {"token": "config-token", "token_file": "~/.test"}
|
||||
assert get_token(config) == "config-token"
|
||||
|
||||
@patch("pathlib.Path.exists")
|
||||
@patch("pathlib.Path.read_text")
|
||||
def test_get_token_from_file(self, mock_read, mock_exists):
|
||||
"""Token can be read from file."""
|
||||
mock_exists.return_value = True
|
||||
mock_read.return_value = "file-token\n"
|
||||
|
||||
config = {"token_file": "~/.hermes/test_token"}
|
||||
assert get_token(config) == "file-token"
|
||||
|
||||
def test_get_token_none(self):
|
||||
"""Returns None if no token available."""
|
||||
config = {"token_file": "/nonexistent/path"}
|
||||
assert get_token(config) is None
|
||||
|
||||
|
||||
class TestExtractSize:
|
||||
"""Tests for size label extraction."""
|
||||
|
||||
def test_extract_size_xs(self):
|
||||
"""Should extract XS size."""
|
||||
labels = [{"name": "size:XS"}, {"name": "bug"}]
|
||||
assert extract_size(labels) == "XS"
|
||||
|
||||
def test_extract_size_s(self):
|
||||
"""Should extract S size."""
|
||||
labels = [{"name": "bug"}, {"name": "size:S"}]
|
||||
assert extract_size(labels) == "S"
|
||||
|
||||
def test_extract_size_m(self):
|
||||
"""Should extract M size."""
|
||||
labels = [{"name": "size:M"}]
|
||||
assert extract_size(labels) == "M"
|
||||
|
||||
def test_extract_size_unknown(self):
|
||||
"""Should return ? for unknown size."""
|
||||
labels = [{"name": "bug"}, {"name": "feature"}]
|
||||
assert extract_size(labels) == "?"
|
||||
|
||||
def test_extract_size_empty(self):
|
||||
"""Should return ? for empty labels."""
|
||||
assert extract_size([]) == "?"
|
||||
|
||||
|
||||
class TestClassifyIssueType:
|
||||
"""Tests for issue type classification."""
|
||||
|
||||
def test_classify_triage(self):
|
||||
"""Should classify triage issues."""
|
||||
issue = {
|
||||
"title": "Triage new issues",
|
||||
"labels": [{"name": "triage"}],
|
||||
}
|
||||
assert classify_issue_type(issue) == "triage"
|
||||
|
||||
def test_classify_test(self):
|
||||
"""Should classify test issues."""
|
||||
issue = {
|
||||
"title": "Add unit tests for parser",
|
||||
"labels": [{"name": "test"}],
|
||||
}
|
||||
assert classify_issue_type(issue) == "test"
|
||||
|
||||
def test_classify_fix(self):
|
||||
"""Should classify fix issues."""
|
||||
issue = {
|
||||
"title": "Fix login bug",
|
||||
"labels": [{"name": "bug"}],
|
||||
}
|
||||
assert classify_issue_type(issue) == "fix"
|
||||
|
||||
def test_classify_docs(self):
|
||||
"""Should classify docs issues."""
|
||||
issue = {
|
||||
"title": "Update README",
|
||||
"labels": [{"name": "docs"}],
|
||||
}
|
||||
assert classify_issue_type(issue) == "docs"
|
||||
|
||||
def test_classify_refactor(self):
|
||||
"""Should classify refactor issues."""
|
||||
issue = {
|
||||
"title": "Refactor validation logic",
|
||||
"labels": [{"name": "refactor"}],
|
||||
}
|
||||
assert classify_issue_type(issue) == "refactor"
|
||||
|
||||
def test_classify_default_to_fix(self):
|
||||
"""Should default to fix for uncategorized."""
|
||||
issue = {
|
||||
"title": "Something vague",
|
||||
"labels": [{"name": "question"}],
|
||||
}
|
||||
assert classify_issue_type(issue) == "fix"
|
||||
|
||||
def test_classify_title_priority(self):
|
||||
"""Title patterns should contribute to classification."""
|
||||
issue = {
|
||||
"title": "Fix the broken parser",
|
||||
"labels": [],
|
||||
}
|
||||
assert classify_issue_type(issue) == "fix"
|
||||
|
||||
|
||||
class TestEstimateTime:
|
||||
"""Tests for time estimation."""
|
||||
|
||||
def test_estimate_xs_fix(self):
|
||||
"""XS fix should be 10 minutes."""
|
||||
issue = {
|
||||
"title": "Fix typo",
|
||||
"labels": [{"name": "size:XS"}, {"name": "bug"}],
|
||||
}
|
||||
assert estimate_time(issue) == 10
|
||||
|
||||
def test_estimate_s_test(self):
|
||||
"""S test should be 15 minutes."""
|
||||
issue = {
|
||||
"title": "Add test coverage",
|
||||
"labels": [{"name": "size:S"}, {"name": "test"}],
|
||||
}
|
||||
assert estimate_time(issue) == 15
|
||||
|
||||
def test_estimate_m_fix(self):
|
||||
"""M fix should be 25 minutes."""
|
||||
issue = {
|
||||
"title": "Fix complex bug",
|
||||
"labels": [{"name": "size:M"}, {"name": "bug"}],
|
||||
}
|
||||
assert estimate_time(issue) == 25
|
||||
|
||||
def test_estimate_unknown_size(self):
|
||||
"""Unknown size should fallback to S."""
|
||||
issue = {
|
||||
"title": "Some fix",
|
||||
"labels": [{"name": "bug"}],
|
||||
}
|
||||
# Falls back to S/fix = 15
|
||||
assert estimate_time(issue) == 15
|
||||
|
||||
|
||||
class TestScoreIssueForPath:
|
||||
"""Tests for issue scoring."""
|
||||
|
||||
def test_score_prefers_xs(self):
|
||||
"""XS issues should score higher."""
|
||||
xs = {"title": "Fix", "labels": [{"name": "size:XS"}]}
|
||||
s = {"title": "Fix", "labels": [{"name": "size:S"}]}
|
||||
m = {"title": "Fix", "labels": [{"name": "size:M"}]}
|
||||
|
||||
assert score_issue_for_path(xs) > score_issue_for_path(s)
|
||||
assert score_issue_for_path(s) > score_issue_for_path(m)
|
||||
|
||||
def test_score_prefers_clear_types(self):
|
||||
"""Issues with clear type labels score higher."""
|
||||
# Bug label adds score, so with bug should be >= without bug
|
||||
with_type = {
|
||||
"title": "Fix bug",
|
||||
"labels": [{"name": "size:S"}, {"name": "bug"}],
|
||||
}
|
||||
without_type = {
|
||||
"title": "Something",
|
||||
"labels": [{"name": "size:S"}],
|
||||
}
|
||||
|
||||
assert score_issue_for_path(with_type) >= score_issue_for_path(without_type)
|
||||
|
||||
def test_score_accepts_criteria(self):
|
||||
"""Issues with acceptance criteria score higher."""
|
||||
with_criteria = {
|
||||
"title": "Fix",
|
||||
"labels": [{"name": "size:S"}],
|
||||
"body": "## Acceptance Criteria\n- [ ] Fix it",
|
||||
}
|
||||
without_criteria = {
|
||||
"title": "Fix",
|
||||
"labels": [{"name": "size:S"}],
|
||||
"body": "Just fix it",
|
||||
}
|
||||
|
||||
assert score_issue_for_path(with_criteria) > score_issue_for_path(without_criteria)
|
||||
|
||||
|
||||
class TestGroupIssuesByType:
|
||||
"""Tests for issue grouping."""
|
||||
|
||||
def test_groups_by_type(self):
|
||||
"""Issues should be grouped by their type."""
|
||||
issues = [
|
||||
{"title": "Fix bug", "labels": [{"name": "bug"}], "number": 1},
|
||||
{"title": "Add test", "labels": [{"name": "test"}], "number": 2},
|
||||
{"title": "Another fix", "labels": [{"name": "bug"}], "number": 3},
|
||||
]
|
||||
|
||||
grouped = group_issues_by_type(issues)
|
||||
|
||||
assert len(grouped["fix"]) == 2
|
||||
assert len(grouped["test"]) == 1
|
||||
assert len(grouped["triage"]) == 0
|
||||
|
||||
def test_sorts_by_score(self):
|
||||
"""Issues within groups should be sorted by score."""
|
||||
issues = [
|
||||
{"title": "Fix", "labels": [{"name": "size:M"}], "number": 1},
|
||||
{"title": "Fix", "labels": [{"name": "size:XS"}], "number": 2},
|
||||
{"title": "Fix", "labels": [{"name": "size:S"}], "number": 3},
|
||||
]
|
||||
|
||||
grouped = group_issues_by_type(issues)
|
||||
|
||||
# XS should be first (highest score)
|
||||
assert grouped["fix"][0]["number"] == 2
|
||||
# M should be last (lowest score)
|
||||
assert grouped["fix"][2]["number"] == 1
|
||||
|
||||
|
||||
class TestBuildGoldenPath:
|
||||
"""Tests for Golden Path building."""
|
||||
|
||||
def test_builds_path_with_all_types(self):
|
||||
"""Path should include items from different types."""
|
||||
grouped = {
|
||||
"triage": [
|
||||
{"title": "Triage", "labels": [{"name": "size:XS"}], "number": 1, "html_url": ""},
|
||||
],
|
||||
"fix": [
|
||||
{"title": "Fix 1", "labels": [{"name": "size:S"}], "number": 2, "html_url": ""},
|
||||
{"title": "Fix 2", "labels": [{"name": "size:XS"}], "number": 3, "html_url": ""},
|
||||
],
|
||||
"test": [
|
||||
{"title": "Test", "labels": [{"name": "size:S"}], "number": 4, "html_url": ""},
|
||||
],
|
||||
"docs": [],
|
||||
"refactor": [],
|
||||
}
|
||||
|
||||
path = build_golden_path(grouped, target_minutes=45)
|
||||
|
||||
assert path.item_count >= 3
|
||||
assert path.items[0].issue_type == "triage" # Warm-up
|
||||
assert any(item.issue_type == "test" for item in path.items)
|
||||
|
||||
def test_respects_time_budget(self):
|
||||
"""Path should stay within reasonable time budget."""
|
||||
grouped = {
|
||||
"triage": [
|
||||
{"title": "Triage", "labels": [{"name": "size:S"}], "number": 1, "html_url": ""},
|
||||
],
|
||||
"fix": [
|
||||
{"title": "Fix 1", "labels": [{"name": "size:S"}], "number": 2, "html_url": ""},
|
||||
{"title": "Fix 2", "labels": [{"name": "size:S"}], "number": 3, "html_url": ""},
|
||||
],
|
||||
"test": [
|
||||
{"title": "Test", "labels": [{"name": "size:S"}], "number": 4, "html_url": ""},
|
||||
],
|
||||
"docs": [],
|
||||
"refactor": [],
|
||||
}
|
||||
|
||||
path = build_golden_path(grouped, target_minutes=45)
|
||||
|
||||
# Should be in 30-60 minute range
|
||||
assert 20 <= path.total_estimated_minutes <= 70
|
||||
|
||||
def test_no_duplicate_issues(self):
|
||||
"""Path should not include the same issue twice."""
|
||||
grouped = {
|
||||
"triage": [],
|
||||
"fix": [
|
||||
{"title": "Fix", "labels": [{"name": "size:S"}], "number": 1, "html_url": ""},
|
||||
],
|
||||
"test": [],
|
||||
"docs": [],
|
||||
"refactor": [],
|
||||
}
|
||||
|
||||
path = build_golden_path(grouped, target_minutes=45)
|
||||
|
||||
numbers = [item.number for item in path.items]
|
||||
assert len(numbers) == len(set(numbers)) # No duplicates
|
||||
|
||||
def test_fallback_when_triage_missing(self):
|
||||
"""Should use fallback when no triage issues available."""
|
||||
grouped = {
|
||||
"triage": [],
|
||||
"fix": [
|
||||
{"title": "Fix", "labels": [{"name": "size:XS"}], "number": 1, "html_url": ""},
|
||||
],
|
||||
"test": [
|
||||
{"title": "Test", "labels": [{"name": "size:XS"}], "number": 2, "html_url": ""},
|
||||
],
|
||||
"docs": [],
|
||||
"refactor": [],
|
||||
}
|
||||
|
||||
path = build_golden_path(grouped, target_minutes=45)
|
||||
|
||||
assert path.item_count > 0
|
||||
|
||||
|
||||
class TestGoldenPathDataclass:
|
||||
"""Tests for the GoldenPath dataclass."""
|
||||
|
||||
def test_total_time_calculation(self):
|
||||
"""Should sum item times correctly."""
|
||||
path = GoldenPath(
|
||||
generated_at=datetime.now(UTC).isoformat(),
|
||||
target_minutes=45,
|
||||
items=[
|
||||
PathItem(1, "Test 1", "XS", "fix", 10, ""),
|
||||
PathItem(2, "Test 2", "S", "test", 15, ""),
|
||||
],
|
||||
)
|
||||
|
||||
assert path.total_estimated_minutes == 25
|
||||
|
||||
def test_to_dict(self):
|
||||
"""Should convert to dict correctly."""
|
||||
path = GoldenPath(
|
||||
generated_at="2024-01-01T00:00:00+00:00",
|
||||
target_minutes=45,
|
||||
items=[PathItem(1, "Test", "XS", "fix", 10, "http://test")],
|
||||
)
|
||||
|
||||
data = path.to_dict()
|
||||
|
||||
assert data["target_minutes"] == 45
|
||||
assert data["total_estimated_minutes"] == 10
|
||||
assert data["item_count"] == 1
|
||||
assert len(data["items"]) == 1
|
||||
|
||||
def test_to_json(self):
|
||||
"""Should convert to JSON correctly."""
|
||||
path = GoldenPath(
|
||||
generated_at="2024-01-01T00:00:00+00:00",
|
||||
target_minutes=45,
|
||||
items=[],
|
||||
)
|
||||
|
||||
json_str = path.to_json()
|
||||
data = json.loads(json_str)
|
||||
|
||||
assert data["target_minutes"] == 45
|
||||
|
||||
|
||||
class TestGiteaClient:
|
||||
"""Tests for the GiteaClient."""
|
||||
|
||||
def test_client_initialization(self):
|
||||
"""Client should initialize with config."""
|
||||
config = {
|
||||
"gitea_api": "http://test:3000/api/v1",
|
||||
"repo_slug": "test/repo",
|
||||
}
|
||||
client = GiteaClient(config, "token123")
|
||||
|
||||
assert client.api_base == "http://test:3000/api/v1"
|
||||
assert client.repo_slug == "test/repo"
|
||||
assert client.token == "token123"
|
||||
|
||||
def test_headers_with_token(self):
|
||||
"""Headers should include auth token."""
|
||||
config = {"gitea_api": "http://test", "repo_slug": "test/repo"}
|
||||
client = GiteaClient(config, "mytoken")
|
||||
|
||||
headers = client._headers()
|
||||
|
||||
assert headers["Authorization"] == "token mytoken"
|
||||
assert headers["Accept"] == "application/json"
|
||||
|
||||
def test_headers_without_token(self):
|
||||
"""Headers should work without token."""
|
||||
config = {"gitea_api": "http://test", "repo_slug": "test/repo"}
|
||||
client = GiteaClient(config, None)
|
||||
|
||||
headers = client._headers()
|
||||
|
||||
assert "Authorization" not in headers
|
||||
assert headers["Accept"] == "application/json"
|
||||
|
||||
@patch("timmy_automations.daily_run.golden_path.urlopen")
|
||||
def test_is_available_success(self, mock_urlopen):
|
||||
"""Should detect API availability."""
|
||||
mock_response = MagicMock()
|
||||
mock_response.status = 200
|
||||
mock_context = MagicMock()
|
||||
mock_context.__enter__ = MagicMock(return_value=mock_response)
|
||||
mock_context.__exit__ = MagicMock(return_value=False)
|
||||
mock_urlopen.return_value = mock_context
|
||||
|
||||
config = {"gitea_api": "http://test", "repo_slug": "test/repo"}
|
||||
client = GiteaClient(config, None)
|
||||
|
||||
assert client.is_available() is True
|
||||
|
||||
@patch("urllib.request.urlopen")
|
||||
def test_is_available_failure(self, mock_urlopen):
|
||||
"""Should handle API unavailability."""
|
||||
from urllib.error import URLError
|
||||
|
||||
mock_urlopen.side_effect = URLError("Connection refused")
|
||||
|
||||
config = {"gitea_api": "http://test", "repo_slug": "test/repo"}
|
||||
client = GiteaClient(config, None)
|
||||
|
||||
assert client.is_available() is False
|
||||
|
||||
|
||||
class TestIntegration:
|
||||
"""Integration-style tests."""
|
||||
|
||||
@patch("timmy_automations.daily_run.golden_path.GiteaClient")
|
||||
def test_generate_golden_path_integration(self, mock_client_class):
|
||||
"""End-to-end test with mocked Gitea."""
|
||||
# Setup mock
|
||||
mock_client = MagicMock()
|
||||
mock_client.is_available.return_value = True
|
||||
mock_client.get_paginated.return_value = [
|
||||
{
|
||||
"number": 1,
|
||||
"title": "Triage issues",
|
||||
"labels": [{"name": "size:XS"}, {"name": "triage"}],
|
||||
"html_url": "http://test/1",
|
||||
},
|
||||
{
|
||||
"number": 2,
|
||||
"title": "Fix bug",
|
||||
"labels": [{"name": "size:S"}, {"name": "bug"}],
|
||||
"html_url": "http://test/2",
|
||||
},
|
||||
{
|
||||
"number": 3,
|
||||
"title": "Add tests",
|
||||
"labels": [{"name": "size:S"}, {"name": "test"}],
|
||||
"html_url": "http://test/3",
|
||||
},
|
||||
{
|
||||
"number": 4,
|
||||
"title": "Another fix",
|
||||
"labels": [{"name": "size:XS"}, {"name": "bug"}],
|
||||
"html_url": "http://test/4",
|
||||
},
|
||||
]
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
path = generate_golden_path(target_minutes=45)
|
||||
|
||||
assert path.item_count >= 3
|
||||
assert all(item.url.startswith("http://test/") for item in path.items)
|
||||
|
||||
@patch("timmy_automations.daily_run.golden_path.GiteaClient")
|
||||
def test_generate_when_unavailable(self, mock_client_class):
|
||||
"""Should return empty path when Gitea unavailable."""
|
||||
mock_client = MagicMock()
|
||||
mock_client.is_available.return_value = False
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
path = generate_golden_path(target_minutes=45)
|
||||
|
||||
assert path.item_count == 0
|
||||
assert path.items == []
|
||||
|
||||
|
||||
class TestTypePatterns:
|
||||
"""Tests for type pattern definitions."""
|
||||
|
||||
def test_type_patterns_structure(self):
|
||||
"""Type patterns should have required keys."""
|
||||
for _issue_type, patterns in TYPE_PATTERNS.items():
|
||||
assert "labels" in patterns
|
||||
assert "title" in patterns
|
||||
assert isinstance(patterns["labels"], list)
|
||||
assert isinstance(patterns["title"], list)
|
||||
|
||||
def test_time_estimates_structure(self):
|
||||
"""Time estimates should have all sizes."""
|
||||
for size in ["XS", "S", "M"]:
|
||||
assert size in TIME_ESTIMATES
|
||||
for issue_type in ["triage", "fix", "test", "docs", "refactor"]:
|
||||
assert issue_type in TIME_ESTIMATES[size]
|
||||
assert isinstance(TIME_ESTIMATES[size][issue_type], int)
|
||||
assert TIME_ESTIMATES[size][issue_type] > 0
|
||||
524
tests/timmy_automations/test_token_rules.py
Normal file
524
tests/timmy_automations/test_token_rules.py
Normal file
@@ -0,0 +1,524 @@
|
||||
"""Tests for token_rules module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
# Add timmy_automations to path for imports
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "timmy_automations"))
|
||||
|
||||
from utils import token_rules as tr
|
||||
|
||||
|
||||
class TestTokenEvent:
|
||||
"""Test TokenEvent dataclass."""
|
||||
|
||||
def test_delta_calculation_reward(self):
|
||||
"""Delta is positive for rewards."""
|
||||
event = tr.TokenEvent(
|
||||
name="test",
|
||||
description="Test event",
|
||||
reward=10,
|
||||
penalty=0,
|
||||
category="test",
|
||||
)
|
||||
assert event.delta == 10
|
||||
|
||||
def test_delta_calculation_penalty(self):
|
||||
"""Delta is negative for penalties."""
|
||||
event = tr.TokenEvent(
|
||||
name="test",
|
||||
description="Test event",
|
||||
reward=0,
|
||||
penalty=-5,
|
||||
category="test",
|
||||
)
|
||||
assert event.delta == -5
|
||||
|
||||
def test_delta_calculation_mixed(self):
|
||||
"""Delta is net of reward and penalty."""
|
||||
event = tr.TokenEvent(
|
||||
name="test",
|
||||
description="Test event",
|
||||
reward=10,
|
||||
penalty=-3,
|
||||
category="test",
|
||||
)
|
||||
assert event.delta == 7
|
||||
|
||||
|
||||
class TestTokenRulesLoading:
|
||||
"""Test TokenRules configuration loading."""
|
||||
|
||||
def test_loads_from_yaml_file(self, tmp_path):
|
||||
"""Load configuration from YAML file."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0-test",
|
||||
"events": {
|
||||
"test_event": {
|
||||
"description": "A test event",
|
||||
"reward": 15,
|
||||
"category": "test",
|
||||
}
|
||||
},
|
||||
"gating_thresholds": {"test_op": 50},
|
||||
"daily_limits": {"test": {"max_earn": 100, "max_spend": 10}},
|
||||
"audit": {"log_all_transactions": False},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
assert rules.get_config_version() == "1.0.0-test"
|
||||
assert rules.get_delta("test_event") == 15
|
||||
assert rules.get_gate_threshold("test_op") == 50
|
||||
|
||||
def test_fallback_when_yaml_missing(self, tmp_path):
|
||||
"""Use fallback defaults when YAML file doesn't exist."""
|
||||
config_file = tmp_path / "nonexistent.yaml"
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
assert rules.get_config_version() == "fallback"
|
||||
# Fallback should have some basic events
|
||||
assert rules.get_delta("pr_merged") == 10
|
||||
assert rules.get_delta("test_fixed") == 8
|
||||
assert rules.get_delta("automation_failure") == -2
|
||||
|
||||
def test_fallback_when_yaml_not_installed(self, tmp_path):
|
||||
"""Use fallback when PyYAML is not installed."""
|
||||
with patch.dict(sys.modules, {"yaml": None}):
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_file.write_text("version: '1.0.0'")
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
assert rules.get_config_version() == "fallback"
|
||||
|
||||
|
||||
class TestTokenRulesGetDelta:
|
||||
"""Test get_delta method."""
|
||||
|
||||
def test_get_delta_existing_event(self, tmp_path):
|
||||
"""Get delta for configured event."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {
|
||||
"pr_merged": {"description": "PR merged", "reward": 10, "category": "merge"},
|
||||
"automation_failure": {"description": "Failure", "penalty": -2, "category": "ops"},
|
||||
},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
assert rules.get_delta("pr_merged") == 10
|
||||
assert rules.get_delta("automation_failure") == -2
|
||||
|
||||
def test_get_delta_unknown_event(self, tmp_path):
|
||||
"""Return 0 for unknown events."""
|
||||
config_file = tmp_path / "nonexistent.yaml"
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
assert rules.get_delta("unknown_event") == 0
|
||||
|
||||
|
||||
class TestTokenRulesGetEvent:
|
||||
"""Test get_event method."""
|
||||
|
||||
def test_get_event_returns_full_config(self, tmp_path):
|
||||
"""Get full event configuration."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {
|
||||
"pr_merged": {
|
||||
"description": "PR merged successfully",
|
||||
"reward": 10,
|
||||
"category": "merge",
|
||||
"gate_threshold": 0,
|
||||
}
|
||||
},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
event = rules.get_event("pr_merged")
|
||||
|
||||
assert event is not None
|
||||
assert event.name == "pr_merged"
|
||||
assert event.description == "PR merged successfully"
|
||||
assert event.reward == 10
|
||||
assert event.category == "merge"
|
||||
assert event.gate_threshold == 0
|
||||
|
||||
def test_get_event_unknown_returns_none(self, tmp_path):
|
||||
"""Return None for unknown event."""
|
||||
config_file = tmp_path / "nonexistent.yaml"
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
assert rules.get_event("unknown") is None
|
||||
|
||||
|
||||
class TestTokenRulesListEvents:
|
||||
"""Test list_events method."""
|
||||
|
||||
def test_list_all_events(self, tmp_path):
|
||||
"""List all configured events."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {
|
||||
"event_a": {"description": "A", "reward": 5, "category": "cat1"},
|
||||
"event_b": {"description": "B", "reward": 10, "category": "cat2"},
|
||||
"event_c": {"description": "C", "reward": 15, "category": "cat1"},
|
||||
},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
events = rules.list_events()
|
||||
|
||||
assert len(events) == 3
|
||||
event_names = {e.name for e in events}
|
||||
assert "event_a" in event_names
|
||||
assert "event_b" in event_names
|
||||
assert "event_c" in event_names
|
||||
|
||||
def test_list_events_by_category(self, tmp_path):
|
||||
"""Filter events by category."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {
|
||||
"event_a": {"description": "A", "reward": 5, "category": "cat1"},
|
||||
"event_b": {"description": "B", "reward": 10, "category": "cat2"},
|
||||
"event_c": {"description": "C", "reward": 15, "category": "cat1"},
|
||||
},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
events = rules.list_events(category="cat1")
|
||||
|
||||
assert len(events) == 2
|
||||
for event in events:
|
||||
assert event.category == "cat1"
|
||||
|
||||
|
||||
class TestTokenRulesGating:
|
||||
"""Test gating threshold methods."""
|
||||
|
||||
def test_check_gate_with_threshold(self, tmp_path):
|
||||
"""Check gate when threshold is defined."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {},
|
||||
"gating_thresholds": {"pr_merge": 50},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
assert rules.check_gate("pr_merge", current_tokens=100) is True
|
||||
assert rules.check_gate("pr_merge", current_tokens=50) is True
|
||||
assert rules.check_gate("pr_merge", current_tokens=49) is False
|
||||
assert rules.check_gate("pr_merge", current_tokens=0) is False
|
||||
|
||||
def test_check_gate_no_threshold(self, tmp_path):
|
||||
"""Check gate when no threshold is defined (always allowed)."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {},
|
||||
"gating_thresholds": {},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
# No threshold defined, should always be allowed
|
||||
assert rules.check_gate("unknown_op", current_tokens=0) is True
|
||||
assert rules.check_gate("unknown_op", current_tokens=-100) is True
|
||||
|
||||
def test_get_gate_threshold(self, tmp_path):
|
||||
"""Get threshold value."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"gating_thresholds": {"pr_merge": 50, "sensitive_op": 100},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
assert rules.get_gate_threshold("pr_merge") == 50
|
||||
assert rules.get_gate_threshold("sensitive_op") == 100
|
||||
assert rules.get_gate_threshold("unknown") is None
|
||||
|
||||
|
||||
class TestTokenRulesDailyLimits:
|
||||
"""Test daily limits methods."""
|
||||
|
||||
def test_get_daily_limits(self, tmp_path):
|
||||
"""Get daily limits for a category."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"daily_limits": {
|
||||
"triage": {"max_earn": 100, "max_spend": 0},
|
||||
"merge": {"max_earn": 50, "max_spend": 10},
|
||||
},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
triage_limits = rules.get_daily_limits("triage")
|
||||
assert triage_limits is not None
|
||||
assert triage_limits.max_earn == 100
|
||||
assert triage_limits.max_spend == 0
|
||||
|
||||
merge_limits = rules.get_daily_limits("merge")
|
||||
assert merge_limits is not None
|
||||
assert merge_limits.max_earn == 50
|
||||
assert merge_limits.max_spend == 10
|
||||
|
||||
def test_get_daily_limits_unknown(self, tmp_path):
|
||||
"""Return None for unknown category."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {"version": "1.0.0", "daily_limits": {}}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
assert rules.get_daily_limits("unknown") is None
|
||||
|
||||
|
||||
class TestTokenRulesComputeTransaction:
|
||||
"""Test compute_transaction method."""
|
||||
|
||||
def test_compute_successful_transaction(self, tmp_path):
|
||||
"""Compute transaction for valid event."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {
|
||||
"pr_merged": {"description": "PR merged", "reward": 10, "category": "merge"}
|
||||
},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
result = rules.compute_transaction("pr_merged", current_tokens=100)
|
||||
|
||||
assert result["event"] == "pr_merged"
|
||||
assert result["delta"] == 10
|
||||
assert result["category"] == "merge"
|
||||
assert result["allowed"] is True
|
||||
assert result["new_balance"] == 110
|
||||
assert result["limit_reached"] is False
|
||||
|
||||
def test_compute_unknown_event(self, tmp_path):
|
||||
"""Compute transaction for unknown event."""
|
||||
config_file = tmp_path / "nonexistent.yaml"
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
result = rules.compute_transaction("unknown_event", current_tokens=50)
|
||||
|
||||
assert result["event"] == "unknown_event"
|
||||
assert result["delta"] == 0
|
||||
assert result["allowed"] is False
|
||||
assert result["reason"] == "unknown_event"
|
||||
assert result["new_balance"] == 50
|
||||
|
||||
def test_compute_with_gate_check(self, tmp_path):
|
||||
"""Compute transaction respects gating."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {
|
||||
"sensitive_op": {
|
||||
"description": "Sensitive",
|
||||
"reward": 50,
|
||||
"category": "sensitive",
|
||||
"gate_threshold": 100,
|
||||
}
|
||||
},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
# With enough tokens
|
||||
result = rules.compute_transaction("sensitive_op", current_tokens=150)
|
||||
assert result["allowed"] is True
|
||||
|
||||
# Without enough tokens
|
||||
result = rules.compute_transaction("sensitive_op", current_tokens=50)
|
||||
assert result["allowed"] is False
|
||||
assert "gate_reason" in result
|
||||
|
||||
def test_compute_with_daily_limits(self, tmp_path):
|
||||
"""Compute transaction respects daily limits."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {
|
||||
"triage_action": {
|
||||
"description": "Triage",
|
||||
"reward": 20,
|
||||
"category": "triage",
|
||||
}
|
||||
},
|
||||
"daily_limits": {"triage": {"max_earn": 50, "max_spend": 0}},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
# Within limit
|
||||
daily_earned = {"triage": 20}
|
||||
result = rules.compute_transaction(
|
||||
"triage_action", current_tokens=100, current_daily_earned=daily_earned
|
||||
)
|
||||
assert result["allowed"] is True
|
||||
assert result["limit_reached"] is False
|
||||
|
||||
# Would exceed limit (20 + 20 > 50 is false, so this should be fine)
|
||||
# Let's test with higher current earned
|
||||
daily_earned = {"triage": 40}
|
||||
result = rules.compute_transaction(
|
||||
"triage_action", current_tokens=100, current_daily_earned=daily_earned
|
||||
)
|
||||
assert result["allowed"] is False
|
||||
assert result["limit_reached"] is True
|
||||
assert "limit_reason" in result
|
||||
|
||||
|
||||
class TestTokenRulesCategories:
|
||||
"""Test category methods."""
|
||||
|
||||
def test_get_categories(self, tmp_path):
|
||||
"""Get all unique categories."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {
|
||||
"event_a": {"description": "A", "reward": 5, "category": "cat1"},
|
||||
"event_b": {"description": "B", "reward": 10, "category": "cat2"},
|
||||
"event_c": {"description": "C", "reward": 15, "category": "cat1"},
|
||||
},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
categories = rules.get_categories()
|
||||
|
||||
assert sorted(categories) == ["cat1", "cat2"]
|
||||
|
||||
|
||||
class TestTokenRulesAudit:
|
||||
"""Test audit methods."""
|
||||
|
||||
def test_is_auditable_true(self, tmp_path):
|
||||
"""Check if auditable when enabled."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {"version": "1.0.0", "audit": {"log_all_transactions": True}}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
assert rules.is_auditable() is True
|
||||
|
||||
def test_is_auditable_false(self, tmp_path):
|
||||
"""Check if auditable when disabled."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {"version": "1.0.0", "audit": {"log_all_transactions": False}}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
assert rules.is_auditable() is False
|
||||
|
||||
|
||||
class TestConvenienceFunctions:
|
||||
"""Test module-level convenience functions."""
|
||||
|
||||
def test_get_token_delta(self, tmp_path):
|
||||
"""Convenience function returns delta."""
|
||||
config_file = tmp_path / "nonexistent.yaml"
|
||||
|
||||
with patch.object(tr.TokenRules, "CONFIG_PATH", config_file):
|
||||
delta = tr.get_token_delta("pr_merged")
|
||||
assert delta == 10 # From fallback
|
||||
|
||||
def test_check_operation_gate(self, tmp_path):
|
||||
"""Convenience function checks gate."""
|
||||
config_file = tmp_path / "nonexistent.yaml"
|
||||
|
||||
with patch.object(tr.TokenRules, "CONFIG_PATH", config_file):
|
||||
# Fallback has pr_merge gate at 0
|
||||
assert tr.check_operation_gate("pr_merge", current_tokens=0) is True
|
||||
assert tr.check_operation_gate("pr_merge", current_tokens=100) is True
|
||||
|
||||
def test_compute_token_reward(self, tmp_path):
|
||||
"""Convenience function computes reward."""
|
||||
config_file = tmp_path / "nonexistent.yaml"
|
||||
|
||||
with patch.object(tr.TokenRules, "CONFIG_PATH", config_file):
|
||||
result = tr.compute_token_reward("pr_merged", current_tokens=50)
|
||||
assert result["event"] == "pr_merged"
|
||||
assert result["delta"] == 10
|
||||
assert result["new_balance"] == 60
|
||||
|
||||
def test_list_token_events(self, tmp_path):
|
||||
"""Convenience function lists events."""
|
||||
config_file = tmp_path / "nonexistent.yaml"
|
||||
|
||||
with patch.object(tr.TokenRules, "CONFIG_PATH", config_file):
|
||||
events = tr.list_token_events()
|
||||
assert len(events) >= 3 # Fallback has at least 3 events
|
||||
|
||||
# Check structure
|
||||
for event in events:
|
||||
assert "name" in event
|
||||
assert "description" in event
|
||||
assert "delta" in event
|
||||
assert "category" in event
|
||||
343
tests/timmy_automations/test_weekly_narrative.py
Normal file
343
tests/timmy_automations/test_weekly_narrative.py
Normal file
@@ -0,0 +1,343 @@
|
||||
"""Tests for weekly_narrative.py script."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
# Add timmy_automations to path for imports
|
||||
sys.path.insert(
|
||||
0, str(Path(__file__).resolve().parent.parent.parent / "timmy_automations" / "daily_run")
|
||||
)
|
||||
|
||||
import weekly_narrative as wn
|
||||
|
||||
|
||||
class TestParseTimestamp:
|
||||
"""Test timestamp parsing."""
|
||||
|
||||
def test_parse_iso_with_z(self):
|
||||
"""Parse ISO timestamp with Z suffix."""
|
||||
result = wn.parse_ts("2026-03-21T12:00:00Z")
|
||||
assert result is not None
|
||||
assert result.year == 2026
|
||||
assert result.month == 3
|
||||
assert result.day == 21
|
||||
|
||||
def test_parse_iso_with_offset(self):
|
||||
"""Parse ISO timestamp with timezone offset."""
|
||||
result = wn.parse_ts("2026-03-21T12:00:00+00:00")
|
||||
assert result is not None
|
||||
assert result.year == 2026
|
||||
|
||||
def test_parse_empty_string(self):
|
||||
"""Empty string returns None."""
|
||||
result = wn.parse_ts("")
|
||||
assert result is None
|
||||
|
||||
def test_parse_invalid_string(self):
|
||||
"""Invalid string returns None."""
|
||||
result = wn.parse_ts("not-a-timestamp")
|
||||
assert result is None
|
||||
|
||||
|
||||
class TestCollectCyclesData:
|
||||
"""Test cycle data collection."""
|
||||
|
||||
def test_no_cycles_file(self, tmp_path):
|
||||
"""Handle missing cycles file gracefully."""
|
||||
with patch.object(wn, "REPO_ROOT", tmp_path):
|
||||
since = datetime.now(UTC) - timedelta(days=7)
|
||||
result = wn.collect_cycles_data(since)
|
||||
assert result["total"] == 0
|
||||
assert result["successes"] == 0
|
||||
assert result["failures"] == 0
|
||||
|
||||
def test_collect_recent_cycles(self, tmp_path):
|
||||
"""Collect cycles within lookback period."""
|
||||
retro_dir = tmp_path / ".loop" / "retro"
|
||||
retro_dir.mkdir(parents=True)
|
||||
|
||||
now = datetime.now(UTC)
|
||||
cycles = [
|
||||
{"timestamp": now.isoformat(), "success": True, "cycle": 1},
|
||||
{"timestamp": now.isoformat(), "success": False, "cycle": 2},
|
||||
{"timestamp": (now - timedelta(days=10)).isoformat(), "success": True, "cycle": 3},
|
||||
]
|
||||
|
||||
with open(retro_dir / "cycles.jsonl", "w") as f:
|
||||
for c in cycles:
|
||||
f.write(json.dumps(c) + "\n")
|
||||
|
||||
with patch.object(wn, "REPO_ROOT", tmp_path):
|
||||
since = now - timedelta(days=7)
|
||||
result = wn.collect_cycles_data(since)
|
||||
assert result["total"] == 2 # Only recent 2
|
||||
assert result["successes"] == 1
|
||||
assert result["failures"] == 1
|
||||
|
||||
|
||||
class TestExtractThemes:
|
||||
"""Test theme extraction from issues."""
|
||||
|
||||
def test_extract_layer_labels(self):
|
||||
"""Extract layer labels from issues."""
|
||||
issues = [
|
||||
{"labels": [{"name": "layer:triage"}, {"name": "bug"}]},
|
||||
{"labels": [{"name": "layer:tests"}, {"name": "bug"}]},
|
||||
{"labels": [{"name": "layer:triage"}, {"name": "feature"}]},
|
||||
]
|
||||
|
||||
result = wn.extract_themes(issues)
|
||||
|
||||
assert len(result["layers"]) == 2
|
||||
layer_names = {layer["name"] for layer in result["layers"]}
|
||||
assert "triage" in layer_names
|
||||
assert "tests" in layer_names
|
||||
|
||||
def test_extract_type_labels(self):
|
||||
"""Extract type labels (bug/feature/etc)."""
|
||||
issues = [
|
||||
{"labels": [{"name": "bug"}]},
|
||||
{"labels": [{"name": "feature"}]},
|
||||
{"labels": [{"name": "bug"}]},
|
||||
]
|
||||
|
||||
result = wn.extract_themes(issues)
|
||||
|
||||
type_names = {t_type["name"] for t_type in result["types"]}
|
||||
assert "bug" in type_names
|
||||
assert "feature" in type_names
|
||||
|
||||
def test_empty_issues(self):
|
||||
"""Handle empty issue list."""
|
||||
result = wn.extract_themes([])
|
||||
assert result["layers"] == []
|
||||
assert result["types"] == []
|
||||
assert result["top_labels"] == []
|
||||
|
||||
|
||||
class TestExtractAgentContributions:
|
||||
"""Test agent contribution extraction."""
|
||||
|
||||
def test_extract_assignees(self):
|
||||
"""Extract assignee counts."""
|
||||
issues = [
|
||||
{"assignee": {"login": "kimi"}},
|
||||
{"assignee": {"login": "hermes"}},
|
||||
{"assignee": {"login": "kimi"}},
|
||||
]
|
||||
|
||||
result = wn.extract_agent_contributions(issues, [], [])
|
||||
|
||||
assert len(result["active_assignees"]) == 2
|
||||
assignee_logins = {a["login"] for a in result["active_assignees"]} # noqa: E741
|
||||
assert "kimi" in assignee_logins
|
||||
assert "hermes" in assignee_logins
|
||||
|
||||
def test_extract_pr_authors(self):
|
||||
"""Extract PR author counts."""
|
||||
prs = [
|
||||
{"user": {"login": "kimi"}},
|
||||
{"user": {"login": "claude"}},
|
||||
{"user": {"login": "kimi"}},
|
||||
]
|
||||
|
||||
result = wn.extract_agent_contributions([], prs, [])
|
||||
|
||||
assert len(result["pr_authors"]) == 2
|
||||
|
||||
def test_kimi_mentions_in_cycles(self):
|
||||
"""Count Kimi mentions in cycle notes."""
|
||||
cycles = [
|
||||
{"notes": "Kimi did great work", "reason": ""},
|
||||
{"notes": "", "reason": "Kimi timeout"},
|
||||
{"notes": "All good", "reason": ""},
|
||||
]
|
||||
|
||||
result = wn.extract_agent_contributions([], [], cycles)
|
||||
assert result["kimi_mentioned_cycles"] == 2
|
||||
|
||||
|
||||
class TestAnalyzeTestShifts:
|
||||
"""Test test pattern analysis."""
|
||||
|
||||
def test_no_cycles(self):
|
||||
"""Handle no cycle data."""
|
||||
result = wn.analyze_test_shifts([])
|
||||
assert "note" in result
|
||||
|
||||
def test_test_metrics(self):
|
||||
"""Calculate test metrics from cycles."""
|
||||
cycles = [
|
||||
{"tests_passed": 100, "tests_added": 5},
|
||||
{"tests_passed": 150, "tests_added": 3},
|
||||
]
|
||||
|
||||
result = wn.analyze_test_shifts(cycles)
|
||||
|
||||
assert result["total_tests_passed"] == 250
|
||||
assert result["total_tests_added"] == 8
|
||||
|
||||
|
||||
class TestGenerateVibeSummary:
|
||||
"""Test vibe summary generation."""
|
||||
|
||||
def test_productive_vibe(self):
|
||||
"""High success rate and activity = productive vibe."""
|
||||
cycles_data = {"success_rate": 0.95, "successes": 10, "failures": 1}
|
||||
issues_data = {"closed_count": 5}
|
||||
|
||||
result = wn.generate_vibe_summary(cycles_data, issues_data, {}, {"layers": []}, {}, {}, {})
|
||||
|
||||
assert result["overall"] == "productive"
|
||||
assert "strong week" in result["description"].lower()
|
||||
|
||||
def test_struggling_vibe(self):
|
||||
"""More failures than successes = struggling vibe."""
|
||||
cycles_data = {"success_rate": 0.3, "successes": 3, "failures": 7}
|
||||
issues_data = {"closed_count": 0}
|
||||
|
||||
result = wn.generate_vibe_summary(cycles_data, issues_data, {}, {"layers": []}, {}, {}, {})
|
||||
|
||||
assert result["overall"] == "struggling"
|
||||
|
||||
def test_quiet_vibe(self):
|
||||
"""Low activity = quiet vibe."""
|
||||
cycles_data = {"success_rate": 0.0, "successes": 0, "failures": 0}
|
||||
issues_data = {"closed_count": 0}
|
||||
|
||||
result = wn.generate_vibe_summary(cycles_data, issues_data, {}, {"layers": []}, {}, {}, {})
|
||||
|
||||
assert result["overall"] == "quiet"
|
||||
|
||||
|
||||
class TestGenerateMarkdownSummary:
|
||||
"""Test markdown summary generation."""
|
||||
|
||||
def test_includes_header(self):
|
||||
"""Markdown includes header."""
|
||||
narrative = {
|
||||
"period": {"start": "2026-03-14T00:00:00", "end": "2026-03-21T00:00:00"},
|
||||
"vibe": {"overall": "productive", "description": "Good week"},
|
||||
"activity": {
|
||||
"cycles": {"total": 10, "successes": 9, "failures": 1},
|
||||
"issues": {"closed": 5, "opened": 3},
|
||||
"pull_requests": {"merged": 4, "opened": 2},
|
||||
},
|
||||
}
|
||||
|
||||
result = wn.generate_markdown_summary(narrative)
|
||||
|
||||
assert "# Weekly Narrative Summary" in result
|
||||
assert "productive" in result.lower()
|
||||
assert "10 total" in result or "10" in result
|
||||
|
||||
def test_includes_focus_areas(self):
|
||||
"""Markdown includes focus areas when present."""
|
||||
narrative = {
|
||||
"period": {"start": "2026-03-14", "end": "2026-03-21"},
|
||||
"vibe": {
|
||||
"overall": "productive",
|
||||
"description": "Good week",
|
||||
"focus_areas": ["triage (5 items)", "tests (3 items)"],
|
||||
},
|
||||
"activity": {
|
||||
"cycles": {"total": 0, "successes": 0, "failures": 0},
|
||||
"issues": {"closed": 0, "opened": 0},
|
||||
"pull_requests": {"merged": 0, "opened": 0},
|
||||
},
|
||||
}
|
||||
|
||||
result = wn.generate_markdown_summary(narrative)
|
||||
|
||||
assert "Focus Areas" in result
|
||||
assert "triage" in result
|
||||
|
||||
|
||||
class TestConfigLoading:
|
||||
"""Test configuration loading."""
|
||||
|
||||
def test_default_config(self, tmp_path):
|
||||
"""Default config when manifest missing."""
|
||||
with patch.object(wn, "CONFIG_PATH", tmp_path / "nonexistent.json"):
|
||||
config = wn.load_automation_config()
|
||||
assert config["lookback_days"] == 7
|
||||
assert config["enabled"] is True
|
||||
|
||||
def test_environment_override(self, tmp_path):
|
||||
"""Environment variables override config."""
|
||||
with patch.dict("os.environ", {"TIMMY_WEEKLY_NARRATIVE_ENABLED": "false"}):
|
||||
with patch.object(wn, "CONFIG_PATH", tmp_path / "nonexistent.json"):
|
||||
config = wn.load_automation_config()
|
||||
assert config["enabled"] is False
|
||||
|
||||
|
||||
class TestMain:
|
||||
"""Test main function."""
|
||||
|
||||
def test_disabled_exits_cleanly(self, tmp_path):
|
||||
"""When disabled and no --force, exits cleanly."""
|
||||
with patch.object(wn, "REPO_ROOT", tmp_path):
|
||||
with patch.object(wn, "load_automation_config", return_value={"enabled": False}):
|
||||
with patch("sys.argv", ["weekly_narrative"]):
|
||||
result = wn.main()
|
||||
assert result == 0
|
||||
|
||||
def test_force_runs_when_disabled(self, tmp_path):
|
||||
"""--force runs even when disabled."""
|
||||
# Setup minimal structure
|
||||
(tmp_path / ".loop" / "retro").mkdir(parents=True)
|
||||
|
||||
with patch.object(wn, "REPO_ROOT", tmp_path):
|
||||
with patch.object(
|
||||
wn,
|
||||
"load_automation_config",
|
||||
return_value={
|
||||
"enabled": False,
|
||||
"lookback_days": 7,
|
||||
"gitea_api": "http://localhost:3000/api/v1",
|
||||
"repo_slug": "test/repo",
|
||||
"token_file": "~/.hermes/gitea_token",
|
||||
},
|
||||
):
|
||||
with patch.object(wn, "GiteaClient") as mock_client:
|
||||
mock_instance = MagicMock()
|
||||
mock_instance.is_available.return_value = False
|
||||
mock_client.return_value = mock_instance
|
||||
|
||||
with patch("sys.argv", ["weekly_narrative", "--force"]):
|
||||
result = wn.main()
|
||||
# Should complete without error even though Gitea unavailable
|
||||
assert result == 0
|
||||
|
||||
|
||||
class TestGiteaClient:
|
||||
"""Test Gitea API client."""
|
||||
|
||||
def test_is_available_when_unavailable(self):
|
||||
"""is_available returns False when server down."""
|
||||
config = {"gitea_api": "http://localhost:99999", "repo_slug": "test/repo"}
|
||||
client = wn.GiteaClient(config, None)
|
||||
|
||||
# Should return False without raising
|
||||
assert client.is_available() is False
|
||||
|
||||
def test_headers_with_token(self):
|
||||
"""Headers include Authorization when token provided."""
|
||||
config = {"gitea_api": "http://localhost:3000", "repo_slug": "test/repo"}
|
||||
client = wn.GiteaClient(config, "test-token")
|
||||
|
||||
headers = client._headers()
|
||||
assert headers["Authorization"] == "token test-token"
|
||||
|
||||
def test_headers_without_token(self):
|
||||
"""Headers don't include Authorization when no token."""
|
||||
config = {"gitea_api": "http://localhost:3000", "repo_slug": "test/repo"}
|
||||
client = wn.GiteaClient(config, None)
|
||||
|
||||
headers = client._headers()
|
||||
assert "Authorization" not in headers
|
||||
489
tests/unit/test_quest_system.py
Normal file
489
tests/unit/test_quest_system.py
Normal file
@@ -0,0 +1,489 @@
|
||||
"""Unit tests for the quest system.
|
||||
|
||||
Tests quest definitions, progress tracking, completion detection,
|
||||
and token rewards.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.quest_system import (
|
||||
QuestDefinition,
|
||||
QuestProgress,
|
||||
QuestStatus,
|
||||
QuestType,
|
||||
_is_on_cooldown,
|
||||
claim_quest_reward,
|
||||
evaluate_quest_progress,
|
||||
get_or_create_progress,
|
||||
get_quest_definition,
|
||||
get_quest_leaderboard,
|
||||
load_quest_config,
|
||||
reset_quest_progress,
|
||||
update_quest_progress,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def clean_quest_state():
|
||||
"""Reset quest progress between tests."""
|
||||
reset_quest_progress()
|
||||
yield
|
||||
reset_quest_progress()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_issue_count_quest():
|
||||
"""Create a sample issue_count quest definition."""
|
||||
return QuestDefinition(
|
||||
id="test_close_issues",
|
||||
name="Test Issue Closer",
|
||||
description="Close 3 test issues",
|
||||
reward_tokens=100,
|
||||
quest_type=QuestType.ISSUE_COUNT,
|
||||
enabled=True,
|
||||
repeatable=False,
|
||||
cooldown_hours=0,
|
||||
criteria={"target_count": 3, "issue_labels": ["test"]},
|
||||
notification_message="Test quest complete! Earned {tokens} tokens.",
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_daily_run_quest():
|
||||
"""Create a sample daily_run quest definition."""
|
||||
return QuestDefinition(
|
||||
id="test_daily_run",
|
||||
name="Test Daily Runner",
|
||||
description="Complete 5 sessions",
|
||||
reward_tokens=250,
|
||||
quest_type=QuestType.DAILY_RUN,
|
||||
enabled=True,
|
||||
repeatable=True,
|
||||
cooldown_hours=24,
|
||||
criteria={"min_sessions": 5},
|
||||
notification_message="Daily run quest complete! Earned {tokens} tokens.",
|
||||
)
|
||||
|
||||
|
||||
# ── Quest Definition Tests ───────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestQuestDefinition:
|
||||
def test_from_dict_minimal(self):
|
||||
data = {"id": "test_quest", "name": "Test Quest"}
|
||||
quest = QuestDefinition.from_dict(data)
|
||||
assert quest.id == "test_quest"
|
||||
assert quest.name == "Test Quest"
|
||||
assert quest.quest_type == QuestType.CUSTOM
|
||||
assert quest.enabled is True
|
||||
|
||||
def test_from_dict_full(self):
|
||||
data = {
|
||||
"id": "full_quest",
|
||||
"name": "Full Quest",
|
||||
"description": "A test quest",
|
||||
"reward_tokens": 500,
|
||||
"type": "issue_count",
|
||||
"enabled": False,
|
||||
"repeatable": True,
|
||||
"cooldown_hours": 12,
|
||||
"criteria": {"target_count": 5},
|
||||
"notification_message": "Done!",
|
||||
}
|
||||
quest = QuestDefinition.from_dict(data)
|
||||
assert quest.id == "full_quest"
|
||||
assert quest.reward_tokens == 500
|
||||
assert quest.quest_type == QuestType.ISSUE_COUNT
|
||||
assert quest.enabled is False
|
||||
assert quest.repeatable is True
|
||||
assert quest.cooldown_hours == 12
|
||||
|
||||
|
||||
# ── Quest Progress Tests ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestQuestProgress:
|
||||
def test_progress_creation(self):
|
||||
progress = QuestProgress(
|
||||
quest_id="test_quest",
|
||||
agent_id="test_agent",
|
||||
status=QuestStatus.NOT_STARTED,
|
||||
)
|
||||
assert progress.quest_id == "test_quest"
|
||||
assert progress.agent_id == "test_agent"
|
||||
assert progress.current_value == 0
|
||||
|
||||
def test_progress_to_dict(self):
|
||||
progress = QuestProgress(
|
||||
quest_id="test_quest",
|
||||
agent_id="test_agent",
|
||||
status=QuestStatus.IN_PROGRESS,
|
||||
current_value=2,
|
||||
target_value=5,
|
||||
)
|
||||
data = progress.to_dict()
|
||||
assert data["quest_id"] == "test_quest"
|
||||
assert data["status"] == "in_progress"
|
||||
assert data["current_value"] == 2
|
||||
|
||||
|
||||
# ── Quest Loading Tests ──────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestQuestLoading:
|
||||
def test_load_quest_config(self):
|
||||
definitions, settings = load_quest_config()
|
||||
assert isinstance(definitions, dict)
|
||||
assert isinstance(settings, dict)
|
||||
|
||||
def test_get_quest_definition_exists(self):
|
||||
# Should return None for non-existent quest in fresh state
|
||||
quest = get_quest_definition("nonexistent")
|
||||
# The function returns from loaded config, which may have quests
|
||||
# or be empty if config doesn't exist
|
||||
assert quest is None or isinstance(quest, QuestDefinition)
|
||||
|
||||
def test_get_quest_definition_not_found(self):
|
||||
quest = get_quest_definition("definitely_not_a_real_quest_12345")
|
||||
assert quest is None
|
||||
|
||||
|
||||
# ── Quest Progress Management Tests ─────────────────────────────────────
|
||||
|
||||
|
||||
class TestQuestProgressManagement:
|
||||
def test_get_or_create_progress_new(self):
|
||||
# First create a quest definition
|
||||
quest = QuestDefinition(
|
||||
id="progress_test",
|
||||
name="Progress Test",
|
||||
description="Test quest",
|
||||
reward_tokens=100,
|
||||
quest_type=QuestType.ISSUE_COUNT,
|
||||
enabled=True,
|
||||
repeatable=False,
|
||||
cooldown_hours=0,
|
||||
criteria={"target_count": 3},
|
||||
notification_message="Done!",
|
||||
)
|
||||
|
||||
# Need to inject into the definitions dict
|
||||
from timmy.quest_system import _quest_definitions
|
||||
|
||||
_quest_definitions["progress_test"] = quest
|
||||
|
||||
progress = get_or_create_progress("progress_test", "agent1")
|
||||
assert progress.quest_id == "progress_test"
|
||||
assert progress.agent_id == "agent1"
|
||||
assert progress.status == QuestStatus.NOT_STARTED
|
||||
assert progress.target_value == 3
|
||||
|
||||
del _quest_definitions["progress_test"]
|
||||
|
||||
def test_update_quest_progress(self):
|
||||
quest = QuestDefinition(
|
||||
id="update_test",
|
||||
name="Update Test",
|
||||
description="Test quest",
|
||||
reward_tokens=100,
|
||||
quest_type=QuestType.ISSUE_COUNT,
|
||||
enabled=True,
|
||||
repeatable=False,
|
||||
cooldown_hours=0,
|
||||
criteria={"target_count": 3},
|
||||
notification_message="Done!",
|
||||
)
|
||||
|
||||
from timmy.quest_system import _quest_definitions
|
||||
|
||||
_quest_definitions["update_test"] = quest
|
||||
|
||||
# Create initial progress
|
||||
progress = get_or_create_progress("update_test", "agent1")
|
||||
assert progress.current_value == 0
|
||||
|
||||
# Update progress
|
||||
updated = update_quest_progress("update_test", "agent1", 2)
|
||||
assert updated.current_value == 2
|
||||
assert updated.status == QuestStatus.NOT_STARTED
|
||||
|
||||
# Complete the quest
|
||||
completed = update_quest_progress("update_test", "agent1", 3)
|
||||
assert completed.current_value == 3
|
||||
assert completed.status == QuestStatus.COMPLETED
|
||||
assert completed.completed_at != ""
|
||||
|
||||
del _quest_definitions["update_test"]
|
||||
|
||||
|
||||
# ── Quest Evaluation Tests ───────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestQuestEvaluation:
|
||||
def test_evaluate_issue_count_quest(self):
|
||||
quest = QuestDefinition(
|
||||
id="eval_test",
|
||||
name="Eval Test",
|
||||
description="Test quest",
|
||||
reward_tokens=100,
|
||||
quest_type=QuestType.ISSUE_COUNT,
|
||||
enabled=True,
|
||||
repeatable=False,
|
||||
cooldown_hours=0,
|
||||
criteria={"target_count": 2, "issue_labels": ["test"]},
|
||||
notification_message="Done!",
|
||||
)
|
||||
|
||||
from timmy.quest_system import _quest_definitions
|
||||
|
||||
_quest_definitions["eval_test"] = quest
|
||||
|
||||
# Simulate closed issues
|
||||
closed_issues = [
|
||||
{"id": 1, "labels": [{"name": "test"}]},
|
||||
{"id": 2, "labels": [{"name": "test"}, {"name": "bug"}]},
|
||||
{"id": 3, "labels": [{"name": "other"}]},
|
||||
]
|
||||
|
||||
context = {"closed_issues": closed_issues}
|
||||
progress = evaluate_quest_progress("eval_test", "agent1", context)
|
||||
|
||||
assert progress is not None
|
||||
assert progress.current_value == 2 # Two issues with 'test' label
|
||||
|
||||
del _quest_definitions["eval_test"]
|
||||
|
||||
def test_evaluate_issue_reduce_quest(self):
|
||||
quest = QuestDefinition(
|
||||
id="reduce_test",
|
||||
name="Reduce Test",
|
||||
description="Test quest",
|
||||
reward_tokens=200,
|
||||
quest_type=QuestType.ISSUE_REDUCE,
|
||||
enabled=True,
|
||||
repeatable=False,
|
||||
cooldown_hours=0,
|
||||
criteria={"target_reduction": 2},
|
||||
notification_message="Done!",
|
||||
)
|
||||
|
||||
from timmy.quest_system import _quest_definitions
|
||||
|
||||
_quest_definitions["reduce_test"] = quest
|
||||
|
||||
context = {"previous_issue_count": 10, "current_issue_count": 7}
|
||||
progress = evaluate_quest_progress("reduce_test", "agent1", context)
|
||||
|
||||
assert progress is not None
|
||||
assert progress.current_value == 3 # Reduced by 3
|
||||
|
||||
del _quest_definitions["reduce_test"]
|
||||
|
||||
def test_evaluate_daily_run_quest(self):
|
||||
quest = QuestDefinition(
|
||||
id="daily_test",
|
||||
name="Daily Test",
|
||||
description="Test quest",
|
||||
reward_tokens=250,
|
||||
quest_type=QuestType.DAILY_RUN,
|
||||
enabled=True,
|
||||
repeatable=True,
|
||||
cooldown_hours=24,
|
||||
criteria={"min_sessions": 5},
|
||||
notification_message="Done!",
|
||||
)
|
||||
|
||||
from timmy.quest_system import _quest_definitions
|
||||
|
||||
_quest_definitions["daily_test"] = quest
|
||||
|
||||
context = {"sessions_completed": 5}
|
||||
progress = evaluate_quest_progress("daily_test", "agent1", context)
|
||||
|
||||
assert progress is not None
|
||||
assert progress.current_value == 5
|
||||
assert progress.status == QuestStatus.COMPLETED
|
||||
|
||||
del _quest_definitions["daily_test"]
|
||||
|
||||
|
||||
# ── Quest Cooldown Tests ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestQuestCooldown:
|
||||
def test_is_on_cooldown_no_cooldown(self):
|
||||
quest = QuestDefinition(
|
||||
id="cooldown_test",
|
||||
name="Cooldown Test",
|
||||
description="Test quest",
|
||||
reward_tokens=100,
|
||||
quest_type=QuestType.ISSUE_COUNT,
|
||||
enabled=True,
|
||||
repeatable=True,
|
||||
cooldown_hours=24,
|
||||
criteria={},
|
||||
notification_message="Done!",
|
||||
)
|
||||
|
||||
progress = QuestProgress(
|
||||
quest_id="cooldown_test",
|
||||
agent_id="agent1",
|
||||
status=QuestStatus.CLAIMED,
|
||||
)
|
||||
|
||||
# No last_completed_at means no cooldown
|
||||
assert _is_on_cooldown(progress, quest) is False
|
||||
|
||||
|
||||
# ── Quest Reward Tests ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestQuestReward:
|
||||
def test_claim_quest_reward_not_completed(self):
|
||||
quest = QuestDefinition(
|
||||
id="reward_test",
|
||||
name="Reward Test",
|
||||
description="Test quest",
|
||||
reward_tokens=100,
|
||||
quest_type=QuestType.ISSUE_COUNT,
|
||||
enabled=True,
|
||||
repeatable=False,
|
||||
cooldown_hours=0,
|
||||
criteria={"target_count": 3},
|
||||
notification_message="Done!",
|
||||
)
|
||||
|
||||
from timmy.quest_system import _quest_definitions, _quest_progress
|
||||
|
||||
_quest_definitions["reward_test"] = quest
|
||||
|
||||
# Create progress but don't complete
|
||||
progress = get_or_create_progress("reward_test", "agent1")
|
||||
_quest_progress["agent1:reward_test"] = progress
|
||||
|
||||
# Try to claim - should fail
|
||||
reward = claim_quest_reward("reward_test", "agent1")
|
||||
assert reward is None
|
||||
|
||||
del _quest_definitions["reward_test"]
|
||||
|
||||
|
||||
# ── Leaderboard Tests ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestQuestLeaderboard:
|
||||
def test_get_quest_leaderboard_empty(self):
|
||||
reset_quest_progress()
|
||||
leaderboard = get_quest_leaderboard()
|
||||
assert leaderboard == []
|
||||
|
||||
def test_get_quest_leaderboard_with_data(self):
|
||||
# Create and complete a quest for two agents
|
||||
quest = QuestDefinition(
|
||||
id="leaderboard_test",
|
||||
name="Leaderboard Test",
|
||||
description="Test quest",
|
||||
reward_tokens=100,
|
||||
quest_type=QuestType.ISSUE_COUNT,
|
||||
enabled=True,
|
||||
repeatable=True,
|
||||
cooldown_hours=0,
|
||||
criteria={"target_count": 1},
|
||||
notification_message="Done!",
|
||||
)
|
||||
|
||||
from timmy.quest_system import _quest_definitions, _quest_progress
|
||||
|
||||
_quest_definitions["leaderboard_test"] = quest
|
||||
|
||||
# Create progress for agent1 with 2 completions
|
||||
progress1 = QuestProgress(
|
||||
quest_id="leaderboard_test",
|
||||
agent_id="agent1",
|
||||
status=QuestStatus.NOT_STARTED,
|
||||
completion_count=2,
|
||||
)
|
||||
_quest_progress["agent1:leaderboard_test"] = progress1
|
||||
|
||||
# Create progress for agent2 with 1 completion
|
||||
progress2 = QuestProgress(
|
||||
quest_id="leaderboard_test",
|
||||
agent_id="agent2",
|
||||
status=QuestStatus.NOT_STARTED,
|
||||
completion_count=1,
|
||||
)
|
||||
_quest_progress["agent2:leaderboard_test"] = progress2
|
||||
|
||||
leaderboard = get_quest_leaderboard()
|
||||
|
||||
assert len(leaderboard) == 2
|
||||
# agent1 should be first (more tokens)
|
||||
assert leaderboard[0]["agent_id"] == "agent1"
|
||||
assert leaderboard[0]["total_tokens"] == 200
|
||||
assert leaderboard[1]["agent_id"] == "agent2"
|
||||
assert leaderboard[1]["total_tokens"] == 100
|
||||
|
||||
del _quest_definitions["leaderboard_test"]
|
||||
|
||||
|
||||
# ── Quest Reset Tests ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestQuestReset:
|
||||
def test_reset_quest_progress_all(self):
|
||||
# Create some progress entries
|
||||
progress1 = QuestProgress(
|
||||
quest_id="quest1", agent_id="agent1", status=QuestStatus.NOT_STARTED
|
||||
)
|
||||
progress2 = QuestProgress(
|
||||
quest_id="quest2", agent_id="agent2", status=QuestStatus.NOT_STARTED
|
||||
)
|
||||
|
||||
from timmy.quest_system import _quest_progress
|
||||
|
||||
_quest_progress["agent1:quest1"] = progress1
|
||||
_quest_progress["agent2:quest2"] = progress2
|
||||
|
||||
assert len(_quest_progress) == 2
|
||||
|
||||
count = reset_quest_progress()
|
||||
assert count == 2
|
||||
assert len(_quest_progress) == 0
|
||||
|
||||
def test_reset_quest_progress_specific_quest(self):
|
||||
progress1 = QuestProgress(
|
||||
quest_id="quest1", agent_id="agent1", status=QuestStatus.NOT_STARTED
|
||||
)
|
||||
progress2 = QuestProgress(
|
||||
quest_id="quest2", agent_id="agent1", status=QuestStatus.NOT_STARTED
|
||||
)
|
||||
|
||||
from timmy.quest_system import _quest_progress
|
||||
|
||||
_quest_progress["agent1:quest1"] = progress1
|
||||
_quest_progress["agent1:quest2"] = progress2
|
||||
|
||||
count = reset_quest_progress(quest_id="quest1")
|
||||
assert count == 1
|
||||
assert "agent1:quest1" not in _quest_progress
|
||||
assert "agent1:quest2" in _quest_progress
|
||||
|
||||
def test_reset_quest_progress_specific_agent(self):
|
||||
progress1 = QuestProgress(
|
||||
quest_id="quest1", agent_id="agent1", status=QuestStatus.NOT_STARTED
|
||||
)
|
||||
progress2 = QuestProgress(
|
||||
quest_id="quest1", agent_id="agent2", status=QuestStatus.NOT_STARTED
|
||||
)
|
||||
|
||||
from timmy.quest_system import _quest_progress
|
||||
|
||||
_quest_progress["agent1:quest1"] = progress1
|
||||
_quest_progress["agent2:quest1"] = progress2
|
||||
|
||||
count = reset_quest_progress(agent_id="agent1")
|
||||
assert count == 1
|
||||
assert "agent1:quest1" not in _quest_progress
|
||||
assert "agent2:quest1" in _quest_progress
|
||||
@@ -211,6 +211,44 @@
|
||||
"agenda_time_minutes": 10
|
||||
},
|
||||
"outputs": []
|
||||
},
|
||||
{
|
||||
"id": "golden_path",
|
||||
"name": "Golden Path Generator",
|
||||
"description": "Generates coherent 30-60 minute mini-sessions from real Gitea issues — triage, fixes, and tests",
|
||||
"script": "timmy_automations/daily_run/golden_path.py",
|
||||
"category": "daily_run",
|
||||
"enabled": true,
|
||||
"trigger": "manual",
|
||||
"executable": "python3",
|
||||
"config": {
|
||||
"target_minutes": 45,
|
||||
"size_labels": ["size:XS", "size:S", "size:M"],
|
||||
"min_items": 3,
|
||||
"max_items": 5
|
||||
},
|
||||
"outputs": []
|
||||
},
|
||||
{
|
||||
"id": "weekly_narrative",
|
||||
"name": "Weekly Narrative Summary",
|
||||
"description": "Generates a human-readable weekly summary of work themes, agent contributions, and token economy shifts",
|
||||
"script": "timmy_automations/daily_run/weekly_narrative.py",
|
||||
"category": "daily_run",
|
||||
"enabled": true,
|
||||
"trigger": "scheduled",
|
||||
"schedule": "weekly",
|
||||
"executable": "python3",
|
||||
"config": {
|
||||
"lookback_days": 7,
|
||||
"output_file": ".loop/weekly_narrative.json",
|
||||
"gitea_api": "http://localhost:3000/api/v1",
|
||||
"repo_slug": "rockachopa/Timmy-time-dashboard"
|
||||
},
|
||||
"outputs": [
|
||||
".loop/weekly_narrative.json",
|
||||
".loop/weekly_narrative.md"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -17,6 +17,10 @@
|
||||
"manual": {
|
||||
"description": "Run on-demand only",
|
||||
"automations": ["agent_workspace", "kimi_bootstrap", "kimi_resume", "backfill_retro"]
|
||||
},
|
||||
"weekly": {
|
||||
"description": "Run once per week (Sundays)",
|
||||
"automations": ["weekly_narrative"]
|
||||
}
|
||||
},
|
||||
"triggers": {
|
||||
|
||||
138
timmy_automations/config/token_rules.yaml
Normal file
138
timmy_automations/config/token_rules.yaml
Normal file
@@ -0,0 +1,138 @@
|
||||
# Token Rules — Agent reward/penalty configuration for automations
|
||||
#
|
||||
# This file defines the token economy for agent actions.
|
||||
# Modify values here to adjust incentives without code changes.
|
||||
#
|
||||
# Used by: timmy_automations.utils.token_rules
|
||||
|
||||
version: "1.0.0"
|
||||
description: "Token economy rules for agent automations"
|
||||
|
||||
# ── Events ─────────────────────────────────────────────────────────────────
|
||||
# Each event type defines rewards/penalties and optional gating thresholds
|
||||
|
||||
events:
|
||||
# Triage actions
|
||||
triage_success:
|
||||
description: "Successfully triaged an issue (scored and categorized)"
|
||||
reward: 5
|
||||
category: "triage"
|
||||
|
||||
deep_triage_refinement:
|
||||
description: "LLM-driven issue refinement with acceptance criteria added"
|
||||
reward: 20
|
||||
category: "triage"
|
||||
|
||||
quarantine_candidate_found:
|
||||
description: "Identified a repeat failure issue for quarantine"
|
||||
reward: 10
|
||||
category: "triage"
|
||||
|
||||
# Daily Run completions
|
||||
daily_run_completed:
|
||||
description: "Completed a daily run cycle successfully"
|
||||
reward: 5
|
||||
category: "daily_run"
|
||||
|
||||
golden_path_generated:
|
||||
description: "Generated a coherent mini-session plan"
|
||||
reward: 3
|
||||
category: "daily_run"
|
||||
|
||||
weekly_narrative_created:
|
||||
description: "Generated weekly summary of work themes"
|
||||
reward: 15
|
||||
category: "daily_run"
|
||||
|
||||
# PR merges
|
||||
pr_merged:
|
||||
description: "Successfully merged a pull request"
|
||||
reward: 10
|
||||
category: "merge"
|
||||
# Gating: requires minimum tokens to perform
|
||||
gate_threshold: 0
|
||||
|
||||
pr_merged_with_tests:
|
||||
description: "Merged PR with all tests passing"
|
||||
reward: 15
|
||||
category: "merge"
|
||||
gate_threshold: 0
|
||||
|
||||
# Test fixes
|
||||
test_fixed:
|
||||
description: "Fixed a failing test"
|
||||
reward: 8
|
||||
category: "test"
|
||||
|
||||
test_added:
|
||||
description: "Added new test coverage"
|
||||
reward: 5
|
||||
category: "test"
|
||||
|
||||
critical_bug_fixed:
|
||||
description: "Fixed a critical bug on main"
|
||||
reward: 25
|
||||
category: "test"
|
||||
|
||||
# General operations
|
||||
automation_run:
|
||||
description: "Ran any automation (resource usage)"
|
||||
penalty: -1
|
||||
category: "operation"
|
||||
|
||||
automation_failure:
|
||||
description: "Automation failed or produced error"
|
||||
penalty: -2
|
||||
category: "operation"
|
||||
|
||||
cycle_retro_logged:
|
||||
description: "Logged structured retrospective data"
|
||||
reward: 5
|
||||
category: "operation"
|
||||
|
||||
pre_commit_passed:
|
||||
description: "Pre-commit checks passed"
|
||||
reward: 2
|
||||
category: "operation"
|
||||
|
||||
pre_commit_failed:
|
||||
description: "Pre-commit checks failed"
|
||||
penalty: -1
|
||||
category: "operation"
|
||||
|
||||
# ── Gating Thresholds ──────────────────────────────────────────────────────
|
||||
# Minimum token balances required for sensitive operations
|
||||
|
||||
gating_thresholds:
|
||||
pr_merge: 0
|
||||
sensitive_config_change: 50
|
||||
agent_workspace_create: 10
|
||||
deep_triage_run: 0
|
||||
|
||||
# ── Daily Limits ───────────────────────────────────────────────────────────
|
||||
# Maximum tokens that can be earned/spent per category per day
|
||||
|
||||
daily_limits:
|
||||
triage:
|
||||
max_earn: 100
|
||||
max_spend: 0
|
||||
daily_run:
|
||||
max_earn: 50
|
||||
max_spend: 0
|
||||
merge:
|
||||
max_earn: 100
|
||||
max_spend: 0
|
||||
test:
|
||||
max_earn: 100
|
||||
max_spend: 0
|
||||
operation:
|
||||
max_earn: 50
|
||||
max_spend: 50
|
||||
|
||||
# ── Audit Settings ─────────────────────────────────────────────────────────
|
||||
# Settings for token audit and inspection
|
||||
|
||||
audit:
|
||||
log_all_transactions: true
|
||||
log_retention_days: 30
|
||||
inspectable_by: ["orchestrator", "auditor", "timmy"]
|
||||
583
timmy_automations/daily_run/golden_path.py
Normal file
583
timmy_automations/daily_run/golden_path.py
Normal file
@@ -0,0 +1,583 @@
|
||||
"""Golden Path generator — coherent 30-60 minute mini-sessions from real issues.
|
||||
|
||||
Fetches issues from Gitea and assembles them into ordered sequences forming
|
||||
a coherent mini-session. Each Golden Path includes:
|
||||
- One small triage cleanup
|
||||
- Two micro-fixes (XS/S sized)
|
||||
- One test-improvement task
|
||||
|
||||
All tasks are real issues from the Gitea repository, never synthetic.
|
||||
|
||||
Usage:
|
||||
from timmy_automations.daily_run.golden_path import generate_golden_path
|
||||
path = generate_golden_path(minutes=45)
|
||||
print(path.to_json())
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib.request import Request, urlopen
|
||||
from urllib.error import HTTPError, URLError
|
||||
|
||||
# ── Configuration ─────────────────────────────────────────────────────────
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
|
||||
CONFIG_PATH = Path(__file__).parent.parent / "config" / "daily_run.json"
|
||||
|
||||
DEFAULT_CONFIG = {
|
||||
"gitea_api": "http://localhost:3000/api/v1",
|
||||
"repo_slug": "rockachopa/Timmy-time-dashboard",
|
||||
"token_file": "~/.hermes/gitea_token",
|
||||
"size_labels": ["size:XS", "size:S", "size:M"],
|
||||
}
|
||||
|
||||
# Time estimates (in minutes) by size and type
|
||||
TIME_ESTIMATES: dict[str, dict[str, int]] = {
|
||||
"XS": {"triage": 5, "fix": 10, "test": 10, "docs": 8, "refactor": 8},
|
||||
"S": {"triage": 10, "fix": 15, "test": 15, "docs": 12, "refactor": 12},
|
||||
"M": {"triage": 15, "fix": 25, "test": 25, "docs": 20, "refactor": 20},
|
||||
}
|
||||
|
||||
# Issue type detection patterns
|
||||
TYPE_PATTERNS: dict[str, dict[str, list[str]]] = {
|
||||
"triage": {
|
||||
"labels": ["triage", "cleanup", "organize", "sort", "categorize"],
|
||||
"title": ["triage", "cleanup", "organize", "sort", "categorize", "clean up"],
|
||||
},
|
||||
"fix": {
|
||||
"labels": ["bug", "fix", "error", "broken"],
|
||||
"title": ["fix", "bug", "error", "broken", "repair", "correct"],
|
||||
},
|
||||
"test": {
|
||||
"labels": ["test", "testing", "coverage", "pytest"],
|
||||
"title": ["test", "coverage", "pytest", "unit test", "integration test"],
|
||||
},
|
||||
"docs": {
|
||||
"labels": ["docs", "documentation", "readme", "docstring"],
|
||||
"title": ["doc", "readme", "comment", "guide", "tutorial"],
|
||||
},
|
||||
"refactor": {
|
||||
"labels": ["refactor", "cleanup", "debt", "maintainability"],
|
||||
"title": ["refactor", "cleanup", "simplify", "extract", "reorganize"],
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def load_config() -> dict:
|
||||
"""Load configuration from config file with fallback to defaults."""
|
||||
config = DEFAULT_CONFIG.copy()
|
||||
if CONFIG_PATH.exists():
|
||||
try:
|
||||
file_config = json.loads(CONFIG_PATH.read_text())
|
||||
if "orchestrator" in file_config:
|
||||
config.update(file_config["orchestrator"])
|
||||
except (json.JSONDecodeError, OSError) as exc:
|
||||
print(f"[golden_path] Warning: Could not load config: {exc}", file=sys.stderr)
|
||||
|
||||
# Environment variable overrides
|
||||
if os.environ.get("TIMMY_GITEA_API"):
|
||||
config["gitea_api"] = os.environ.get("TIMMY_GITEA_API")
|
||||
if os.environ.get("TIMMY_REPO_SLUG"):
|
||||
config["repo_slug"] = os.environ.get("TIMMY_REPO_SLUG")
|
||||
if os.environ.get("TIMMY_GITEA_TOKEN"):
|
||||
config["token"] = os.environ.get("TIMMY_GITEA_TOKEN")
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def get_token(config: dict) -> str | None:
|
||||
"""Get Gitea token from environment or file."""
|
||||
if "token" in config:
|
||||
return config["token"]
|
||||
|
||||
token_file = Path(config["token_file"]).expanduser()
|
||||
if token_file.exists():
|
||||
return token_file.read_text().strip()
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# ── Gitea API Client ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
"""Simple Gitea API client with graceful degradation."""
|
||||
|
||||
def __init__(self, config: dict, token: str | None):
|
||||
self.api_base = config["gitea_api"].rstrip("/")
|
||||
self.repo_slug = config["repo_slug"]
|
||||
self.token = token
|
||||
self._available: bool | None = None
|
||||
|
||||
def _headers(self) -> dict:
|
||||
headers = {"Accept": "application/json"}
|
||||
if self.token:
|
||||
headers["Authorization"] = f"token {self.token}"
|
||||
return headers
|
||||
|
||||
def _api_url(self, path: str) -> str:
|
||||
return f"{self.api_base}/repos/{self.repo_slug}/{path}"
|
||||
|
||||
def is_available(self) -> bool:
|
||||
"""Check if Gitea API is reachable."""
|
||||
if self._available is not None:
|
||||
return self._available
|
||||
|
||||
try:
|
||||
req = Request(
|
||||
f"{self.api_base}/version",
|
||||
headers=self._headers(),
|
||||
method="GET",
|
||||
)
|
||||
with urlopen(req, timeout=5) as resp:
|
||||
self._available = resp.status == 200
|
||||
return self._available
|
||||
except (HTTPError, URLError, TimeoutError):
|
||||
self._available = False
|
||||
return False
|
||||
|
||||
def get(self, path: str, params: dict | None = None) -> list | dict:
|
||||
"""Make a GET request to the Gitea API."""
|
||||
url = self._api_url(path)
|
||||
if params:
|
||||
query = "&".join(f"{k}={v}" for k, v in params.items())
|
||||
url = f"{url}?{query}"
|
||||
|
||||
req = Request(url, headers=self._headers(), method="GET")
|
||||
with urlopen(req, timeout=15) as resp:
|
||||
return json.loads(resp.read())
|
||||
|
||||
def get_paginated(self, path: str, params: dict | None = None) -> list:
|
||||
"""Fetch all pages of a paginated endpoint."""
|
||||
all_items = []
|
||||
page = 1
|
||||
limit = 50
|
||||
|
||||
while True:
|
||||
page_params = {"limit": limit, "page": page}
|
||||
if params:
|
||||
page_params.update(params)
|
||||
|
||||
batch = self.get(path, page_params)
|
||||
if not batch:
|
||||
break
|
||||
|
||||
all_items.extend(batch)
|
||||
if len(batch) < limit:
|
||||
break
|
||||
page += 1
|
||||
|
||||
return all_items
|
||||
|
||||
|
||||
# ── Issue Classification ──────────────────────────────────────────────────
|
||||
|
||||
|
||||
def extract_size(labels: list[dict]) -> str:
|
||||
"""Extract size label from issue labels."""
|
||||
for label in labels:
|
||||
name = label.get("name", "")
|
||||
if name.startswith("size:"):
|
||||
return name.replace("size:", "").upper()
|
||||
return "?"
|
||||
|
||||
|
||||
def classify_issue_type(issue: dict) -> str:
|
||||
"""Classify an issue into a type based on labels and title."""
|
||||
labels = [l.get("name", "").lower() for l in issue.get("labels", [])]
|
||||
title = issue.get("title", "").lower()
|
||||
|
||||
scores: dict[str, int] = {}
|
||||
|
||||
for issue_type, patterns in TYPE_PATTERNS.items():
|
||||
score = 0
|
||||
# Check labels
|
||||
for pattern in patterns["labels"]:
|
||||
if any(pattern in label for label in labels):
|
||||
score += 2
|
||||
# Check title
|
||||
for pattern in patterns["title"]:
|
||||
if pattern in title:
|
||||
score += 1
|
||||
scores[issue_type] = score
|
||||
|
||||
# Return the type with highest score, or "fix" as default
|
||||
if scores:
|
||||
best_type = max(scores, key=lambda k: scores[k])
|
||||
if scores[best_type] > 0:
|
||||
return best_type
|
||||
|
||||
return "fix" # Default to fix for uncategorized issues
|
||||
|
||||
|
||||
def estimate_time(issue: dict) -> int:
|
||||
"""Estimate time in minutes for an issue based on size and type."""
|
||||
size = extract_size(issue.get("labels", []))
|
||||
issue_type = classify_issue_type(issue)
|
||||
|
||||
# Default to fix time estimates if type not found
|
||||
type_map = issue_type if issue_type in TIME_ESTIMATES.get(size, {}) else "fix"
|
||||
|
||||
return TIME_ESTIMATES.get(size, TIME_ESTIMATES["S"]).get(type_map, 15)
|
||||
|
||||
|
||||
def score_issue_for_path(issue: dict) -> int:
|
||||
"""Score an issue for Golden Path suitability (higher = better fit)."""
|
||||
score = 0
|
||||
labels = [l.get("name", "").lower() for l in issue.get("labels", [])]
|
||||
issue_type = classify_issue_type(issue)
|
||||
|
||||
# Prefer smaller sizes for predictability
|
||||
if "size:xs" in labels:
|
||||
score += 10
|
||||
elif "size:s" in labels:
|
||||
score += 7
|
||||
elif "size:m" in labels:
|
||||
score += 3
|
||||
|
||||
# Prefer issues with clear type labels
|
||||
if issue_type in ["triage", "test", "fix"]:
|
||||
score += 3
|
||||
|
||||
# Prefer issues with acceptance criteria or good description
|
||||
body = issue.get("body", "")
|
||||
if body:
|
||||
if "## acceptance criteria" in body.lower() or "acceptance criteria" in body.lower():
|
||||
score += 3
|
||||
if len(body) > 200:
|
||||
score += 1
|
||||
|
||||
# Prefer issues with recent activity
|
||||
updated_at = issue.get("updated_at", "")
|
||||
if updated_at:
|
||||
try:
|
||||
updated = datetime.fromisoformat(updated_at.replace("Z", "+00:00"))
|
||||
days_old = (datetime.now(timezone.utc) - updated).days
|
||||
if days_old < 7:
|
||||
score += 2
|
||||
elif days_old < 30:
|
||||
score += 1
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
|
||||
return score
|
||||
|
||||
|
||||
# ── Golden Path Generation ────────────────────────────────────────────────
|
||||
|
||||
|
||||
@dataclass
|
||||
class PathItem:
|
||||
"""A single item in a Golden Path."""
|
||||
|
||||
number: int
|
||||
title: str
|
||||
size: str
|
||||
issue_type: str
|
||||
estimated_minutes: int
|
||||
url: str
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"number": self.number,
|
||||
"title": self.title,
|
||||
"size": self.size,
|
||||
"type": self.issue_type,
|
||||
"estimated_minutes": self.estimated_minutes,
|
||||
"url": self.url,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class GoldenPath:
|
||||
"""A complete Golden Path sequence."""
|
||||
|
||||
generated_at: str
|
||||
target_minutes: int
|
||||
items: list[PathItem] = field(default_factory=list)
|
||||
|
||||
@property
|
||||
def total_estimated_minutes(self) -> int:
|
||||
return sum(item.estimated_minutes for item in self.items)
|
||||
|
||||
@property
|
||||
def item_count(self) -> int:
|
||||
return len(self.items)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"generated_at": self.generated_at,
|
||||
"target_minutes": self.target_minutes,
|
||||
"total_estimated_minutes": self.total_estimated_minutes,
|
||||
"item_count": self.item_count,
|
||||
"items": [item.to_dict() for item in self.items],
|
||||
}
|
||||
|
||||
def to_json(self, indent: int = 2) -> str:
|
||||
return json.dumps(self.to_dict(), indent=indent)
|
||||
|
||||
|
||||
def fetch_eligible_issues(client: GiteaClient, config: dict) -> list[dict]:
|
||||
"""Fetch open issues eligible for Golden Paths."""
|
||||
size_labels = config.get("size_labels", ["size:XS", "size:S", "size:M"])
|
||||
|
||||
try:
|
||||
# Fetch all open issues
|
||||
issues = client.get_paginated("issues", {"state": "open", "sort": "updated"})
|
||||
except (HTTPError, URLError) as exc:
|
||||
print(f"[golden_path] Warning: Failed to fetch issues: {exc}", file=sys.stderr)
|
||||
return []
|
||||
|
||||
# Filter by size labels if specified
|
||||
if size_labels:
|
||||
filtered = []
|
||||
size_names = {s.lower() for s in size_labels}
|
||||
for issue in issues:
|
||||
issue_labels = {l.get("name", "").lower() for l in issue.get("labels", [])}
|
||||
if issue_labels & size_names:
|
||||
filtered.append(issue)
|
||||
issues = filtered
|
||||
|
||||
return issues
|
||||
|
||||
|
||||
def group_issues_by_type(issues: list[dict]) -> dict[str, list[dict]]:
|
||||
"""Group issues by their classified type, sorted by score."""
|
||||
groups: dict[str, list[dict]] = {
|
||||
"triage": [],
|
||||
"fix": [],
|
||||
"test": [],
|
||||
"docs": [],
|
||||
"refactor": [],
|
||||
}
|
||||
|
||||
for issue in issues:
|
||||
issue_type = classify_issue_type(issue)
|
||||
if issue_type in groups:
|
||||
groups[issue_type].append(issue)
|
||||
|
||||
# Sort each group by score (highest first)
|
||||
for issue_type in groups:
|
||||
groups[issue_type] = sorted(
|
||||
groups[issue_type],
|
||||
key=lambda i: score_issue_for_path(i),
|
||||
reverse=True,
|
||||
)
|
||||
|
||||
return groups
|
||||
|
||||
|
||||
def build_golden_path(
|
||||
grouped_issues: dict[str, list[dict]],
|
||||
target_minutes: int = 45,
|
||||
) -> GoldenPath:
|
||||
"""Build a Golden Path from grouped issues.
|
||||
|
||||
The path follows a coherent sequence:
|
||||
1. One small triage cleanup (warm-up)
|
||||
2. One micro-fix (momentum building)
|
||||
3. One test-improvement (quality focus)
|
||||
4. One more micro-fix or docs (closure)
|
||||
"""
|
||||
path = GoldenPath(
|
||||
generated_at=datetime.now(timezone.utc).isoformat(),
|
||||
target_minutes=target_minutes,
|
||||
)
|
||||
|
||||
used_issue_numbers: set[int] = set()
|
||||
|
||||
def add_best_item(issues: list[dict], max_minutes: int | None = None) -> bool:
|
||||
"""Add the best available issue of a type to the path."""
|
||||
for issue in issues:
|
||||
number = issue.get("number", 0)
|
||||
if number in used_issue_numbers:
|
||||
continue
|
||||
|
||||
est_time = estimate_time(issue)
|
||||
if max_minutes and est_time > max_minutes:
|
||||
continue
|
||||
|
||||
used_issue_numbers.add(number)
|
||||
path.items.append(
|
||||
PathItem(
|
||||
number=number,
|
||||
title=issue.get("title", "Untitled"),
|
||||
size=extract_size(issue.get("labels", [])),
|
||||
issue_type=classify_issue_type(issue),
|
||||
estimated_minutes=est_time,
|
||||
url=issue.get("html_url", ""),
|
||||
)
|
||||
)
|
||||
return True
|
||||
return False
|
||||
|
||||
# Phase 1: Warm-up with triage (5-10 min)
|
||||
if grouped_issues["triage"]:
|
||||
add_best_item(grouped_issues["triage"], max_minutes=15)
|
||||
else:
|
||||
# Fallback: use smallest available issue
|
||||
all_issues = (
|
||||
grouped_issues["fix"]
|
||||
+ grouped_issues["docs"]
|
||||
+ grouped_issues["refactor"]
|
||||
)
|
||||
all_issues.sort(key=lambda i: score_issue_for_path(i), reverse=True)
|
||||
add_best_item(all_issues, max_minutes=10)
|
||||
|
||||
# Phase 2: First micro-fix (10-15 min)
|
||||
if grouped_issues["fix"]:
|
||||
add_best_item(grouped_issues["fix"], max_minutes=20)
|
||||
else:
|
||||
# Fallback to refactor
|
||||
add_best_item(grouped_issues["refactor"], max_minutes=15)
|
||||
|
||||
# Phase 3: Test improvement (10-15 min)
|
||||
if grouped_issues["test"]:
|
||||
add_best_item(grouped_issues["test"], max_minutes=20)
|
||||
else:
|
||||
# If no test issues, add another fix
|
||||
add_best_item(grouped_issues["fix"], max_minutes=15)
|
||||
|
||||
# Phase 4: Closure fix or docs (10-15 min)
|
||||
# Try to fill remaining time
|
||||
remaining_budget = target_minutes - path.total_estimated_minutes
|
||||
if remaining_budget >= 10:
|
||||
# Prefer fix, then docs
|
||||
if not add_best_item(grouped_issues["fix"], max_minutes=remaining_budget):
|
||||
if not add_best_item(grouped_issues["docs"], max_minutes=remaining_budget):
|
||||
add_best_item(grouped_issues["refactor"], max_minutes=remaining_budget)
|
||||
|
||||
return path
|
||||
|
||||
|
||||
def generate_golden_path(
|
||||
target_minutes: int = 45,
|
||||
config: dict | None = None,
|
||||
) -> GoldenPath:
|
||||
"""Generate a Golden Path for the specified time budget.
|
||||
|
||||
Args:
|
||||
target_minutes: Target session length (30-60 recommended)
|
||||
config: Optional config override
|
||||
|
||||
Returns:
|
||||
A GoldenPath with ordered items from real Gitea issues
|
||||
"""
|
||||
cfg = config or load_config()
|
||||
token = get_token(cfg)
|
||||
client = GiteaClient(cfg, token)
|
||||
|
||||
if not client.is_available():
|
||||
# Return empty path with error indication
|
||||
return GoldenPath(
|
||||
generated_at=datetime.now(timezone.utc).isoformat(),
|
||||
target_minutes=target_minutes,
|
||||
items=[],
|
||||
)
|
||||
|
||||
issues = fetch_eligible_issues(client, cfg)
|
||||
grouped = group_issues_by_type(issues)
|
||||
return build_golden_path(grouped, target_minutes)
|
||||
|
||||
|
||||
# ── Output Formatting ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def print_golden_path(path: GoldenPath) -> None:
|
||||
"""Print a formatted Golden Path to stdout."""
|
||||
print("=" * 60)
|
||||
print("🌟 GOLDEN PATH")
|
||||
print("=" * 60)
|
||||
print(f"Generated: {path.generated_at}")
|
||||
print(f"Target: {path.target_minutes} minutes")
|
||||
print(f"Estimated: {path.total_estimated_minutes} minutes")
|
||||
print()
|
||||
|
||||
if not path.items:
|
||||
print("No eligible issues found for a Golden Path.")
|
||||
print()
|
||||
print("To create Golden Paths, ensure issues have:")
|
||||
print(" - Size labels: size:XS, size:S, or size:M")
|
||||
print(" - Type labels: bug, test, triage, docs, refactor")
|
||||
print()
|
||||
return
|
||||
|
||||
for i, item in enumerate(path.items, 1):
|
||||
type_emoji = {
|
||||
"triage": "🧹",
|
||||
"fix": "🔧",
|
||||
"test": "🧪",
|
||||
"docs": "📚",
|
||||
"refactor": "♻️",
|
||||
}.get(item.issue_type, "📋")
|
||||
|
||||
print(f"{i}. {type_emoji} #{item.number} [{item.size}] ({item.estimated_minutes}m)")
|
||||
print(f" Title: {item.title}")
|
||||
print(f" Type: {item.issue_type.upper()}")
|
||||
if item.url:
|
||||
print(f" URL: {item.url}")
|
||||
print()
|
||||
|
||||
print("-" * 60)
|
||||
print("Instructions:")
|
||||
print(" 1. Start with the triage item to warm up")
|
||||
print(" 2. Progress through fixes to build momentum")
|
||||
print(" 3. Use the test item for quality focus")
|
||||
print(" 4. Check off items as you complete them")
|
||||
print()
|
||||
|
||||
|
||||
# ── CLI ───────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
p = argparse.ArgumentParser(
|
||||
description="Golden Path generator — coherent 30-60 minute mini-sessions",
|
||||
)
|
||||
p.add_argument(
|
||||
"--minutes",
|
||||
"-m",
|
||||
type=int,
|
||||
default=45,
|
||||
help="Target session length in minutes (default: 45)",
|
||||
)
|
||||
p.add_argument(
|
||||
"--json",
|
||||
"-j",
|
||||
action="store_true",
|
||||
help="Output as JSON instead of formatted text",
|
||||
)
|
||||
return p.parse_args()
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args()
|
||||
|
||||
# Validate target minutes
|
||||
target = max(30, min(60, args.minutes))
|
||||
if target != args.minutes:
|
||||
print(
|
||||
f"[golden_path] Warning: Clamped {args.minutes}m to {target}m range",
|
||||
file=sys.stderr,
|
||||
)
|
||||
|
||||
path = generate_golden_path(target_minutes=target)
|
||||
|
||||
if args.json:
|
||||
print(path.to_json())
|
||||
else:
|
||||
print_golden_path(path)
|
||||
|
||||
return 0 if path.items else 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -22,6 +22,14 @@ from typing import Any
|
||||
from urllib.request import Request, urlopen
|
||||
from urllib.error import HTTPError, URLError
|
||||
|
||||
# ── Token Economy Integration ──────────────────────────────────────────────
|
||||
# Import token rules helpers for tracking Daily Run rewards
|
||||
|
||||
sys.path.insert(
|
||||
0, str(Path(__file__).resolve().parent.parent)
|
||||
)
|
||||
from utils.token_rules import TokenRules, compute_token_reward
|
||||
|
||||
# ── Configuration ─────────────────────────────────────────────────────────
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
|
||||
@@ -490,6 +498,43 @@ def parse_args() -> argparse.Namespace:
|
||||
return p.parse_args()
|
||||
|
||||
|
||||
def compute_daily_run_tokens(success: bool = True) -> dict[str, Any]:
|
||||
"""Compute token rewards for Daily Run completion.
|
||||
|
||||
Uses the centralized token_rules configuration to calculate
|
||||
rewards/penalties for automation actions.
|
||||
|
||||
Args:
|
||||
success: Whether the Daily Run completed successfully
|
||||
|
||||
Returns:
|
||||
Token transaction details
|
||||
"""
|
||||
rules = TokenRules()
|
||||
|
||||
if success:
|
||||
# Daily run completed successfully
|
||||
transaction = compute_token_reward("daily_run_completed", current_tokens=0)
|
||||
|
||||
# Also compute golden path generation if agenda was created
|
||||
agenda_transaction = compute_token_reward("golden_path_generated", current_tokens=0)
|
||||
|
||||
return {
|
||||
"daily_run": transaction,
|
||||
"golden_path": agenda_transaction,
|
||||
"total_delta": transaction.get("delta", 0) + agenda_transaction.get("delta", 0),
|
||||
"config_version": rules.get_config_version(),
|
||||
}
|
||||
else:
|
||||
# Automation failed
|
||||
transaction = compute_token_reward("automation_failure", current_tokens=0)
|
||||
return {
|
||||
"automation_failure": transaction,
|
||||
"total_delta": transaction.get("delta", 0),
|
||||
"config_version": rules.get_config_version(),
|
||||
}
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args()
|
||||
config = load_config()
|
||||
@@ -503,10 +548,13 @@ def main() -> int:
|
||||
# Check Gitea availability
|
||||
if not client.is_available():
|
||||
error_msg = "[orchestrator] Error: Gitea API is not available"
|
||||
# Compute failure tokens even when unavailable
|
||||
tokens = compute_daily_run_tokens(success=False)
|
||||
if args.json:
|
||||
print(json.dumps({"error": error_msg}))
|
||||
print(json.dumps({"error": error_msg, "tokens": tokens}))
|
||||
else:
|
||||
print(error_msg, file=sys.stderr)
|
||||
print(f"[tokens] Failure penalty: {tokens['total_delta']}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
# Fetch candidates and generate agenda
|
||||
@@ -521,9 +569,12 @@ def main() -> int:
|
||||
cycles = load_cycle_data()
|
||||
day_summary = generate_day_summary(activity, cycles)
|
||||
|
||||
# Compute token rewards for successful completion
|
||||
tokens = compute_daily_run_tokens(success=True)
|
||||
|
||||
# Output
|
||||
if args.json:
|
||||
output = {"agenda": agenda}
|
||||
output = {"agenda": agenda, "tokens": tokens}
|
||||
if day_summary:
|
||||
output["day_summary"] = day_summary
|
||||
print(json.dumps(output, indent=2))
|
||||
@@ -531,6 +582,15 @@ def main() -> int:
|
||||
print_agenda(agenda)
|
||||
if day_summary and activity:
|
||||
print_day_summary(day_summary, activity)
|
||||
# Show token rewards
|
||||
print("─" * 60)
|
||||
print("🪙 Token Rewards")
|
||||
print("─" * 60)
|
||||
print(f"Daily Run completed: +{tokens['daily_run']['delta']} tokens")
|
||||
if candidates:
|
||||
print(f"Golden path generated: +{tokens['golden_path']['delta']} tokens")
|
||||
print(f"Total: +{tokens['total_delta']} tokens")
|
||||
print(f"Config version: {tokens['config_version']}")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
745
timmy_automations/daily_run/weekly_narrative.py
Normal file
745
timmy_automations/daily_run/weekly_narrative.py
Normal file
@@ -0,0 +1,745 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Weekly narrative summary generator — human-readable loop analysis.
|
||||
|
||||
Analyzes the past week's activity across the development loop to produce
|
||||
a narrative summary of:
|
||||
- What changed (themes, areas of focus)
|
||||
- How agents and Timmy contributed
|
||||
- Any shifts in tests, triage, or token economy
|
||||
|
||||
The output is designed to be skimmable — a quick read that gives context
|
||||
on the week's progress without drowning in metrics.
|
||||
|
||||
Run: python3 timmy_automations/daily_run/weekly_narrative.py [--json]
|
||||
Env: See timmy_automations/config/automations.json for configuration
|
||||
|
||||
Refs: #719
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from collections import Counter
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib.error import HTTPError, URLError
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
# ── Configuration ─────────────────────────────────────────────────────────
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
|
||||
CONFIG_PATH = Path(__file__).parent.parent / "config" / "automations.json"
|
||||
|
||||
DEFAULT_CONFIG = {
|
||||
"gitea_api": "http://localhost:3000/api/v1",
|
||||
"repo_slug": "rockachopa/Timmy-time-dashboard",
|
||||
"token_file": "~/.hermes/gitea_token",
|
||||
"lookback_days": 7,
|
||||
"output_file": ".loop/weekly_narrative.json",
|
||||
"enabled": True,
|
||||
}
|
||||
|
||||
|
||||
# ── Data Loading ───────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def load_automation_config() -> dict:
|
||||
"""Load configuration for weekly_narrative from automations manifest."""
|
||||
config = DEFAULT_CONFIG.copy()
|
||||
if CONFIG_PATH.exists():
|
||||
try:
|
||||
manifest = json.loads(CONFIG_PATH.read_text())
|
||||
for auto in manifest.get("automations", []):
|
||||
if auto.get("id") == "weekly_narrative":
|
||||
config.update(auto.get("config", {}))
|
||||
config["enabled"] = auto.get("enabled", True)
|
||||
break
|
||||
except (json.JSONDecodeError, OSError) as exc:
|
||||
print(f"[weekly_narrative] Warning: Could not load config: {exc}", file=sys.stderr)
|
||||
|
||||
# Environment variable overrides
|
||||
if os.environ.get("TIMMY_GITEA_API"):
|
||||
config["gitea_api"] = os.environ.get("TIMMY_GITEA_API")
|
||||
if os.environ.get("TIMMY_REPO_SLUG"):
|
||||
config["repo_slug"] = os.environ.get("TIMMY_REPO_SLUG")
|
||||
if os.environ.get("TIMMY_GITEA_TOKEN"):
|
||||
config["token"] = os.environ.get("TIMMY_GITEA_TOKEN")
|
||||
if os.environ.get("TIMMY_WEEKLY_NARRATIVE_ENABLED"):
|
||||
config["enabled"] = os.environ.get("TIMMY_WEEKLY_NARRATIVE_ENABLED", "true").lower() == "true"
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def get_token(config: dict) -> str | None:
|
||||
"""Get Gitea token from environment or file."""
|
||||
if "token" in config:
|
||||
return config["token"]
|
||||
|
||||
token_file = Path(config["token_file"]).expanduser()
|
||||
if token_file.exists():
|
||||
return token_file.read_text().strip()
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def load_jsonl(path: Path) -> list[dict]:
|
||||
"""Load a JSONL file, skipping bad lines."""
|
||||
if not path.exists():
|
||||
return []
|
||||
entries = []
|
||||
for line in path.read_text().strip().splitlines():
|
||||
try:
|
||||
entries.append(json.loads(line))
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
continue
|
||||
return entries
|
||||
|
||||
|
||||
def parse_ts(ts_str: str) -> datetime | None:
|
||||
"""Parse an ISO timestamp, tolerating missing tz."""
|
||||
if not ts_str:
|
||||
return None
|
||||
try:
|
||||
dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=UTC)
|
||||
return dt
|
||||
except (ValueError, TypeError):
|
||||
return None
|
||||
|
||||
|
||||
# ── Gitea API Client ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
"""Simple Gitea API client with graceful degradation."""
|
||||
|
||||
def __init__(self, config: dict, token: str | None):
|
||||
self.api_base = config["gitea_api"].rstrip("/")
|
||||
self.repo_slug = config["repo_slug"]
|
||||
self.token = token
|
||||
self._available: bool | None = None
|
||||
|
||||
def _headers(self) -> dict:
|
||||
headers = {"Accept": "application/json"}
|
||||
if self.token:
|
||||
headers["Authorization"] = f"token {self.token}"
|
||||
return headers
|
||||
|
||||
def _api_url(self, path: str) -> str:
|
||||
return f"{self.api_base}/repos/{self.repo_slug}/{path}"
|
||||
|
||||
def is_available(self) -> bool:
|
||||
"""Check if Gitea API is reachable."""
|
||||
if self._available is not None:
|
||||
return self._available
|
||||
|
||||
try:
|
||||
req = Request(
|
||||
f"{self.api_base}/version",
|
||||
headers=self._headers(),
|
||||
method="GET",
|
||||
)
|
||||
with urlopen(req, timeout=5) as resp:
|
||||
self._available = resp.status == 200
|
||||
return self._available
|
||||
except (HTTPError, URLError, TimeoutError):
|
||||
self._available = False
|
||||
return False
|
||||
|
||||
def get_paginated(self, path: str, params: dict | None = None) -> list:
|
||||
"""Fetch all pages of a paginated endpoint."""
|
||||
all_items = []
|
||||
page = 1
|
||||
limit = 50
|
||||
|
||||
while True:
|
||||
url = self._api_url(path)
|
||||
query_parts = [f"limit={limit}", f"page={page}"]
|
||||
if params:
|
||||
for key, val in params.items():
|
||||
query_parts.append(f"{key}={val}")
|
||||
url = f"{url}?{'&'.join(query_parts)}"
|
||||
|
||||
req = Request(url, headers=self._headers(), method="GET")
|
||||
with urlopen(req, timeout=15) as resp:
|
||||
batch = json.loads(resp.read())
|
||||
|
||||
if not batch:
|
||||
break
|
||||
|
||||
all_items.extend(batch)
|
||||
if len(batch) < limit:
|
||||
break
|
||||
page += 1
|
||||
|
||||
return all_items
|
||||
|
||||
|
||||
# ── Data Collection ────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def collect_cycles_data(since: datetime) -> dict:
|
||||
"""Load cycle retrospective data from the lookback period."""
|
||||
cycles_file = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
|
||||
if not cycles_file.exists():
|
||||
return {"cycles": [], "total": 0, "successes": 0, "failures": 0}
|
||||
|
||||
entries = load_jsonl(cycles_file)
|
||||
recent = []
|
||||
for e in entries:
|
||||
ts = parse_ts(e.get("timestamp", ""))
|
||||
if ts and ts >= since:
|
||||
recent.append(e)
|
||||
|
||||
successes = [e for e in recent if e.get("success")]
|
||||
failures = [e for e in recent if not e.get("success")]
|
||||
|
||||
return {
|
||||
"cycles": recent,
|
||||
"total": len(recent),
|
||||
"successes": len(successes),
|
||||
"failures": len(failures),
|
||||
"success_rate": round(len(successes) / len(recent), 2) if recent else 0,
|
||||
}
|
||||
|
||||
|
||||
def collect_issues_data(client: GiteaClient, since: datetime) -> dict:
|
||||
"""Collect issue activity from Gitea."""
|
||||
if not client.is_available():
|
||||
return {"error": "Gitea unavailable", "issues": [], "closed": [], "opened": []}
|
||||
|
||||
try:
|
||||
issues = client.get_paginated("issues", {"state": "all", "sort": "updated", "limit": 100})
|
||||
except (HTTPError, URLError) as exc:
|
||||
return {"error": str(exc), "issues": [], "closed": [], "opened": []}
|
||||
|
||||
touched = []
|
||||
closed = []
|
||||
opened = []
|
||||
|
||||
for issue in issues:
|
||||
updated_at = issue.get("updated_at", "")
|
||||
created_at = issue.get("created_at", "")
|
||||
|
||||
updated = parse_ts(updated_at)
|
||||
created = parse_ts(created_at)
|
||||
|
||||
if updated and updated >= since:
|
||||
touched.append(issue)
|
||||
|
||||
if issue.get("state") == "closed":
|
||||
closed_at = issue.get("closed_at", "")
|
||||
closed_dt = parse_ts(closed_at)
|
||||
if closed_dt and closed_dt >= since:
|
||||
closed.append(issue)
|
||||
elif created and created >= since:
|
||||
opened.append(issue)
|
||||
|
||||
return {
|
||||
"issues": touched,
|
||||
"closed": closed,
|
||||
"opened": opened,
|
||||
"touched_count": len(touched),
|
||||
"closed_count": len(closed),
|
||||
"opened_count": len(opened),
|
||||
}
|
||||
|
||||
|
||||
def collect_prs_data(client: GiteaClient, since: datetime) -> dict:
|
||||
"""Collect PR activity from Gitea."""
|
||||
if not client.is_available():
|
||||
return {"error": "Gitea unavailable", "prs": [], "merged": [], "opened": []}
|
||||
|
||||
try:
|
||||
prs = client.get_paginated("pulls", {"state": "all", "sort": "updated", "limit": 100})
|
||||
except (HTTPError, URLError) as exc:
|
||||
return {"error": str(exc), "prs": [], "merged": [], "opened": []}
|
||||
|
||||
touched = []
|
||||
merged = []
|
||||
opened = []
|
||||
|
||||
for pr in prs:
|
||||
updated_at = pr.get("updated_at", "")
|
||||
created_at = pr.get("created_at", "")
|
||||
merged_at = pr.get("merged_at", "")
|
||||
|
||||
updated = parse_ts(updated_at)
|
||||
created = parse_ts(created_at)
|
||||
merged_dt = parse_ts(merged_at) if merged_at else None
|
||||
|
||||
if updated and updated >= since:
|
||||
touched.append(pr)
|
||||
|
||||
if pr.get("merged") and merged_dt and merged_dt >= since:
|
||||
merged.append(pr)
|
||||
elif created and created >= since:
|
||||
opened.append(pr)
|
||||
|
||||
return {
|
||||
"prs": touched,
|
||||
"merged": merged,
|
||||
"opened": opened,
|
||||
"touched_count": len(touched),
|
||||
"merged_count": len(merged),
|
||||
"opened_count": len(opened),
|
||||
}
|
||||
|
||||
|
||||
def collect_triage_data(since: datetime) -> dict:
|
||||
"""Load triage and introspection data."""
|
||||
triage_file = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
|
||||
insights_file = REPO_ROOT / ".loop" / "retro" / "insights.json"
|
||||
|
||||
triage_entries = load_jsonl(triage_file)
|
||||
recent_triage = [
|
||||
e for e in triage_entries
|
||||
if parse_ts(e.get("timestamp", "")) and parse_ts(e.get("timestamp", "")) >= since
|
||||
]
|
||||
|
||||
insights = {}
|
||||
if insights_file.exists():
|
||||
try:
|
||||
insights = json.loads(insights_file.read_text())
|
||||
except (json.JSONDecodeError, OSError):
|
||||
pass
|
||||
|
||||
return {
|
||||
"triage_runs": len(recent_triage),
|
||||
"triage_entries": recent_triage,
|
||||
"latest_insights": insights,
|
||||
}
|
||||
|
||||
|
||||
def collect_token_data(since: datetime) -> dict:
|
||||
"""Load token economy data from the lightning ledger."""
|
||||
# The ledger is in-memory but we can look for any persisted data
|
||||
# For now, return placeholder that will be filled by the ledger module
|
||||
return {
|
||||
"note": "Token economy data is ephemeral — check dashboard for live metrics",
|
||||
"balance_sats": 0, # Placeholder
|
||||
"transactions_week": 0,
|
||||
}
|
||||
|
||||
|
||||
# ── Analysis Functions ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def extract_themes(issues: list[dict]) -> list[dict]:
|
||||
"""Extract themes from issue labels."""
|
||||
label_counts = Counter()
|
||||
layer_counts = Counter()
|
||||
type_counts = Counter()
|
||||
|
||||
for issue in issues:
|
||||
for label in issue.get("labels", []):
|
||||
name = label.get("name", "")
|
||||
label_counts[name] += 1
|
||||
|
||||
if name.startswith("layer:"):
|
||||
layer_counts[name.replace("layer:", "")] += 1
|
||||
if name in ("bug", "feature", "refactor", "docs", "test", "chore"):
|
||||
type_counts[name] += 1
|
||||
|
||||
# Top themes (labels excluding layer prefixes)
|
||||
themes = [
|
||||
{"name": name, "count": count}
|
||||
for name, count in label_counts.most_common(10)
|
||||
if not name.startswith(("layer:", "size:"))
|
||||
]
|
||||
|
||||
# Layers
|
||||
layers = [
|
||||
{"name": name, "count": count}
|
||||
for name, count in layer_counts.most_common()
|
||||
]
|
||||
|
||||
# Types
|
||||
types = [
|
||||
{"name": name, "count": count}
|
||||
for name, count in type_counts.most_common()
|
||||
]
|
||||
|
||||
return {
|
||||
"top_labels": themes,
|
||||
"layers": layers,
|
||||
"types": types,
|
||||
}
|
||||
|
||||
|
||||
def extract_agent_contributions(issues: list[dict], prs: list[dict], cycles: list[dict]) -> dict:
|
||||
"""Extract agent contribution patterns."""
|
||||
# Count by assignee
|
||||
assignee_counts = Counter()
|
||||
for issue in issues:
|
||||
assignee = issue.get("assignee")
|
||||
if assignee and isinstance(assignee, dict):
|
||||
assignee_counts[assignee.get("login", "unknown")] += 1
|
||||
|
||||
# Count PR authors
|
||||
pr_authors = Counter()
|
||||
for pr in prs:
|
||||
user = pr.get("user")
|
||||
if user and isinstance(user, dict):
|
||||
pr_authors[user.get("login", "unknown")] += 1
|
||||
|
||||
# Check for Kimi mentions in cycle notes
|
||||
kimi_mentions = sum(
|
||||
1 for c in cycles
|
||||
if "kimi" in c.get("notes", "").lower() or "kimi" in c.get("reason", "").lower()
|
||||
)
|
||||
|
||||
return {
|
||||
"active_assignees": [
|
||||
{"login": login, "issues_count": count}
|
||||
for login, count in assignee_counts.most_common()
|
||||
],
|
||||
"pr_authors": [
|
||||
{"login": login, "prs_count": count}
|
||||
for login, count in pr_authors.most_common()
|
||||
],
|
||||
"kimi_mentioned_cycles": kimi_mentions,
|
||||
}
|
||||
|
||||
|
||||
def analyze_test_shifts(cycles: list[dict]) -> dict:
|
||||
"""Analyze shifts in test patterns."""
|
||||
if not cycles:
|
||||
return {"note": "No cycle data available"}
|
||||
|
||||
total_tests_passed = sum(c.get("tests_passed", 0) for c in cycles)
|
||||
total_tests_added = sum(c.get("tests_added", 0) for c in cycles)
|
||||
avg_tests_per_cycle = round(total_tests_passed / len(cycles), 1) if cycles else 0
|
||||
|
||||
# Look for test-related issues
|
||||
test_focused = [
|
||||
c for c in cycles
|
||||
if c.get("type") == "test" or "test" in c.get("notes", "").lower()
|
||||
]
|
||||
|
||||
return {
|
||||
"total_tests_passed": total_tests_passed,
|
||||
"total_tests_added": total_tests_added,
|
||||
"avg_tests_per_cycle": avg_tests_per_cycle,
|
||||
"test_focused_cycles": len(test_focused),
|
||||
}
|
||||
|
||||
|
||||
def analyze_triage_shifts(triage_data: dict) -> dict:
|
||||
"""Analyze shifts in triage patterns."""
|
||||
insights = triage_data.get("latest_insights", {})
|
||||
recommendations = insights.get("recommendations", [])
|
||||
|
||||
high_priority_recs = [
|
||||
r for r in recommendations
|
||||
if r.get("severity") == "high"
|
||||
]
|
||||
|
||||
return {
|
||||
"triage_runs": triage_data.get("triage_runs", 0),
|
||||
"insights_generated": insights.get("generated_at") is not None,
|
||||
"high_priority_recommendations": len(high_priority_recs),
|
||||
"recent_recommendations": recommendations[:3] if recommendations else [],
|
||||
}
|
||||
|
||||
|
||||
def generate_vibe_summary(
|
||||
cycles_data: dict,
|
||||
issues_data: dict,
|
||||
prs_data: dict,
|
||||
themes: dict,
|
||||
agent_contrib: dict,
|
||||
test_shifts: dict,
|
||||
triage_shifts: dict,
|
||||
) -> dict:
|
||||
"""Generate the human-readable 'vibe' summary."""
|
||||
# Determine overall vibe
|
||||
success_rate = cycles_data.get("success_rate", 0)
|
||||
failures = cycles_data.get("failures", 0)
|
||||
closed_count = issues_data.get("closed_count", 0)
|
||||
merged_count = prs_data.get("merged_count", 0)
|
||||
|
||||
if success_rate >= 0.9 and closed_count > 0:
|
||||
vibe = "productive"
|
||||
vibe_description = "A strong week with solid delivery and healthy success rates."
|
||||
elif success_rate >= 0.7:
|
||||
vibe = "steady"
|
||||
vibe_description = "Steady progress with some bumps. Things are moving forward."
|
||||
elif failures > cycles_data.get("successes", 0):
|
||||
vibe = "struggling"
|
||||
vibe_description = "A challenging week with more failures than successes. Time to regroup."
|
||||
else:
|
||||
vibe = "quiet"
|
||||
vibe_description = "A lighter week with limited activity."
|
||||
|
||||
# Focus areas from themes
|
||||
focus_areas = []
|
||||
for layer in themes.get("layers", [])[:3]:
|
||||
focus_areas.append(f"{layer['name']} ({layer['count']} items)")
|
||||
|
||||
# Agent activity summary
|
||||
agent_summary = ""
|
||||
active_assignees = agent_contrib.get("active_assignees", [])
|
||||
if active_assignees:
|
||||
top_agent = active_assignees[0]
|
||||
agent_summary = f"{top_agent['login']} led with {top_agent['issues_count']} assigned issues."
|
||||
|
||||
# Notable events
|
||||
notable = []
|
||||
if merged_count > 5:
|
||||
notable.append(f"{merged_count} PRs merged — high integration velocity")
|
||||
if triage_shifts.get("high_priority_recommendations", 0) > 0:
|
||||
notable.append("High-priority recommendations from loop introspection")
|
||||
if test_shifts.get("test_focused_cycles", 0) > 3:
|
||||
notable.append("Strong test coverage focus")
|
||||
if not notable:
|
||||
notable.append("Regular development flow")
|
||||
|
||||
return {
|
||||
"overall": vibe,
|
||||
"description": vibe_description,
|
||||
"focus_areas": focus_areas,
|
||||
"agent_summary": agent_summary,
|
||||
"notable_events": notable,
|
||||
}
|
||||
|
||||
|
||||
# ── Narrative Generation ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
def generate_narrative(
|
||||
cycles_data: dict,
|
||||
issues_data: dict,
|
||||
prs_data: dict,
|
||||
triage_data: dict,
|
||||
themes: dict,
|
||||
agent_contrib: dict,
|
||||
test_shifts: dict,
|
||||
triage_shifts: dict,
|
||||
token_data: dict,
|
||||
since: datetime,
|
||||
until: datetime,
|
||||
) -> dict:
|
||||
"""Generate the complete weekly narrative."""
|
||||
vibe = generate_vibe_summary(
|
||||
cycles_data, issues_data, prs_data, themes, agent_contrib, test_shifts, triage_shifts
|
||||
)
|
||||
|
||||
return {
|
||||
"generated_at": datetime.now(UTC).isoformat(),
|
||||
"period": {
|
||||
"start": since.isoformat(),
|
||||
"end": until.isoformat(),
|
||||
"days": 7,
|
||||
},
|
||||
"vibe": vibe,
|
||||
"activity": {
|
||||
"cycles": {
|
||||
"total": cycles_data.get("total", 0),
|
||||
"successes": cycles_data.get("successes", 0),
|
||||
"failures": cycles_data.get("failures", 0),
|
||||
"success_rate": cycles_data.get("success_rate", 0),
|
||||
},
|
||||
"issues": {
|
||||
"touched": issues_data.get("touched_count", 0),
|
||||
"closed": issues_data.get("closed_count", 0),
|
||||
"opened": issues_data.get("opened_count", 0),
|
||||
},
|
||||
"pull_requests": {
|
||||
"touched": prs_data.get("touched_count", 0),
|
||||
"merged": prs_data.get("merged_count", 0),
|
||||
"opened": prs_data.get("opened_count", 0),
|
||||
},
|
||||
},
|
||||
"themes": themes,
|
||||
"agents": agent_contrib,
|
||||
"test_health": test_shifts,
|
||||
"triage_health": triage_shifts,
|
||||
"token_economy": token_data,
|
||||
}
|
||||
|
||||
|
||||
def generate_markdown_summary(narrative: dict) -> str:
|
||||
"""Generate a human-readable markdown summary."""
|
||||
vibe = narrative.get("vibe", {})
|
||||
activity = narrative.get("activity", {})
|
||||
cycles = activity.get("cycles", {})
|
||||
issues = activity.get("issues", {})
|
||||
prs = activity.get("pull_requests", {})
|
||||
|
||||
lines = [
|
||||
"# Weekly Narrative Summary",
|
||||
"",
|
||||
f"**Period:** {narrative['period']['start'][:10]} to {narrative['period']['end'][:10]}",
|
||||
f"**Vibe:** {vibe.get('overall', 'unknown').title()}",
|
||||
"",
|
||||
f"{vibe.get('description', '')}",
|
||||
"",
|
||||
"## Activity Highlights",
|
||||
"",
|
||||
f"- **Development Cycles:** {cycles.get('total', 0)} total ({cycles.get('successes', 0)} success, {cycles.get('failures', 0)} failure)",
|
||||
f"- **Issues:** {issues.get('closed', 0)} closed, {issues.get('opened', 0)} opened",
|
||||
f"- **Pull Requests:** {prs.get('merged', 0)} merged, {prs.get('opened', 0)} opened",
|
||||
"",
|
||||
]
|
||||
|
||||
# Focus areas
|
||||
focus = vibe.get("focus_areas", [])
|
||||
if focus:
|
||||
lines.append("## Focus Areas")
|
||||
lines.append("")
|
||||
for area in focus:
|
||||
lines.append(f"- {area}")
|
||||
lines.append("")
|
||||
|
||||
# Agent contributions
|
||||
agent_summary = vibe.get("agent_summary", "")
|
||||
if agent_summary:
|
||||
lines.append("## Agent Activity")
|
||||
lines.append("")
|
||||
lines.append(agent_summary)
|
||||
lines.append("")
|
||||
|
||||
# Notable events
|
||||
notable = vibe.get("notable_events", [])
|
||||
if notable:
|
||||
lines.append("## Notable Events")
|
||||
lines.append("")
|
||||
for event in notable:
|
||||
lines.append(f"- {event}")
|
||||
lines.append("")
|
||||
|
||||
# Triage health
|
||||
triage = narrative.get("triage_health", {})
|
||||
if triage.get("high_priority_recommendations", 0) > 0:
|
||||
lines.append("## Triage Notes")
|
||||
lines.append("")
|
||||
lines.append(f"⚠️ {triage['high_priority_recommendations']} high-priority recommendation(s) from loop introspection.")
|
||||
lines.append("")
|
||||
for rec in triage.get("recent_recommendations", [])[:2]:
|
||||
lines.append(f"- **{rec.get('category', 'general')}:** {rec.get('finding', '')}")
|
||||
lines.append("")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
# ── Main ───────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
p = argparse.ArgumentParser(
|
||||
description="Generate weekly narrative summary of work and vibes",
|
||||
)
|
||||
p.add_argument(
|
||||
"--json", "-j",
|
||||
action="store_true",
|
||||
help="Output as JSON instead of markdown",
|
||||
)
|
||||
p.add_argument(
|
||||
"--output", "-o",
|
||||
type=str,
|
||||
default=None,
|
||||
help="Output file path (default from config)",
|
||||
)
|
||||
p.add_argument(
|
||||
"--days",
|
||||
type=int,
|
||||
default=None,
|
||||
help="Override lookback days (default 7)",
|
||||
)
|
||||
p.add_argument(
|
||||
"--force",
|
||||
action="store_true",
|
||||
help="Run even if disabled in config",
|
||||
)
|
||||
return p.parse_args()
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args()
|
||||
config = load_automation_config()
|
||||
|
||||
# Check if enabled
|
||||
if not config.get("enabled", True) and not args.force:
|
||||
print("[weekly_narrative] Skipped — weekly narrative is disabled in config")
|
||||
print("[weekly_narrative] Use --force to run anyway")
|
||||
return 0
|
||||
|
||||
# Determine lookback period
|
||||
days = args.days if args.days is not None else config.get("lookback_days", 7)
|
||||
until = datetime.now(UTC)
|
||||
since = until - timedelta(days=days)
|
||||
|
||||
print(f"[weekly_narrative] Generating narrative for the past {days} days...")
|
||||
|
||||
# Setup Gitea client
|
||||
token = get_token(config)
|
||||
client = GiteaClient(config, token)
|
||||
|
||||
if not client.is_available():
|
||||
print("[weekly_narrative] Warning: Gitea API unavailable — will use local data only")
|
||||
|
||||
# Collect data
|
||||
cycles_data = collect_cycles_data(since)
|
||||
issues_data = collect_issues_data(client, since)
|
||||
prs_data = collect_prs_data(client, since)
|
||||
triage_data = collect_triage_data(since)
|
||||
token_data = collect_token_data(since)
|
||||
|
||||
# Analyze
|
||||
themes = extract_themes(issues_data.get("issues", []))
|
||||
agent_contrib = extract_agent_contributions(
|
||||
issues_data.get("issues", []),
|
||||
prs_data.get("prs", []),
|
||||
cycles_data.get("cycles", []),
|
||||
)
|
||||
test_shifts = analyze_test_shifts(cycles_data.get("cycles", []))
|
||||
triage_shifts = analyze_triage_shifts(triage_data)
|
||||
|
||||
# Generate narrative
|
||||
narrative = generate_narrative(
|
||||
cycles_data,
|
||||
issues_data,
|
||||
prs_data,
|
||||
triage_data,
|
||||
themes,
|
||||
agent_contrib,
|
||||
test_shifts,
|
||||
triage_shifts,
|
||||
token_data,
|
||||
since,
|
||||
until,
|
||||
)
|
||||
|
||||
# Determine output path
|
||||
output_path = args.output or config.get("output_file", ".loop/weekly_narrative.json")
|
||||
output_file = REPO_ROOT / output_path
|
||||
output_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Write JSON output
|
||||
output_file.write_text(json.dumps(narrative, indent=2) + "\n")
|
||||
|
||||
# Write markdown summary alongside JSON
|
||||
md_output_file = output_file.with_suffix(".md")
|
||||
md_output_file.write_text(generate_markdown_summary(narrative))
|
||||
|
||||
# Print output
|
||||
if args.json:
|
||||
print(json.dumps(narrative, indent=2))
|
||||
else:
|
||||
print()
|
||||
print(generate_markdown_summary(narrative))
|
||||
|
||||
print(f"\n[weekly_narrative] Written to: {output_file}")
|
||||
print(f"[weekly_narrative] Markdown summary: {md_output_file}")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
6
timmy_automations/utils/__init__.py
Normal file
6
timmy_automations/utils/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
"""Timmy Automations utilities.
|
||||
|
||||
Shared helper modules for automations.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
389
timmy_automations/utils/token_rules.py
Normal file
389
timmy_automations/utils/token_rules.py
Normal file
@@ -0,0 +1,389 @@
|
||||
"""Token rules helper — Compute token deltas for agent actions.
|
||||
|
||||
This module loads token economy configuration from YAML and provides
|
||||
functions for automations to compute token rewards/penalties.
|
||||
|
||||
Usage:
|
||||
from timmy_automations.utils.token_rules import TokenRules
|
||||
|
||||
rules = TokenRules()
|
||||
delta = rules.get_delta("pr_merged")
|
||||
print(f"PR merge reward: {delta}") # 10
|
||||
|
||||
# Check if agent can perform sensitive operation
|
||||
can_merge = rules.check_gate("pr_merge", current_tokens=25)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass
|
||||
class TokenEvent:
|
||||
"""Represents a single token event configuration."""
|
||||
|
||||
name: str
|
||||
description: str
|
||||
reward: int
|
||||
penalty: int
|
||||
category: str
|
||||
gate_threshold: int | None = None
|
||||
|
||||
@property
|
||||
def delta(self) -> int:
|
||||
"""Net token delta (reward + penalty)."""
|
||||
return self.reward + self.penalty
|
||||
|
||||
|
||||
@dataclass
|
||||
class TokenCategoryLimits:
|
||||
"""Daily limits for a token category."""
|
||||
|
||||
max_earn: int
|
||||
max_spend: int
|
||||
|
||||
|
||||
class TokenRules:
|
||||
"""Token economy rules loader and calculator.
|
||||
|
||||
Loads configuration from timmy_automations/config/token_rules.yaml
|
||||
and provides methods to compute token deltas and check gating.
|
||||
"""
|
||||
|
||||
CONFIG_PATH = Path(__file__).parent.parent / "config" / "token_rules.yaml"
|
||||
|
||||
def __init__(self, config_path: Path | None = None) -> None:
|
||||
"""Initialize token rules from configuration file.
|
||||
|
||||
Args:
|
||||
config_path: Optional override for config file location.
|
||||
"""
|
||||
self._config_path = config_path or self.CONFIG_PATH
|
||||
self._events: dict[str, TokenEvent] = {}
|
||||
self._gating: dict[str, int] = {}
|
||||
self._daily_limits: dict[str, TokenCategoryLimits] = {}
|
||||
self._audit: dict[str, Any] = {}
|
||||
self._version: str = "unknown"
|
||||
self._load_config()
|
||||
|
||||
def _load_config(self) -> None:
|
||||
"""Load configuration from YAML file."""
|
||||
# Graceful degradation if yaml not available or file missing
|
||||
try:
|
||||
import yaml
|
||||
except ImportError:
|
||||
# YAML not installed, use fallback defaults
|
||||
self._load_fallback_defaults()
|
||||
return
|
||||
|
||||
if not self._config_path.exists():
|
||||
self._load_fallback_defaults()
|
||||
return
|
||||
|
||||
try:
|
||||
config = yaml.safe_load(self._config_path.read_text())
|
||||
if not config:
|
||||
self._load_fallback_defaults()
|
||||
return
|
||||
|
||||
self._version = config.get("version", "unknown")
|
||||
self._parse_events(config.get("events", {}))
|
||||
self._parse_gating(config.get("gating_thresholds", {}))
|
||||
self._parse_daily_limits(config.get("daily_limits", {}))
|
||||
self._audit = config.get("audit", {})
|
||||
|
||||
except Exception:
|
||||
# Any error loading config, use fallbacks
|
||||
self._load_fallback_defaults()
|
||||
|
||||
def _load_fallback_defaults(self) -> None:
|
||||
"""Load minimal fallback defaults if config unavailable."""
|
||||
self._version = "fallback"
|
||||
self._events = {
|
||||
"pr_merged": TokenEvent(
|
||||
name="pr_merged",
|
||||
description="Successfully merged a pull request",
|
||||
reward=10,
|
||||
penalty=0,
|
||||
category="merge",
|
||||
gate_threshold=0,
|
||||
),
|
||||
"test_fixed": TokenEvent(
|
||||
name="test_fixed",
|
||||
description="Fixed a failing test",
|
||||
reward=8,
|
||||
penalty=0,
|
||||
category="test",
|
||||
),
|
||||
"automation_failure": TokenEvent(
|
||||
name="automation_failure",
|
||||
description="Automation failed",
|
||||
reward=0,
|
||||
penalty=-2,
|
||||
category="operation",
|
||||
),
|
||||
}
|
||||
self._gating = {"pr_merge": 0}
|
||||
self._daily_limits = {}
|
||||
self._audit = {"log_all_transactions": True}
|
||||
|
||||
def _parse_events(self, events_config: dict) -> None:
|
||||
"""Parse event configurations from YAML."""
|
||||
for name, config in events_config.items():
|
||||
if not isinstance(config, dict):
|
||||
continue
|
||||
|
||||
self._events[name] = TokenEvent(
|
||||
name=name,
|
||||
description=config.get("description", ""),
|
||||
reward=config.get("reward", 0),
|
||||
penalty=config.get("penalty", 0),
|
||||
category=config.get("category", "unknown"),
|
||||
gate_threshold=config.get("gate_threshold"),
|
||||
)
|
||||
|
||||
def _parse_gating(self, gating_config: dict) -> None:
|
||||
"""Parse gating thresholds from YAML."""
|
||||
for name, threshold in gating_config.items():
|
||||
if isinstance(threshold, int):
|
||||
self._gating[name] = threshold
|
||||
|
||||
def _parse_daily_limits(self, limits_config: dict) -> None:
|
||||
"""Parse daily limits from YAML."""
|
||||
for category, limits in limits_config.items():
|
||||
if isinstance(limits, dict):
|
||||
self._daily_limits[category] = TokenCategoryLimits(
|
||||
max_earn=limits.get("max_earn", 0),
|
||||
max_spend=limits.get("max_spend", 0),
|
||||
)
|
||||
|
||||
def get_delta(self, event_name: str) -> int:
|
||||
"""Get token delta for an event.
|
||||
|
||||
Args:
|
||||
event_name: Name of the event (e.g., "pr_merged", "test_fixed")
|
||||
|
||||
Returns:
|
||||
Net token delta (positive for reward, negative for penalty)
|
||||
"""
|
||||
event = self._events.get(event_name)
|
||||
if event:
|
||||
return event.delta
|
||||
return 0
|
||||
|
||||
def get_event(self, event_name: str) -> TokenEvent | None:
|
||||
"""Get full event configuration.
|
||||
|
||||
Args:
|
||||
event_name: Name of the event
|
||||
|
||||
Returns:
|
||||
TokenEvent object or None if not found
|
||||
"""
|
||||
return self._events.get(event_name)
|
||||
|
||||
def list_events(self, category: str | None = None) -> list[TokenEvent]:
|
||||
"""List all configured events.
|
||||
|
||||
Args:
|
||||
category: Optional category filter
|
||||
|
||||
Returns:
|
||||
List of TokenEvent objects
|
||||
"""
|
||||
events = list(self._events.values())
|
||||
if category:
|
||||
events = [e for e in events if e.category == category]
|
||||
return events
|
||||
|
||||
def check_gate(self, operation: str, current_tokens: int) -> bool:
|
||||
"""Check if agent meets token threshold for an operation.
|
||||
|
||||
Args:
|
||||
operation: Operation name (e.g., "pr_merge")
|
||||
current_tokens: Agent's current token balance
|
||||
|
||||
Returns:
|
||||
True if agent can perform the operation
|
||||
"""
|
||||
threshold = self._gating.get(operation)
|
||||
if threshold is None:
|
||||
return True # No gate defined, allow
|
||||
return current_tokens >= threshold
|
||||
|
||||
def get_gate_threshold(self, operation: str) -> int | None:
|
||||
"""Get the gating threshold for an operation.
|
||||
|
||||
Args:
|
||||
operation: Operation name
|
||||
|
||||
Returns:
|
||||
Threshold value or None if no gate defined
|
||||
"""
|
||||
return self._gating.get(operation)
|
||||
|
||||
def get_daily_limits(self, category: str) -> TokenCategoryLimits | None:
|
||||
"""Get daily limits for a category.
|
||||
|
||||
Args:
|
||||
category: Category name
|
||||
|
||||
Returns:
|
||||
TokenCategoryLimits or None if not defined
|
||||
"""
|
||||
return self._daily_limits.get(category)
|
||||
|
||||
def compute_transaction(
|
||||
self,
|
||||
event_name: str,
|
||||
current_tokens: int = 0,
|
||||
current_daily_earned: dict[str, int] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Compute a complete token transaction.
|
||||
|
||||
This is the main entry point for agents to use. It returns
|
||||
a complete transaction record with delta, gating check, and limits.
|
||||
|
||||
Args:
|
||||
event_name: Name of the event
|
||||
current_tokens: Agent's current token balance
|
||||
current_daily_earned: Dict of category -> tokens earned today
|
||||
|
||||
Returns:
|
||||
Transaction dict with:
|
||||
- event: Event name
|
||||
- delta: Token delta
|
||||
- allowed: Whether operation is allowed (gating)
|
||||
- new_balance: Projected new balance
|
||||
- limit_reached: Whether daily limit would be exceeded
|
||||
"""
|
||||
event = self._events.get(event_name)
|
||||
if not event:
|
||||
return {
|
||||
"event": event_name,
|
||||
"delta": 0,
|
||||
"allowed": False,
|
||||
"reason": "unknown_event",
|
||||
"new_balance": current_tokens,
|
||||
"limit_reached": False,
|
||||
}
|
||||
|
||||
delta = event.delta
|
||||
new_balance = current_tokens + delta
|
||||
|
||||
# Check gating (for penalties, we don't check gates)
|
||||
allowed = True
|
||||
gate_reason = None
|
||||
if delta > 0 and event.gate_threshold is not None: # Only check gates for positive operations with thresholds
|
||||
allowed = current_tokens >= event.gate_threshold
|
||||
if not allowed:
|
||||
gate_reason = f"requires {event.gate_threshold} tokens"
|
||||
|
||||
# Check daily limits
|
||||
limit_reached = False
|
||||
limit_reason = None
|
||||
if current_daily_earned and event.category in current_daily_earned:
|
||||
limits = self._daily_limits.get(event.category)
|
||||
if limits:
|
||||
current_earned = current_daily_earned.get(event.category, 0)
|
||||
if delta > 0 and current_earned + delta > limits.max_earn:
|
||||
limit_reached = True
|
||||
limit_reason = f"daily earn limit ({limits.max_earn}) reached"
|
||||
|
||||
result = {
|
||||
"event": event_name,
|
||||
"delta": delta,
|
||||
"category": event.category,
|
||||
"allowed": allowed and not limit_reached,
|
||||
"new_balance": new_balance,
|
||||
"limit_reached": limit_reached,
|
||||
}
|
||||
|
||||
if gate_reason:
|
||||
result["gate_reason"] = gate_reason
|
||||
if limit_reason:
|
||||
result["limit_reason"] = limit_reason
|
||||
|
||||
return result
|
||||
|
||||
def get_config_version(self) -> str:
|
||||
"""Get the loaded configuration version."""
|
||||
return self._version
|
||||
|
||||
def get_categories(self) -> list[str]:
|
||||
"""Get list of all configured categories."""
|
||||
categories = {e.category for e in self._events.values()}
|
||||
return sorted(categories)
|
||||
|
||||
def is_auditable(self) -> bool:
|
||||
"""Check if transactions should be logged for audit."""
|
||||
return self._audit.get("log_all_transactions", True)
|
||||
|
||||
|
||||
# Convenience functions for simple use cases
|
||||
|
||||
def get_token_delta(event_name: str) -> int:
|
||||
"""Get token delta for an event (convenience function).
|
||||
|
||||
Args:
|
||||
event_name: Name of the event
|
||||
|
||||
Returns:
|
||||
Token delta (positive for reward, negative for penalty)
|
||||
"""
|
||||
return TokenRules().get_delta(event_name)
|
||||
|
||||
|
||||
def check_operation_gate(operation: str, current_tokens: int) -> bool:
|
||||
"""Check if agent can perform operation (convenience function).
|
||||
|
||||
Args:
|
||||
operation: Operation name
|
||||
current_tokens: Agent's current token balance
|
||||
|
||||
Returns:
|
||||
True if operation is allowed
|
||||
"""
|
||||
return TokenRules().check_gate(operation, current_tokens)
|
||||
|
||||
|
||||
def compute_token_reward(
|
||||
event_name: str,
|
||||
current_tokens: int = 0,
|
||||
) -> dict[str, Any]:
|
||||
"""Compute token reward for an event (convenience function).
|
||||
|
||||
Args:
|
||||
event_name: Name of the event
|
||||
current_tokens: Agent's current token balance
|
||||
|
||||
Returns:
|
||||
Transaction dict with delta, allowed status, new balance
|
||||
"""
|
||||
return TokenRules().compute_transaction(event_name, current_tokens)
|
||||
|
||||
|
||||
def list_token_events(category: str | None = None) -> list[dict[str, Any]]:
|
||||
"""List all token events (convenience function).
|
||||
|
||||
Args:
|
||||
category: Optional category filter
|
||||
|
||||
Returns:
|
||||
List of event dicts with name, description, delta, category
|
||||
"""
|
||||
rules = TokenRules()
|
||||
events = rules.list_events(category)
|
||||
return [
|
||||
{
|
||||
"name": e.name,
|
||||
"description": e.description,
|
||||
"delta": e.delta,
|
||||
"category": e.category,
|
||||
"gate_threshold": e.gate_threshold,
|
||||
}
|
||||
for e in events
|
||||
]
|
||||
Reference in New Issue
Block a user