Compare commits

...

9 Commits

Author SHA1 Message Date
kimi
36de0b491d feat(matrix-ui): add Fund Session modal with explanatory text about sats
- Add Fund Session button to Matrix UI overlay (green plus icon)
- Create modal with three explanatory sections:
  * What are Sats? - explains satoshis as smallest Bitcoin unit
  * Why Fund Your Session? - explains how sats power Workshop AI agents
  * Approximate Costs - shows cost ranges for different interaction types
- Add input field for funding amount (min 100 sats)
- Style modal with green theme to match Lightning/sats concept
- Add proper keyboard support (Enter to submit, Escape to close)
- Mobile-responsive design

Fixes #753
2026-03-21 18:21:00 -04:00
e99b09f700 [kimi] Add About/Info panel to Matrix UI (#755) (#831) 2026-03-21 22:06:18 +00:00
2ab6539564 [kimi] Add ConnectionPool class with unit tests (#769) (#830) 2026-03-21 22:02:08 +00:00
28b8673584 [kimi] Add unit tests for voice_tts.py (#768) (#829) 2026-03-21 21:56:45 +00:00
2f15435fed [kimi] Implement quick health snapshot before coding (#710) (#828) 2026-03-21 21:53:40 +00:00
dfe40f5fe6 [kimi] Centralize agent token rules and hooks for automations (#711) (#792) 2026-03-21 21:44:35 +00:00
6dd48685e7 [kimi] Weekly narrative summary generator (#719) (#791) 2026-03-21 21:36:40 +00:00
a95cf806c8 [kimi] Implement token quest system for agents (#713) (#789) 2026-03-21 20:45:35 +00:00
19367d6e41 [kimi] OpenClaw architecture and deployment research report (#721) (#788) 2026-03-21 20:36:23 +00:00
30 changed files with 7378 additions and 3 deletions

1
.gitignore vendored
View File

@@ -73,7 +73,6 @@ morning_briefing.txt
markdown_report.md
data/timmy_soul.jsonl
scripts/migrate_to_zeroclaw.py
src/infrastructure/db_pool.py
workspace/
# Loop orchestration state

178
config/quests.yaml Normal file
View File

@@ -0,0 +1,178 @@
# ── Token Quest System Configuration ─────────────────────────────────────────
#
# Quests are special objectives that agents (and humans) can complete for
# bonus tokens. Each quest has:
#   - id: Unique identifier
#   - name: Display name
#   - description: What the quest requires
#   - reward_tokens: Number of tokens awarded on completion
#   - criteria: Detection rules for completion
#   - enabled: Whether this quest is active
#   - repeatable: Whether this quest can be completed multiple times
#   - cooldown_hours: Minimum hours between completions (if repeatable)
#
# Quest Types:
#   - issue_count: Complete when N issues matching criteria are closed
#   - issue_reduce: Complete when open issue count drops by N
#   - docs_update: Complete when documentation files are updated
#   - test_improve: Complete when test coverage/cases improve
#   - daily_run: Complete Daily Run session objectives
#   - custom: Special quests with manual completion
#
# Note: notification_message strings support a {tokens} placeholder that is
# substituted with the quest's reward_tokens at notification time.
#
# ── Active Quests ─────────────────────────────────────────────────────────────
quests:
  # ── Daily Run & Test Improvement Quests ───────────────────────────────────
  close_flaky_tests:
    id: close_flaky_tests
    name: Flaky Test Hunter
    description: Close 3 issues labeled "flaky-test"
    reward_tokens: 150
    type: issue_count
    enabled: true
    repeatable: true
    cooldown_hours: 24
    criteria:
      issue_labels:
        - flaky-test
      target_count: 3
      issue_state: closed
      # Only issues closed within this window count toward the target.
      lookback_days: 7
    notification_message: "Quest Complete! You closed 3 flaky-test issues and earned {tokens} tokens."
  reduce_p1_issues:
    id: reduce_p1_issues
    name: Priority Firefighter
    description: Reduce open P1 Daily Run issues by 2
    reward_tokens: 200
    type: issue_reduce
    enabled: true
    repeatable: true
    cooldown_hours: 48
    criteria:
      # Issues must carry all/any of these labels to be counted
      # (matching semantics are defined by the quest engine).
      issue_labels:
        - layer:triage
        - P1
      target_reduction: 2
      lookback_days: 3
    notification_message: "Quest Complete! You reduced P1 issues by 2 and earned {tokens} tokens."
  improve_test_coverage:
    id: improve_test_coverage
    name: Coverage Champion
    description: Improve test coverage by 5% or add 10 new test cases
    reward_tokens: 300
    type: test_improve
    enabled: true
    # One-time achievement: not repeatable, so no cooldown_hours needed.
    repeatable: false
    criteria:
      coverage_increase_percent: 5
      min_new_tests: 10
    notification_message: "Quest Complete! You improved test coverage and earned {tokens} tokens."
  complete_daily_run_session:
    id: complete_daily_run_session
    name: Daily Runner
    description: Successfully complete 5 Daily Run sessions in a week
    reward_tokens: 250
    type: daily_run
    enabled: true
    repeatable: true
    cooldown_hours: 168 # 1 week
    criteria:
      min_sessions: 5
      lookback_days: 7
    notification_message: "Quest Complete! You completed 5 Daily Run sessions and earned {tokens} tokens."
  # ── Documentation & Maintenance Quests ────────────────────────────────────
  improve_automation_docs:
    id: improve_automation_docs
    name: Documentation Hero
    description: Improve documentation for automations (update 3+ doc files)
    reward_tokens: 100
    type: docs_update
    enabled: true
    repeatable: true
    cooldown_hours: 72
    criteria:
      # Glob patterns for files that count as documentation.
      file_patterns:
        - "docs/**/*.md"
        - "**/README.md"
        - "timmy_automations/**/*.md"
      min_files_changed: 3
      lookback_days: 7
    notification_message: "Quest Complete! You improved automation docs and earned {tokens} tokens."
  close_micro_fixes:
    id: close_micro_fixes
    name: Micro Fix Master
    description: Close 5 issues labeled "layer:micro-fix"
    reward_tokens: 125
    type: issue_count
    enabled: true
    repeatable: true
    cooldown_hours: 24
    criteria:
      issue_labels:
        - layer:micro-fix
      target_count: 5
      issue_state: closed
      lookback_days: 7
    notification_message: "Quest Complete! You closed 5 micro-fix issues and earned {tokens} tokens."
  # ── Special Achievements ──────────────────────────────────────────────────
  first_contribution:
    id: first_contribution
    name: First Steps
    description: Make your first contribution (close any issue)
    reward_tokens: 50
    type: issue_count
    enabled: true
    repeatable: false
    criteria:
      # No issue_labels here: any closed issue counts.
      target_count: 1
      issue_state: closed
      lookback_days: 30
    notification_message: "Welcome! You completed your first contribution and earned {tokens} tokens."
  bug_squasher:
    id: bug_squasher
    name: Bug Squasher
    description: Close 10 issues labeled "bug"
    reward_tokens: 500
    type: issue_count
    enabled: true
    repeatable: true
    cooldown_hours: 168 # 1 week
    criteria:
      issue_labels:
        - bug
      target_count: 10
      issue_state: closed
      lookback_days: 7
    notification_message: "Quest Complete! You squashed 10 bugs and earned {tokens} tokens."
# ── Quest System Settings ───────────────────────────────────────────────────
settings:
  # Enable/disable quest notifications
  notifications_enabled: true
  # Maximum number of concurrent active quests per agent
  max_concurrent_quests: 5
  # Auto-detect quest completions on Daily Run metrics update
  auto_detect_on_daily_run: true
  # Gitea issue labels that indicate quest-related work
  quest_work_labels:
    - layer:triage
    - layer:micro-fix
    - layer:tests
    - layer:economy
    - flaky-test
    - bug
    - documentation

View File

@@ -0,0 +1,912 @@
# OpenClaw Architecture, Deployment Modes, and Ollama Integration
## Research Report for Timmy Time Dashboard Project
**Issue:** #721 — [Kimi Research] OpenClaw architecture, deployment modes, and Ollama integration
**Date:** 2026-03-21
**Author:** Kimi (Moonshot AI)
**Status:** Complete
---
## Executive Summary
OpenClaw is an open-source AI agent framework that bridges messaging platforms (WhatsApp, Telegram, Slack, Discord, iMessage) to AI coding agents through a centralized gateway. Originally known as Clawdbot and Moltbot, it was rebranded to OpenClaw in early 2026. This report provides a comprehensive analysis of OpenClaw's architecture, deployment options, Ollama integration capabilities, and suitability for deployment on resource-constrained VPS environments like the Hermes DigitalOcean droplet (2GB RAM / 1 vCPU).
**Key Finding:** Running OpenClaw with local LLMs on a 2GB RAM VPS is **not recommended**. The absolute minimum for a text-only agent with external API models is 4GB RAM. For local model inference via Ollama, 8-16GB RAM is the practical minimum. A hybrid approach using OpenRouter as the primary provider with Ollama as fallback is the most viable configuration for small VPS deployments.
---
## 1. Architecture Overview
### 1.1 Core Components
OpenClaw follows a **hub-and-spoke** architecture optimized for multi-agent task execution:
```
┌─────────────────────────────────────────────────────────────────────────┐
│ OPENCLAW ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ WhatsApp │ │ Telegram │ │ Discord │ │
│ │ Channel │ │ Channel │ │ Channel │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └────────────────────┼────────────────────┘ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Gateway │◄─────── WebSocket/API │
│ │ (Port 18789) │ Control Plane │
│ └────────┬─────────┘ │
│ │ │
│ ┌──────────────┼──────────────┐ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Agent A │ │ Agent B │ │ Pi Agent│ │
│ │ (main) │ │ (coder) │ │(delegate)│ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └──────────────┼──────────────┘ │
│ ▼ │
│ ┌────────────────────────┐ │
│ │ LLM Router │ │
│ │ (Primary/Fallback) │ │
│ └───────────┬────────────┘ │
│ │ │
│ ┌─────────────────┼─────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Ollama │ │ OpenAI │ │Anthropic│ │
│ │(local) │ │(cloud) │ │(cloud) │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │ ┌─────┐ │
│ └────────────────────────────────────────────────────►│ MCP │ │
│ │Tools│ │
│ └─────┘ │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Memory │ │ Skills │ │ Workspace │ │
│ │ (SOUL.md) │ │ (SKILL.md) │ │ (sessions) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
### 1.2 Component Deep Dive
| Component | Purpose | Configuration File |
|-----------|---------|-------------------|
| **Gateway** | Central control plane, WebSocket/API server, session management | `gateway` section in `openclaw.json` |
| **Pi Agent** | Core agent runner and command center — schedules LLM calls, tool execution, error handling | `agents` section in `openclaw.json` |
| **Channels** | Messaging platform integrations (Telegram, WhatsApp, Slack, Discord, iMessage) | `channels` section in `openclaw.json` |
| **SOUL.md** | Agent persona definition - personality, communication style, behavioral guidelines | `~/.openclaw/workspace/SOUL.md` |
| **AGENTS.md** | Multi-agent configuration, routing rules, agent specialization definitions | `~/.openclaw/workspace/AGENTS.md` |
| **Workspace** | File system for agent state, session data, temporary files | `~/.openclaw/workspace/` |
| **Skills** | Bundled tools, prompts, configurations that teach agents specific tasks | `~/.openclaw/workspace/skills/` |
| **Sessions** | Conversation history, context persistence between interactions | `~/.openclaw/agents/<agent>/sessions/` |
| **MCP Tools** | Model Context Protocol integration for external tool access | Via `mcporter` or native MCP |
### 1.3 Agent Runner Execution Flow
According to OpenClaw documentation, a complete agent run follows these stages:
1. **Queuing** - Session-level queue (serializes same-session requests) → Global queue (controls total concurrency)
2. **Preparation** - Parse workspace, provider/model, thinking level parameters
3. **Plugin Loading** - Load relevant skills based on task context
4. **Memory Retrieval** - Fetch relevant context from SOUL.md and conversation history
5. **LLM Inference** - Send prompt to configured provider with tool definitions
6. **Tool Execution** - Execute any tool calls returned by the LLM
7. **Response Generation** - Format and return final response to the channel
8. **Memory Storage** - Persist conversation and results to session storage
---
## 2. Deployment Modes
### 2.1 Comparison Matrix
| Deployment Mode | Best For | Setup Complexity | Resource Overhead | Stability |
|----------------|----------|------------------|-------------------|-----------|
| **npm global** | Development, quick testing | Low | Minimal (~200MB) | Moderate |
| **Docker** | Production, isolation, reproducibility | Medium | Higher (~2.5GB base image) | High |
| **Docker Compose** | Multi-service stacks, complex setups | Medium-High | Higher | High |
| **Bare metal/systemd** | Maximum performance, dedicated hardware | High | Minimal | Moderate |
### 2.2 NPM Global Installation (Recommended for Quick Start)
```bash
# One-line installer
curl -fsSL https://openclaw.ai/install.sh | bash
# Or manual npm install
npm install -g openclaw
# Initialize configuration
openclaw onboard
# Start gateway
openclaw gateway
```
**Pros:**
- Fastest setup (~30 seconds)
- Direct access to host resources
- Easy updates via `npm update -g openclaw`
**Cons:**
- Node.js 22+ dependency required
- No process isolation
- Manual dependency management
### 2.3 Docker Deployment (Recommended for Production)
```bash
# Pull and run
docker pull openclaw/openclaw:latest
docker run -d \
--name openclaw \
-p 127.0.0.1:18789:18789 \
-v ~/.openclaw:/root/.openclaw \
-e ANTHROPIC_API_KEY=sk-ant-... \
openclaw/openclaw:latest
# Or with Docker Compose
docker compose -f compose.yml --env-file .env up -d --build
```
**Docker Compose Configuration (production-ready):**
```yaml
version: '3.8'
services:
openclaw:
image: openclaw/openclaw:latest
container_name: openclaw
restart: unless-stopped
ports:
- "127.0.0.1:18789:18789" # Never expose to 0.0.0.0
volumes:
- ./openclaw-data:/root/.openclaw
- ./workspace:/root/.openclaw/workspace
environment:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- OPENROUTER_API_KEY=${OPENROUTER_API_KEY}
- OLLAMA_API_KEY=ollama-local
networks:
- openclaw-net
# Resource limits for small VPS
deploy:
resources:
limits:
cpus: '1.5'
memory: 3G
reservations:
cpus: '0.5'
memory: 1G
networks:
openclaw-net:
driver: bridge
```
### 2.4 Bare Metal / Systemd Installation
For running as a system service on Linux:
```bash
# Create systemd service
sudo tee /etc/systemd/system/openclaw.service > /dev/null <<EOF
[Unit]
Description=OpenClaw Gateway
After=network.target
[Service]
Type=simple
User=openclaw
Group=openclaw
WorkingDirectory=/home/openclaw
Environment="PATH=/usr/local/bin:/usr/bin:/bin"
Environment="NODE_ENV=production"
Environment="ANTHROPIC_API_KEY=sk-ant-..."
ExecStart=/usr/local/bin/openclaw gateway
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF
sudo systemctl daemon-reload
sudo systemctl enable openclaw
sudo systemctl start openclaw
```
### 2.5 Recommended Deployment for 2GB RAM VPS
**⚠️ Critical Finding:** OpenClaw's official minimum is 4GB RAM. On a 2GB VPS:
1. **Do NOT run local LLMs** - Use external API providers exclusively
2. **Use npm installation** - Docker overhead is too heavy
3. **Disable browser automation** - Chromium requires 2-4GB alone
4. **Enable swap** - Critical for preventing OOM kills
5. **Use OpenRouter** - Cheap/free tier models reduce costs
**Setup script for 2GB VPS:**
```bash
#!/bin/bash
# openclaw-minimal-vps.sh
# Setup for 2GB RAM VPS - EXTERNAL API ONLY
# Create 4GB swap
sudo fallocate -l 4G /swapfile
sudo chmod 600 /swapfile
sudo mkswap /swapfile
sudo swapon /swapfile
echo '/swapfile none swap sw 0 0' | sudo tee -a /etc/fstab
# Install Node.js 22
curl -fsSL https://deb.nodesource.com/setup_22.x | sudo bash -
sudo apt-get install -y nodejs
# Install OpenClaw
npm install -g openclaw
# Configure for minimal resource usage
mkdir -p ~/.openclaw
cat > ~/.openclaw/openclaw.json <<'EOF'
{
"gateway": {
"bind": "127.0.0.1",
"port": 18789,
"mode": "local"
},
"agents": {
"defaults": {
"model": {
"primary": "openrouter/google/gemma-3-4b-it:free",
"fallbacks": [
"openrouter/meta/llama-3.1-8b-instruct:free"
]
},
"maxIterations": 15,
"timeout": 120
}
},
"channels": {
"telegram": {
"enabled": true,
"dmPolicy": "pairing"
}
}
}
EOF
# Set OpenRouter API key
export OPENROUTER_API_KEY="sk-or-v1-..."
# Start gateway
openclaw gateway &
```
---
## 3. Ollama Integration
### 3.1 Architecture
OpenClaw integrates with Ollama through its native `/api/chat` endpoint, supporting both streaming responses and tool calling simultaneously:
```
┌──────────────┐ HTTP/JSON ┌──────────────┐ GGUF/CPU/GPU ┌──────────┐
│ OpenClaw │◄───────────────────►│ Ollama │◄────────────────────►│ Local │
│ Gateway │ /api/chat │ Server │ Model inference │ LLM │
│ │ Port 11434 │ Port 11434 │ │ │
└──────────────┘ └──────────────┘ └──────────┘
```
### 3.2 Configuration
**Basic Ollama Setup:**
```bash
# Install Ollama
curl -fsSL https://ollama.com/install.sh | sh
# Start server
ollama serve
# Pull a tool-capable model
ollama pull qwen2.5-coder:7b
ollama pull llama3.1:8b
# Configure OpenClaw
export OLLAMA_API_KEY="ollama-local" # Any non-empty string works
```
**OpenClaw Configuration for Ollama:**
```json
{
"models": {
"providers": {
"ollama": {
"baseUrl": "http://localhost:11434",
"apiKey": "ollama-local",
"api": "ollama",
"models": [
{
"id": "qwen2.5-coder:7b",
"name": "Qwen 2.5 Coder 7B",
"contextWindow": 32768,
"maxTokens": 8192,
"cost": { "input": 0, "output": 0 }
},
{
"id": "llama3.1:8b",
"name": "Llama 3.1 8B",
"contextWindow": 128000,
"maxTokens": 8192,
"cost": { "input": 0, "output": 0 }
}
]
}
}
},
"agents": {
"defaults": {
"model": {
"primary": "ollama/qwen2.5-coder:7b",
"fallbacks": ["ollama/llama3.1:8b"]
}
}
}
}
```
### 3.3 Context Window Requirements
**⚠️ Critical Requirement:** OpenClaw requires a minimum **64K token context window** for reliable multi-step task execution.
| Model | Parameters | Context Window | Tool Support | OpenClaw Compatible |
|-------|-----------|----------------|--------------|---------------------|
| **llama3.1** | 8B | 128K | ✅ Yes | ✅ Yes |
| **qwen2.5-coder** | 7B | 32K | ✅ Yes | ⚠️ Below minimum |
| **qwen2.5-coder** | 32B | 128K | ✅ Yes | ✅ Yes |
| **gpt-oss** | 20B | 128K | ✅ Yes | ✅ Yes |
| **glm-4.7-flash** | - | 128K | ✅ Yes | ✅ Yes |
| **deepseek-coder-v2** | 33B | 128K | ✅ Yes | ✅ Yes |
| **mistral-small3.1** | - | 128K | ✅ Yes | ✅ Yes |
**Context Window Configuration:**
For models that don't report context window via Ollama's API:
```bash
# Create custom Modelfile with extended context
cat > ~/qwen-custom.modelfile <<EOF
FROM qwen2.5-coder:7b
PARAMETER num_ctx 65536
PARAMETER temperature 0.7
EOF
# Create custom model
ollama create qwen2.5-coder-64k -f ~/qwen-custom.modelfile
```
### 3.4 Models for Small VPS (≤8B Parameters)
For resource-constrained environments (2-4GB RAM):
| Model | Quantization | RAM Required | VRAM Required | Performance |
|-------|-------------|--------------|---------------|-------------|
| **Llama 3.1 8B** | Q4_K_M | ~5GB | ~6GB | Good |
| **Llama 3.2 3B** | Q4_K_M | ~2.5GB | ~3GB | Basic |
| **Qwen 2.5 7B** | Q4_K_M | ~5GB | ~6GB | Good |
| **Qwen 2.5 3B** | Q4_K_M | ~2.5GB | ~3GB | Basic |
| **DeepSeek 7B** | Q4_K_M | ~5GB | ~6GB | Good |
| **Phi-4 4B** | Q4_K_M | ~3GB | ~4GB | Moderate |
**⚠️ Verdict for 2GB VPS:** Running local LLMs is **NOT viable**. Use external APIs only.
---
## 4. OpenRouter Integration (Fallback Strategy)
### 4.1 Overview
OpenRouter provides a unified API gateway to multiple LLM providers, enabling:
- Single API key access to 200+ models
- Automatic failover between providers
- Free tier models for cost-conscious deployments
- Unified billing and usage tracking
### 4.2 Configuration
**Environment Variable Setup:**
```bash
export OPENROUTER_API_KEY="sk-or-v1-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
```
**OpenClaw Configuration:**
```json
{
"models": {
"providers": {
"openrouter": {
"apiKey": "${OPENROUTER_API_KEY}",
"baseUrl": "https://openrouter.ai/api/v1"
}
}
},
"agents": {
"defaults": {
"model": {
"primary": "openrouter/anthropic/claude-sonnet-4-6",
"fallbacks": [
"openrouter/google/gemini-3.1-pro",
"openrouter/meta/llama-3.3-70b-instruct",
"openrouter/google/gemma-3-4b-it:free"
]
}
}
}
}
```
### 4.3 Recommended Free/Cheap Models on OpenRouter
For cost-conscious VPS deployments:
| Model | Cost | Context | Best For |
|-------|------|---------|----------|
| **google/gemma-3-4b-it:free** | Free | 128K | General tasks, simple automation |
| **meta/llama-3.1-8b-instruct:free** | Free | 128K | General tasks, longer contexts |
| **deepseek/deepseek-chat-v3.2** | $0.53/M | 64K | Code generation, reasoning |
| **xiaomi/mimo-v2-flash** | $0.40/M | 128K | Fast responses, basic tasks |
| **qwen/qwen3-coder-next** | $1.20/M | 128K | Code-focused tasks |
### 4.4 Hybrid Configuration (Recommended for Timmy)
A production-ready configuration for the Hermes VPS:
```json
{
"models": {
"providers": {
"openrouter": {
"apiKey": "${OPENROUTER_API_KEY}",
"models": [
{
"id": "google/gemma-3-4b-it:free",
"name": "Gemma 3 4B (Free)",
"contextWindow": 131072,
"maxTokens": 8192,
"cost": { "input": 0, "output": 0 }
},
{
"id": "deepseek/deepseek-chat-v3.2",
"name": "DeepSeek V3.2",
"contextWindow": 64000,
"maxTokens": 8192,
"cost": { "input": 0.00053, "output": 0.00053 }
}
]
},
"ollama": {
"baseUrl": "http://localhost:11434",
"apiKey": "ollama-local",
"models": [
{
"id": "llama3.2:3b",
"name": "Llama 3.2 3B (Local Fallback)",
"contextWindow": 128000,
"maxTokens": 4096,
"cost": { "input": 0, "output": 0 }
}
]
}
}
},
"agents": {
"defaults": {
"model": {
"primary": "openrouter/google/gemma-3-4b-it:free",
"fallbacks": [
"openrouter/deepseek/deepseek-chat-v3.2",
"ollama/llama3.2:3b"
]
},
"maxIterations": 10,
"timeout": 90
}
}
}
```
---
## 5. Hardware Constraints & VPS Viability
### 5.1 System Requirements Summary
| Component | Minimum | Recommended | Notes |
|-----------|---------|-------------|-------|
| **CPU** | 2 vCPU | 4 vCPU | Dedicated preferred over shared |
| **RAM** | 4 GB | 8 GB | 2GB risks OOM even with external APIs (see §5.2 mitigations) |
| **Storage** | 40 GB SSD | 80 GB NVMe | Docker images are ~10-15GB |
| **Network** | 100 Mbps | 1 Gbps | For API calls and model downloads |
| **OS** | Ubuntu 22.04/Debian 12 | Ubuntu 24.04 LTS | Linux required for production |
### 5.2 2GB RAM VPS Analysis
**Can it work?** Yes, with severe limitations:
**What works:**
- Text-only agents with external API providers
- Single Telegram/Discord channel
- Basic file operations and shell commands
- No browser automation
**What doesn't work:**
- Local LLM inference via Ollama
- Browser automation (Chromium needs 2-4GB)
- Multiple concurrent channels
- Python environment-heavy skills
**Required mitigations for 2GB VPS:**
```bash
# 1. Create substantial swap
sudo fallocate -l 4G /swapfile
sudo chmod 600 /swapfile
sudo mkswap /swapfile
sudo swapon /swapfile
# 2. Configure swappiness
echo 'vm.swappiness=60' | sudo tee -a /etc/sysctl.conf
sudo sysctl -p
# 3. Limit Node.js memory
export NODE_OPTIONS="--max-old-space-size=1536"
# 4. Use external APIs only - NO OLLAMA
# 5. Disable browser skills
# 6. Set conservative concurrency limits
```
### 5.3 4-bit Quantization Viability
**Qwen 2.5 7B Q4_K_M on 2GB VPS:**
- Model size: ~4.5GB
- RAM required at runtime: ~5-6GB
- **Verdict:** Will cause immediate OOM on 2GB VPS
- **Even with 4GB VPS:** Marginal, heavy swap usage, poor performance
**Viable models for 4GB VPS with Ollama:**
- Llama 3.2 3B Q4_K_M (~2.5GB RAM)
- Qwen 2.5 3B Q4_K_M (~2.5GB RAM)
- Phi-4 4B Q4_K_M (~3GB RAM)
---
## 6. Security Configuration
### 6.1 Network Ports
| Port | Purpose | Exposure |
|------|---------|----------|
| **18789/tcp** | OpenClaw Gateway (WebSocket/HTTP) | **NEVER expose to internet** |
| **11434/tcp** | Ollama API (if running locally) | Localhost only |
| **22/tcp** | SSH | Restrict to known IPs |
**⚠️ CRITICAL:** Never expose port 18789 to the public internet. Use Tailscale or SSH tunnels for remote access.
### 6.2 Tailscale Integration
Tailscale provides zero-configuration VPN mesh for secure remote access:
```bash
# Install Tailscale
curl -fsSL https://tailscale.com/install.sh | sh
sudo tailscale up
# Get Tailscale IP
tailscale ip
# Returns: 100.x.y.z
# Configure OpenClaw to bind to Tailscale
cat > ~/.openclaw/openclaw.json <<EOF
{
"gateway": {
"bind": "tailnet",
"port": 18789
},
"tailscale": {
"mode": "on",
"resetOnExit": false
}
}
EOF
```
**Tailscale vs SSH Tunnel:**
| Feature | Tailscale | SSH Tunnel |
|---------|-----------|------------|
| Setup | Very easy | Moderate |
| Persistence | Automatic | Requires autossh |
| Multiple devices | Built-in | One tunnel per connection |
| NAT traversal | Works | Requires exposed SSH |
| Access control | Tailscale ACL | SSH keys |
### 6.3 Firewall Configuration (UFW)
```bash
# Default deny
sudo ufw default deny incoming
sudo ufw default allow outgoing
# Allow SSH
sudo ufw allow 22/tcp
# Allow Tailscale only (if using)
sudo ufw allow in on tailscale0 to any port 18789
# Block public access to OpenClaw
# (bind is 127.0.0.1, so this is defense in depth)
sudo ufw enable
```
### 6.4 Authentication Configuration
```json
{
"gateway": {
"bind": "127.0.0.1",
"port": 18789,
"auth": {
"mode": "token",
"token": "your-64-char-hex-token-here"
},
"controlUi": {
"allowedOrigins": [
"http://localhost:18789",
"https://your-domain.tailnet-name.ts.net"
],
"allowInsecureAuth": false,
"dangerouslyDisableDeviceAuth": false
}
}
}
```
**Generate secure token:**
```bash
openssl rand -hex 32
```
### 6.5 Sandboxing Considerations
OpenClaw executes arbitrary shell commands and file operations by default. For production:
1. **Run as non-root user:**
```bash
sudo useradd -r -s /bin/false openclaw
sudo mkdir -p /home/openclaw/.openclaw
sudo chown -R openclaw:openclaw /home/openclaw
```
2. **Use Docker for isolation:**
```bash
docker run --security-opt=no-new-privileges \
--cap-drop=ALL \
--read-only \
--tmpfs /tmp:noexec,nosuid,size=100m \
openclaw/openclaw:latest
```
3. **Enable dmPolicy for channels:**
```json
{
"channels": {
"telegram": {
"dmPolicy": "pairing" // Require one-time code for new contacts
}
}
}
```
---
## 7. MCP (Model Context Protocol) Tools
### 7.1 Overview
MCP is an open standard created by Anthropic (donated to Linux Foundation in Dec 2025) that lets AI applications connect to external tools through a universal interface. Think of it as "USB-C for AI."
### 7.2 MCP vs OpenClaw Skills
| Aspect | MCP | OpenClaw Skills |
|--------|-----|-----------------|
| **Protocol** | Standardized (Anthropic) | OpenClaw-specific |
| **Isolation** | Process-isolated | Runs in agent context |
| **Security** | Higher (sandboxed) | Lower (full system access) |
| **Discovery** | Automatic via protocol | Manual via SKILL.md |
| **Ecosystem** | 10,000+ servers | 5400+ skills |
**Note:** OpenClaw currently has limited native MCP support. Use `mcporter` tool for MCP integration.
### 7.3 Using MCPorter (MCP Bridge)
```bash
# Install mcporter
clawhub install mcporter
# Configure MCP server
mcporter config add github \
--url "https://api.github.com/mcp" \
--token "ghp_..."
# List available tools
mcporter list
# Call MCP tool
mcporter call github.list_repos --owner "rockachopa"
```
### 7.4 Popular MCP Servers
| Server | Purpose | Integration |
|--------|---------|-------------|
| **GitHub** | Repo management, PRs, issues | `mcp-github` |
| **Slack** | Messaging, channel management | `mcp-slack` |
| **PostgreSQL** | Database queries | `mcp-postgres` |
| **Filesystem** | File operations (sandboxed) | `mcp-filesystem` |
| **Brave Search** | Web search | `mcp-brave` |
---
## 8. Recommendations for Timmy Time Dashboard
### 8.1 Deployment Strategy for Hermes VPS (2GB RAM)
Given the hardware constraints, here's the recommended approach:
**Option A: External API Only (Recommended)**
```
┌─────────────────────────────────────────┐
│ Hermes VPS (2GB RAM) │
│ ┌─────────────────────────────────┐ │
│ │ OpenClaw Gateway │ │
│ │ (npm global install) │ │
│ └─────────────┬───────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ OpenRouter API (Free Tier) │ │
│ │ google/gemma-3-4b-it:free │ │
│ └─────────────────────────────────┘ │
│ │
│ NO OLLAMA - insufficient RAM │
└─────────────────────────────────────────┘
```
**Option B: Hybrid with External Ollama**
```
┌──────────────────────┐ ┌──────────────────────────┐
│ Hermes VPS (2GB) │ │ Separate Ollama Host │
│ ┌────────────────┐ │ │ ┌────────────────────┐ │
│ │ OpenClaw │ │◄────►│ │ Ollama Server │ │
│ │ (external API) │ │ │ │ (8GB+ RAM required)│ │
│ └────────────────┘ │ │ └────────────────────┘ │
└──────────────────────┘ └──────────────────────────┘
```
### 8.2 Configuration Summary
```json
{
"gateway": {
"bind": "127.0.0.1",
"port": 18789,
"auth": {
"mode": "token",
"token": "GENERATE_WITH_OPENSSL_RAND"
}
},
"models": {
"providers": {
"openrouter": {
"apiKey": "${OPENROUTER_API_KEY}",
"models": [
{
"id": "google/gemma-3-4b-it:free",
"contextWindow": 131072,
"maxTokens": 4096
}
]
}
}
},
"agents": {
"defaults": {
"model": {
"primary": "openrouter/google/gemma-3-4b-it:free"
},
"maxIterations": 10,
"timeout": 90,
"maxConcurrent": 2
}
},
"channels": {
"telegram": {
"enabled": true,
"dmPolicy": "pairing"
}
}
}
```
### 8.3 Migration Path (Future)
When upgrading to a larger VPS (4-8GB RAM):
1. **Phase 1:** Enable Ollama with Llama 3.2 3B as fallback
2. **Phase 2:** Add browser automation skills (requires 4GB+ RAM)
3. **Phase 3:** Enable multi-agent routing with specialized agents
4. **Phase 4:** Add MCP server integration for external tools
---
## 9. References
1. OpenClaw Official Documentation: https://docs.openclaw.ai
2. Ollama Integration Guide: https://docs.ollama.com/integrations/openclaw
3. OpenRouter Documentation: https://openrouter.ai/docs
4. MCP Specification: https://modelcontextprotocol.io
5. OpenClaw Community Discord: https://discord.gg/openclaw
6. GitHub Repository: https://github.com/openclaw/openclaw
---
## 10. Appendix: Quick Command Reference
```bash
# Installation
curl -fsSL https://openclaw.ai/install.sh | bash
# Configuration
openclaw onboard # Interactive setup
openclaw configure # Edit config
openclaw config set <key> <value> # Set specific value
# Gateway management
openclaw gateway # Start gateway
openclaw gateway --verbose # Start with logs
openclaw gateway status # Check status
openclaw gateway restart # Restart gateway
openclaw gateway stop # Stop gateway
# Model management
openclaw models list # List available models
openclaw models set <model> # Set default model
openclaw models status # Check model status
# Diagnostics
openclaw doctor # System health check
openclaw doctor --repair # Auto-fix issues
openclaw security audit # Security check
# Dashboard
openclaw dashboard # Open web UI
```
---
*End of Research Report*

View File

@@ -330,6 +330,13 @@ class Settings(BaseSettings):
autoresearch_max_iterations: int = 100
autoresearch_metric: str = "val_bpb" # metric to optimise (lower = better)
# ── Weekly Narrative Summary ───────────────────────────────────────
# Generates a human-readable weekly summary of development activity.
# Disabling this will stop the weekly narrative generation.
weekly_narrative_enabled: bool = True
weekly_narrative_lookback_days: int = 7
weekly_narrative_output_dir: str = ".loop"
# ── Local Hands (Shell + Git) ──────────────────────────────────────
# Enable local shell/git execution hands.
hands_shell_enabled: bool = True

View File

@@ -43,6 +43,7 @@ from dashboard.routes.memory import router as memory_router
from dashboard.routes.mobile import router as mobile_router
from dashboard.routes.models import api_router as models_api_router
from dashboard.routes.models import router as models_router
from dashboard.routes.quests import router as quests_router
from dashboard.routes.spark import router as spark_router
from dashboard.routes.system import router as system_router
from dashboard.routes.tasks import router as tasks_router
@@ -627,6 +628,7 @@ app.include_router(world_router)
app.include_router(matrix_router)
app.include_router(tower_router)
app.include_router(daily_run_router)
app.include_router(quests_router)
@app.websocket("/ws")

View File

@@ -365,6 +365,15 @@ async def daily_run_metrics_api(lookback_days: int = 7):
status_code=503,
)
# Check for quest completions based on Daily Run metrics
quest_rewards = []
try:
from dashboard.routes.quests import check_daily_run_quests
quest_rewards = await check_daily_run_quests(agent_id="system")
except Exception as exc:
logger.debug("Quest checking failed: %s", exc)
return JSONResponse(
{
"status": "ok",
@@ -389,6 +398,7 @@ async def daily_run_metrics_api(lookback_days: int = 7):
"previous": metrics.total_touched_previous,
},
"generated_at": metrics.generated_at,
"quest_rewards": quest_rewards,
}
)

View File

@@ -275,3 +275,54 @@ async def component_status():
},
"timestamp": datetime.now(UTC).isoformat(),
}
@router.get("/health/snapshot")
async def health_snapshot() -> dict:
    """Quick health snapshot before coding.
    Returns a concise status summary including:
    - CI pipeline status (pass/fail/unknown)
    - Critical issues count (P0/P1)
    - Test flakiness rate
    - Token economy temperature
    Fast execution (< 5 seconds) for pre-work checks.
    Never raises: any failure is converted into a fallback payload with
    overall_status "unknown" and the error message attached.
    Refs: #710
    """
    import sys
    from pathlib import Path
    # Import the health snapshot module
    # NOTE(review): this mutates sys.path for the whole process and is never
    # undone; the bare-name import below depends on it. Confirm no module
    # shadowing (another "health_snapshot" on sys.path) is possible.
    snapshot_path = Path(settings.repo_root) / "timmy_automations" / "daily_run"
    if str(snapshot_path) not in sys.path:
        sys.path.insert(0, str(snapshot_path))
    try:
        from health_snapshot import generate_snapshot, get_token, load_config
        config = load_config()
        token = get_token(config)
        # Run the health snapshot (in thread to avoid blocking)
        # generate_snapshot is synchronous; to_thread keeps the event loop free.
        snapshot = await asyncio.to_thread(generate_snapshot, config, token)
        return snapshot.to_dict()
    except Exception as exc:
        logger.warning("Health snapshot failed: %s", exc)
        # Return graceful fallback
        # Shape mirrors the presumed snapshot.to_dict() structure so clients
        # can parse both paths uniformly — TODO confirm against health_snapshot.
        return {
            "timestamp": datetime.now(UTC).isoformat(),
            "overall_status": "unknown",
            "error": str(exc),
            "ci": {"status": "unknown", "message": "Snapshot failed"},
            "issues": {"count": 0, "p0_count": 0, "p1_count": 0, "issues": []},
            "flakiness": {
                "status": "unknown",
                "recent_failures": 0,
                "recent_cycles": 0,
                "failure_rate": 0.0,
                "message": "Snapshot failed",
            },
            "tokens": {"status": "unknown", "message": "Snapshot failed"},
        }

View File

@@ -0,0 +1,377 @@
"""Quest system routes for agent token rewards.
Provides API endpoints for:
- Listing quests and their status
- Claiming quest rewards
- Getting quest leaderboard
- Quest progress tracking
"""
from __future__ import annotations
import logging
from typing import Any
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse, JSONResponse
from pydantic import BaseModel
from dashboard.templating import templates
from timmy.quest_system import (
QuestStatus,
auto_evaluate_all_quests,
claim_quest_reward,
evaluate_quest_progress,
get_active_quests,
get_agent_quests_status,
get_quest_definition,
get_quest_leaderboard,
load_quest_config,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/quests", tags=["quests"])
class ClaimQuestRequest(BaseModel):
    """Request to claim a quest reward."""
    # Agent that will receive the reward tokens.
    agent_id: str
    # Quest being claimed; must already be completed and not yet claimed.
    quest_id: str
class EvaluateQuestRequest(BaseModel):
    """Request to manually evaluate quest progress."""
    # Agent whose progress should be (re)computed.
    agent_id: str
    # Quest to evaluate; unknown ids yield a 404 from the endpoint.
    quest_id: str
# ---------------------------------------------------------------------------
# API Endpoints
# ---------------------------------------------------------------------------
@router.get("/api/definitions")
async def get_quest_definitions_api() -> JSONResponse:
    """Get all quest definitions.
    Returns:
        JSON list of all quest definitions with their criteria.
    """
    payload = []
    for quest in get_active_quests():
        payload.append(
            {
                "id": quest.id,
                "name": quest.name,
                "description": quest.description,
                "reward_tokens": quest.reward_tokens,
                "type": quest.quest_type.value,
                "repeatable": quest.repeatable,
                "cooldown_hours": quest.cooldown_hours,
                "criteria": quest.criteria,
            }
        )
    return JSONResponse({"quests": payload})
@router.get("/api/status/{agent_id}")
async def get_agent_quest_status(agent_id: str) -> JSONResponse:
    """Get quest status for a specific agent.
    Returns:
        Complete quest status including progress, completion counts,
        and tokens earned.
    """
    return JSONResponse(get_agent_quests_status(agent_id))
@router.post("/api/claim")
async def claim_quest_reward_api(request: ClaimQuestRequest) -> JSONResponse:
    """Claim a quest reward for an agent.
    The quest must be completed but not yet claimed; otherwise a 400 with
    an explanatory error is returned.
    """
    reward = claim_quest_reward(request.quest_id, request.agent_id)
    if reward:
        return JSONResponse({"success": True, "reward": reward})
    return JSONResponse(
        {
            "success": False,
            "error": "Quest not completed, already claimed, or on cooldown",
        },
        status_code=400,
    )
@router.post("/api/evaluate")
async def evaluate_quest_api(request: EvaluateQuestRequest) -> JSONResponse:
    """Manually evaluate quest progress with provided context.
    This is useful for testing or when the quest completion
    needs to be triggered manually.
    Returns 404 for unknown quests, 500 when evaluation fails, otherwise
    the updated progress plus any auto-claimed reward.
    """
    quest = get_quest_definition(request.quest_id)
    if not quest:
        return JSONResponse(
            {"success": False, "error": "Quest not found"},
            status_code=404,
        )
    # Build evaluation context based on quest type
    context = await _build_evaluation_context(quest)
    progress = evaluate_quest_progress(request.quest_id, request.agent_id, context)
    if not progress:
        return JSONResponse(
            {"success": False, "error": "Failed to evaluate quest"},
            status_code=500,
        )
    # Auto-claim if completed
    # NOTE(review): "completed" below reflects the pre-claim status; claiming a
    # repeatable quest may reset its stored progress — confirm clients expect
    # this snapshot rather than the post-claim state.
    reward = None
    if progress.status == QuestStatus.COMPLETED:
        reward = claim_quest_reward(request.quest_id, request.agent_id)
    return JSONResponse(
        {
            "success": True,
            "progress": progress.to_dict(),
            "reward": reward,
            "completed": progress.status == QuestStatus.COMPLETED,
        }
    )
@router.get("/api/leaderboard")
async def get_leaderboard_api() -> JSONResponse:
    """Get the quest completion leaderboard.
    Returns agents sorted by total tokens earned.
    """
    board = get_quest_leaderboard()
    return JSONResponse({"leaderboard": board})
@router.post("/api/reload")
async def reload_quest_config_api() -> JSONResponse:
    """Reload quest configuration from quests.yaml.
    Useful for applying quest changes without restarting.
    """
    definitions, quest_settings = load_quest_config()
    payload = {
        "success": True,
        "quests_loaded": len(definitions),
        "settings": quest_settings,
    }
    return JSONResponse(payload)
# ---------------------------------------------------------------------------
# Dashboard UI Endpoints
# ---------------------------------------------------------------------------
@router.get("", response_class=HTMLResponse)
async def quests_dashboard(request: Request) -> HTMLResponse:
    """Render the main quests dashboard page."""
    context = {"agent_id": "current_user"}
    return templates.TemplateResponse(request, "quests.html", context)
@router.get("/panel/{agent_id}", response_class=HTMLResponse)
async def quests_panel(request: Request, agent_id: str) -> HTMLResponse:
    """Render the quest panel partial used by HTMX refreshes."""
    status = get_agent_quests_status(agent_id)
    context = {
        "agent_id": agent_id,
        "quests": status["quests"],
        "total_tokens": status["total_tokens_earned"],
        "completed_count": status["total_quests_completed"],
    }
    return templates.TemplateResponse(request, "partials/quests_panel.html", context)
# ---------------------------------------------------------------------------
# Internal Functions
# ---------------------------------------------------------------------------
async def _build_evaluation_context(quest) -> dict[str, Any]:
    """Assemble the context dict this quest type needs for evaluation."""
    quest_kind = quest.quest_type.value
    context: dict[str, Any] = {}
    if quest_kind == "issue_count":
        # issue_count quests compare against the set of closed issues.
        labels = quest.criteria.get("issue_labels", [])
        context["closed_issues"] = await _fetch_closed_issues(labels)
    elif quest_kind == "issue_reduce":
        # issue_reduce quests need a before/after open-issue comparison.
        labels = quest.criteria.get("issue_labels", [])
        lookback = quest.criteria.get("lookback_days", 7)
        context["current_issue_count"] = await _fetch_open_issue_count(labels)
        context["previous_issue_count"] = await _fetch_previous_issue_count(labels, lookback)
    elif quest_kind == "daily_run":
        # daily_run quests only need the completed-session count.
        metrics = await _fetch_daily_run_metrics()
        context["sessions_completed"] = metrics.get("sessions_completed", 0)
    return context
async def _fetch_closed_issues(labels: list[str]) -> list[dict]:
    """Fetch closed issues matching the given labels; empty list on any failure."""
    try:
        from dashboard.routes.daily_run import GiteaClient, _load_config

        config = _load_config()
        client = GiteaClient(config, _get_gitea_token(config))
        if not client.is_available():
            return []
        # Gitea expects a comma-separated label filter; empty means "any".
        params = {
            "state": "closed",
            "labels": ",".join(labels) if labels else "",
            "limit": 100,
        }
        return client.get_paginated("issues", params)
    except Exception as exc:
        logger.debug("Failed to fetch closed issues: %s", exc)
        return []
async def _fetch_open_issue_count(labels: list[str]) -> int:
    """Count open issues carrying the given labels; 0 on any failure."""
    try:
        from dashboard.routes.daily_run import GiteaClient, _load_config

        config = _load_config()
        client = GiteaClient(config, _get_gitea_token(config))
        if not client.is_available():
            return 0
        params = {
            "state": "open",
            "labels": ",".join(labels) if labels else "",
            "limit": 100,
        }
        return len(client.get_paginated("issues", params))
    except Exception as exc:
        logger.debug("Failed to fetch open issue count: %s", exc)
        return 0
async def _fetch_previous_issue_count(labels: list[str], lookback_days: int) -> int:
    """Fetch previous issue count (simplified - uses current for now)."""
    # Historical counts are not tracked yet, so today's open count stands in
    # for the value from `lookback_days` ago; `lookback_days` is ignored.
    current = await _fetch_open_issue_count(labels)
    return current
async def _fetch_daily_run_metrics() -> dict[str, Any]:
    """Return Daily Run session counts, or zeros when metrics are unavailable."""
    fallback = {"sessions_completed": 0, "sessions_previous": 0}
    try:
        from dashboard.routes.daily_run import _get_metrics

        metrics = _get_metrics(lookback_days=7)
        if metrics:
            return {
                "sessions_completed": metrics.sessions_completed,
                "sessions_previous": metrics.sessions_previous,
            }
    except Exception as exc:
        logger.debug("Failed to fetch Daily Run metrics: %s", exc)
    return fallback
def _get_gitea_token(config: dict) -> str | None:
"""Get Gitea token from config."""
if "token" in config:
return config["token"]
from pathlib import Path
token_file = Path(config.get("token_file", "~/.hermes/gitea_token")).expanduser()
if token_file.exists():
return token_file.read_text().strip()
return None
# ---------------------------------------------------------------------------
# Daily Run Integration
# ---------------------------------------------------------------------------
async def check_daily_run_quests(agent_id: str = "system") -> list[dict]:
    """Check and award Daily Run related quests.
    Called by the Daily Run system when metrics are updated.
    Returns:
        List of rewards awarded
    """
    # Check if auto-detect is enabled
    # Honors the `auto_detect_on_daily_run` switch in quests.yaml (default on).
    _, quest_settings = load_quest_config()
    if not quest_settings.get("auto_detect_on_daily_run", True):
        return []
    # Build context from Daily Run metrics
    metrics = await _fetch_daily_run_metrics()
    context = {
        "sessions_completed": metrics.get("sessions_completed", 0),
        "sessions_previous": metrics.get("sessions_previous", 0),
    }
    # Add closed issues for issue_count quests
    # NOTE(review): only the FIRST issue_count quest's labels are used for the
    # fetch; any additional issue_count quests with different labels will be
    # evaluated against this same issue list — confirm that is acceptable.
    active_quests = get_active_quests()
    for quest in active_quests:
        if quest.quest_type.value == "issue_count":
            labels = quest.criteria.get("issue_labels", [])
            context["closed_issues"] = await _fetch_closed_issues(labels)
            break  # Only need to fetch once
    # Evaluate all quests
    rewards = auto_evaluate_all_quests(agent_id, context)
    return rewards

View File

@@ -0,0 +1,80 @@
{% from "macros.html" import panel %}
{# Quest panel partial — rendered by /quests/panel/<agent_id> for HTMX refreshes. #}
<div class="quests-summary mb-4">
  <div class="row">
    <div class="col-md-4">
      <div class="stat-card">
        <div class="stat-value">{{ total_tokens }}</div>
        <div class="stat-label">Tokens Earned</div>
      </div>
    </div>
    <div class="col-md-4">
      <div class="stat-card">
        <div class="stat-value">{{ completed_count }}</div>
        <div class="stat-label">Quests Completed</div>
      </div>
    </div>
    <div class="col-md-4">
      <div class="stat-card">
        <div class="stat-value">{{ quests|selectattr('enabled', 'equalto', true)|list|length }}</div>
        <div class="stat-label">Active Quests</div>
      </div>
    </div>
  </div>
</div>
<div class="quests-list">
  {% for quest in quests %}
  {% if quest.enabled %}
  <div class="quest-card quest-status-{{ quest.status }}">
    <div class="quest-header">
      <h5 class="quest-name">{{ quest.name }}</h5>
      <span class="quest-reward">+{{ quest.reward_tokens }} ⚡</span>
    </div>
    <p class="quest-description">{{ quest.description }}</p>
    <div class="quest-progress">
      {% if quest.status == 'completed' %}
      <div class="progress">
        <div class="progress-bar bg-success" style="width: 100%"></div>
      </div>
      <span class="quest-status-badge completed">Completed</span>
      {% elif quest.status == 'claimed' %}
      <div class="progress">
        <div class="progress-bar bg-success" style="width: 100%"></div>
      </div>
      <span class="quest-status-badge claimed">Reward Claimed</span>
      {% elif quest.on_cooldown %}
      <div class="progress">
        <div class="progress-bar bg-secondary" style="width: 100%"></div>
      </div>
      <span class="quest-status-badge cooldown">
        Cooldown: {{ quest.cooldown_hours_remaining }}h remaining
      </span>
      {% else %}
      <div class="progress">
        {# Guard against division by zero when a quest's target_value is 0. #}
        <div class="progress-bar" style="width: {{ (quest.current_value / quest.target_value * 100)|int if quest.target_value else 0 }}%"></div>
      </div>
      <span class="quest-progress-text">{{ quest.current_value }} / {{ quest.target_value }}</span>
      {% endif %}
    </div>
    <div class="quest-meta">
      <span class="quest-type">{{ quest.type }}</span>
      {% if quest.repeatable %}
      <span class="quest-repeatable">↻ Repeatable</span>
      {% endif %}
      {% if quest.completion_count > 0 %}
      <span class="quest-completions">Completed {{ quest.completion_count }} time{% if quest.completion_count != 1 %}s{% endif %}</span>
      {% endif %}
    </div>
  </div>
  {% endif %}
  {% endfor %}
</div>
{% if not quests|selectattr('enabled', 'equalto', true)|list|length %}
<div class="alert alert-info">
  No active quests available. Check back later or contact an administrator.
</div>
{% endif %}

View File

@@ -0,0 +1,50 @@
{% extends "base.html" %}
{# Token Quests dashboard page: the quest list and leaderboard are lazy-loaded
   and periodically refreshed via HTMX (every 30s / 60s respectively). #}
{% block title %}Quests — Mission Control{% endblock %}
{% block content %}
<div class="container-fluid">
  <div class="row">
    <div class="col-12">
      <h1 class="mc-title">Token Quests</h1>
      <p class="mc-subtitle">Complete quests to earn bonus tokens</p>
    </div>
  </div>
  <div class="row mt-4">
    <div class="col-md-8">
      {# Quest panel partial, served by /quests/panel/<agent_id>. #}
      <div id="quests-panel" hx-get="/quests/panel/{{ agent_id }}" hx-trigger="load, every 30s">
        <div class="mc-loading">Loading quests...</div>
      </div>
    </div>
    <div class="col-md-4">
      <div class="card mc-panel">
        <div class="card-header">
          <h5 class="mb-0">Leaderboard</h5>
        </div>
        <div class="card-body">
          {# NOTE(review): this endpoint returns JSON, not an HTML fragment —
             confirm HTMX is expected to inject raw JSON here. #}
          <div id="leaderboard" hx-get="/quests/api/leaderboard" hx-trigger="load, every 60s">
            <div class="mc-loading">Loading leaderboard...</div>
          </div>
        </div>
      </div>
      <div class="card mc-panel mt-4">
        <div class="card-header">
          <h5 class="mb-0">About Quests</h5>
        </div>
        <div class="card-body">
          <p class="mb-2">Quests are special objectives that reward tokens upon completion.</p>
          <ul class="mc-list mb-0">
            <li>Complete Daily Run sessions</li>
            <li>Close flaky-test issues</li>
            <li>Reduce P1 issue backlog</li>
            <li>Improve documentation</li>
          </ul>
        </div>
      </div>
    </div>
  </div>
</div>
{% endblock %}

View File

@@ -0,0 +1,84 @@
"""Thread-local SQLite connection pool.
Provides a ConnectionPool class that manages SQLite connections per thread,
with support for context managers and automatic cleanup.
"""
import sqlite3
import threading
from collections.abc import Generator
from contextlib import contextmanager
from pathlib import Path
class ConnectionPool:
"""Thread-local SQLite connection pool.
Each thread gets its own connection, which is reused for subsequent
requests from the same thread. Connections are automatically cleaned
up when close_connection() is called or the context manager exits.
"""
def __init__(self, db_path: Path | str) -> None:
"""Initialize the connection pool.
Args:
db_path: Path to the SQLite database file.
"""
self._db_path = Path(db_path)
self._local = threading.local()
def _ensure_db_exists(self) -> None:
"""Ensure the database directory exists."""
self._db_path.parent.mkdir(parents=True, exist_ok=True)
def get_connection(self) -> sqlite3.Connection:
"""Get a connection for the current thread.
Creates a new connection if one doesn't exist for this thread,
otherwise returns the existing connection.
Returns:
A sqlite3 Connection object.
"""
if not hasattr(self._local, "conn") or self._local.conn is None:
self._ensure_db_exists()
self._local.conn = sqlite3.connect(str(self._db_path), check_same_thread=False)
self._local.conn.row_factory = sqlite3.Row
return self._local.conn
def close_connection(self) -> None:
"""Close the connection for the current thread.
Cleans up the thread-local storage. Safe to call even if
no connection exists for this thread.
"""
if hasattr(self._local, "conn") and self._local.conn is not None:
self._local.conn.close()
self._local.conn = None
@contextmanager
def connection(self) -> Generator[sqlite3.Connection, None, None]:
"""Context manager for getting and automatically closing a connection.
Yields:
A sqlite3 Connection object.
Example:
with pool.connection() as conn:
cursor = conn.execute("SELECT 1")
result = cursor.fetchone()
"""
conn = self.get_connection()
try:
yield conn
finally:
self.close_connection()
def close_all(self) -> None:
"""Close all connections (useful for testing).
Note: This only closes the connection for the current thread.
In a multi-threaded environment, each thread must close its own.
"""
self.close_connection()

View File

@@ -489,5 +489,43 @@ def focus(
typer.echo("No active focus (broad mode).")
@app.command(name="healthcheck")
def healthcheck(
    json_output: bool = typer.Option(False, "--json", "-j", help="Output as JSON"),
    verbose: bool = typer.Option(
        False, "--verbose", "-v", help="Show verbose output including issue details"
    ),
    quiet: bool = typer.Option(False, "--quiet", "-q", help="Only show status line (no details)"),
):
    """Quick health snapshot before coding.
    Shows CI status, critical issues (P0/P1), test flakiness, and token economy.
    Fast execution (< 5 seconds) for pre-work checks.
    Exits with the snapshot script's own return code.
    Refs: #710
    """
    import subprocess
    import sys
    from pathlib import Path
    # Resolve the snapshot script relative to this file (three levels up to
    # the repo root, then into timmy_automations/daily_run).
    script_path = (
        Path(__file__).resolve().parent.parent.parent
        / "timmy_automations"
        / "daily_run"
        / "health_snapshot.py"
    )
    # Forward the CLI flags verbatim; the script defines matching options.
    cmd = [sys.executable, str(script_path)]
    if json_output:
        cmd.append("--json")
    if verbose:
        cmd.append("--verbose")
    if quiet:
        cmd.append("--quiet")
    # The child's stdout/stderr stream straight through to the user; its exit
    # status becomes this command's exit status.
    result = subprocess.run(cmd)
    raise typer.Exit(result.returncode)
def main():
    """CLI entry point: dispatch to the Typer application."""
    app()

581
src/timmy/quest_system.py Normal file
View File

@@ -0,0 +1,581 @@
"""Token Quest System for agent rewards.
Provides quest definitions, progress tracking, completion detection,
and token awards for agent accomplishments.
Quests are defined in config/quests.yaml and loaded at runtime.
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from enum import StrEnum
from pathlib import Path
from typing import Any
import yaml
from config import settings
logger = logging.getLogger(__name__)
# Path to quest configuration
QUEST_CONFIG_PATH = Path(settings.repo_root) / "config" / "quests.yaml"
class QuestType(StrEnum):
    """Types of quests supported by the system."""
    ISSUE_COUNT = "issue_count"  # close N issues matching configured labels
    ISSUE_REDUCE = "issue_reduce"  # shrink an open-issue backlog by N
    DOCS_UPDATE = "docs_update"  # documentation changes (evaluation TBD)
    TEST_IMPROVE = "test_improve"  # new tests added (evaluation TBD)
    DAILY_RUN = "daily_run"  # complete N Daily Run sessions
    CUSTOM = "custom"  # manually-completed quests; no auto evaluation
class QuestStatus(StrEnum):
    """Status of a quest for an agent."""
    NOT_STARTED = "not_started"  # no qualifying progress yet
    IN_PROGRESS = "in_progress"  # partial progress toward the target
    COMPLETED = "completed"  # target reached, reward not yet claimed
    CLAIMED = "claimed"  # reward paid out
    EXPIRED = "expired"  # reserved; not set anywhere in this module
@dataclass
class QuestDefinition:
    """Definition of a quest from configuration (config/quests.yaml)."""
    id: str  # unique quest key (the YAML mapping key)
    name: str
    description: str
    reward_tokens: int  # tokens paid out on claim
    quest_type: QuestType
    enabled: bool  # disabled quests are skipped by get_active_quests()
    repeatable: bool  # repeatable quests reset after each claim
    cooldown_hours: int  # minimum gap between repeat completions (0 = none)
    criteria: dict[str, Any]  # type-specific thresholds (target_count, ...)
    notification_message: str  # str.format template; "{tokens}" placeholder
    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> QuestDefinition:
        """Create a QuestDefinition from a dictionary.
        Raises KeyError if "id" is missing and ValueError for an unknown
        "type" — both are caught and logged by load_quest_config().
        """
        return cls(
            id=data["id"],
            name=data.get("name", "Unnamed Quest"),
            description=data.get("description", ""),
            reward_tokens=data.get("reward_tokens", 0),
            quest_type=QuestType(data.get("type", "custom")),
            enabled=data.get("enabled", True),
            repeatable=data.get("repeatable", False),
            cooldown_hours=data.get("cooldown_hours", 0),
            criteria=data.get("criteria", {}),
            notification_message=data.get(
                "notification_message", "Quest Complete! You earned {tokens} tokens."
            ),
        )
@dataclass
class QuestProgress:
    """Progress of a quest for a specific agent."""
    quest_id: str
    agent_id: str
    status: QuestStatus
    current_value: int = 0  # progress metric (issues closed, sessions, ...)
    target_value: int = 0  # value at which the quest completes
    started_at: str = ""  # ISO-8601 timestamps (UTC) throughout
    completed_at: str = ""
    claimed_at: str = ""
    completion_count: int = 0  # total successful claims (repeatables > 1)
    last_completed_at: str = ""  # drives cooldown checks for repeatables
    metadata: dict[str, Any] = field(default_factory=dict)
    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for serialization."""
        return {
            "quest_id": self.quest_id,
            "agent_id": self.agent_id,
            "status": self.status.value,
            "current_value": self.current_value,
            "target_value": self.target_value,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
            "claimed_at": self.claimed_at,
            "completion_count": self.completion_count,
            "last_completed_at": self.last_completed_at,
            "metadata": self.metadata,
        }
# In-memory storage for quest progress. These dicts are module-level and not
# persisted: all progress is lost on process restart, and access is not
# synchronized across threads.
_quest_progress: dict[str, QuestProgress] = {}
_quest_definitions: dict[str, QuestDefinition] = {}
_quest_settings: dict[str, Any] = {}
def _get_progress_key(quest_id: str, agent_id: str) -> str:
"""Generate a unique key for quest progress."""
return f"{agent_id}:{quest_id}"
def load_quest_config() -> tuple[dict[str, QuestDefinition], dict[str, Any]]:
    """Load quest definitions from quests.yaml.
    Side effect: refreshes the module-level _quest_definitions and
    _quest_settings caches. Malformed individual quests are skipped with a
    warning; a missing or unparsable file yields empty results.
    Returns:
        Tuple of (quest definitions dict, settings dict)
    """
    global _quest_definitions, _quest_settings
    if not QUEST_CONFIG_PATH.exists():
        logger.warning("Quest config not found at %s", QUEST_CONFIG_PATH)
        return {}, {}
    try:
        raw = QUEST_CONFIG_PATH.read_text()
        config = yaml.safe_load(raw)
        if not isinstance(config, dict):
            logger.warning("Invalid quest config format")
            return {}, {}
        # Load quest definitions
        # The YAML mapping key becomes the quest id, overriding any "id" field.
        quests_data = config.get("quests", {})
        definitions = {}
        for quest_id, quest_data in quests_data.items():
            quest_data["id"] = quest_id
            try:
                definition = QuestDefinition.from_dict(quest_data)
                definitions[quest_id] = definition
            except (ValueError, KeyError) as exc:
                logger.warning("Failed to load quest %s: %s", quest_id, exc)
        # Load settings
        _quest_settings = config.get("settings", {})
        _quest_definitions = definitions
        logger.debug("Loaded %d quest definitions", len(definitions))
        return definitions, _quest_settings
    except (OSError, yaml.YAMLError) as exc:
        logger.warning("Failed to load quest config: %s", exc)
        return {}, {}
def get_quest_definitions() -> dict[str, QuestDefinition]:
    """Return all quest definitions, lazily loading them on first access."""
    global _quest_definitions
    if _quest_definitions:
        return _quest_definitions
    _quest_definitions, _ = load_quest_config()
    return _quest_definitions
def get_quest_definition(quest_id: str) -> QuestDefinition | None:
    """Look up one quest definition by id, or None when unknown."""
    return get_quest_definitions().get(quest_id)
def get_active_quests() -> list[QuestDefinition]:
    """Return only the quest definitions that are currently enabled."""
    return [quest for quest in get_quest_definitions().values() if quest.enabled]
def get_quest_progress(quest_id: str, agent_id: str) -> QuestProgress | None:
    """Fetch stored progress for an agent/quest pair, if any exists."""
    return _quest_progress.get(_get_progress_key(quest_id, agent_id))
def get_or_create_progress(quest_id: str, agent_id: str) -> QuestProgress:
    """Get existing progress or create new for quest/agent.
    Raises:
        ValueError: if quest_id has no definition (cannot derive a target).
    """
    key = _get_progress_key(quest_id, agent_id)
    if key not in _quest_progress:
        quest = get_quest_definition(quest_id)
        if not quest:
            raise ValueError(f"Quest {quest_id} not found")
        # Target is derived once from the quest's criteria at creation time;
        # later config changes do not retroactively update existing progress.
        target = _get_target_value(quest)
        _quest_progress[key] = QuestProgress(
            quest_id=quest_id,
            agent_id=agent_id,
            status=QuestStatus.NOT_STARTED,
            current_value=0,
            target_value=target,
            started_at=datetime.now(UTC).isoformat(),
        )
    return _quest_progress[key]
def _get_target_value(quest: QuestDefinition) -> int:
    """Derive the completion target from the quest's criteria (default 1)."""
    # Each quest type reads a different criteria key; anything unmapped
    # (e.g. CUSTOM) falls back to a target of 1.
    criteria_key_by_type = {
        QuestType.ISSUE_COUNT: "target_count",
        QuestType.ISSUE_REDUCE: "target_reduction",
        QuestType.DAILY_RUN: "min_sessions",
        QuestType.DOCS_UPDATE: "min_files_changed",
        QuestType.TEST_IMPROVE: "min_new_tests",
    }
    key = criteria_key_by_type.get(quest.quest_type)
    if key is None:
        return 1
    return quest.criteria.get(key, 1)
def update_quest_progress(
    quest_id: str,
    agent_id: str,
    current_value: int,
    metadata: dict[str, Any] | None = None,
) -> QuestProgress:
    """Update progress for a quest.
    Sets the absolute current_value (not an increment), merges optional
    metadata, and flips the status to COMPLETED the first time the target
    is reached. Already COMPLETED/CLAIMED entries are left untouched.
    """
    progress = get_or_create_progress(quest_id, agent_id)
    progress.current_value = current_value
    if metadata:
        progress.metadata.update(metadata)
    # Check if quest is now complete
    if progress.current_value >= progress.target_value:
        if progress.status not in (QuestStatus.COMPLETED, QuestStatus.CLAIMED):
            progress.status = QuestStatus.COMPLETED
            progress.completed_at = datetime.now(UTC).isoformat()
            logger.info("Quest %s completed for agent %s", quest_id, agent_id)
    return progress
def _is_on_cooldown(progress: QuestProgress, quest: QuestDefinition) -> bool:
    """True while a repeatable quest's cooldown window has not yet elapsed."""
    # Cooldowns only ever apply to repeatable quests that have completed at
    # least once and define a positive cooldown.
    applicable = (
        quest.repeatable
        and bool(progress.last_completed_at)
        and quest.cooldown_hours > 0
    )
    if not applicable:
        return False
    try:
        finished = datetime.fromisoformat(progress.last_completed_at)
        return datetime.now(UTC) < finished + timedelta(hours=quest.cooldown_hours)
    except (ValueError, TypeError):
        # Unparsable/naive timestamps are treated as "not on cooldown".
        return False
def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
    """Claim the token reward for a completed quest.
    Pays out via the lightning ledger, marks progress CLAIMED, and — for
    repeatable quests — resets progress so the quest can be earned again.
    Returns:
        Reward info dict if successful, None if not claimable
    """
    progress = get_quest_progress(quest_id, agent_id)
    if not progress:
        return None
    quest = get_quest_definition(quest_id)
    if not quest:
        return None
    # Check if quest is completed but not yet claimed
    if progress.status != QuestStatus.COMPLETED:
        return None
    # Check cooldown for repeatable quests
    if _is_on_cooldown(progress, quest):
        return None
    try:
        # Award tokens via ledger
        from lightning.ledger import create_invoice_entry, mark_settled
        # Create a mock invoice for the reward
        # The payment hash embeds quest, agent and a second-resolution
        # timestamp; presumably unique per claim — two claims within the same
        # second would collide. TODO confirm ledger tolerates duplicates.
        invoice_entry = create_invoice_entry(
            payment_hash=f"quest_{quest_id}_{agent_id}_{int(time.time())}",
            amount_sats=quest.reward_tokens,
            memo=f"Quest reward: {quest.name}",
            source="quest_reward",
            agent_id=agent_id,
        )
        # Mark as settled immediately (quest rewards are auto-settled)
        mark_settled(invoice_entry.payment_hash, preimage=f"quest_{quest_id}")
        # Update progress
        progress.status = QuestStatus.CLAIMED
        progress.claimed_at = datetime.now(UTC).isoformat()
        progress.completion_count += 1
        # last_completed_at is captured before the repeatable reset below
        # clears claimed_at; it drives future cooldown checks.
        progress.last_completed_at = progress.claimed_at
        # Reset for repeatable quests
        if quest.repeatable:
            progress.status = QuestStatus.NOT_STARTED
            progress.current_value = 0
            progress.completed_at = ""
            progress.claimed_at = ""
        notification = quest.notification_message.format(tokens=quest.reward_tokens)
        return {
            "quest_id": quest_id,
            "agent_id": agent_id,
            "tokens_awarded": quest.reward_tokens,
            "notification": notification,
            "completion_count": progress.completion_count,
        }
    except Exception as exc:
        # NOTE(review): a failure after mark_settled leaves the ledger paid
        # but the quest unclaimed (re-claimable) — confirm this is acceptable.
        logger.error("Failed to award quest reward: %s", exc)
        return None
def check_issue_count_quest(
    quest: QuestDefinition,
    agent_id: str,
    closed_issues: list[dict],
) -> QuestProgress | None:
    """Check progress for issue_count type quest.
    Counts closed issues whose label set includes every configured label,
    then records that count as the quest's current progress.
    """
    criteria = quest.criteria
    target_labels = set(criteria.get("issue_labels", []))
    # target_count is available in criteria but not used directly here
    # Count matching issues
    matching_count = 0
    for issue in closed_issues:
        issue_labels = {label.get("name", "") for label in issue.get("labels", [])}
        # NOTE(review): when target_labels is empty, set().issubset(...) is
        # always True, so EVERY closed issue (even unlabeled) matches and the
        # second disjunct is unreachable. The dead clause suggests the intent
        # may have been "require at least one label" — confirm and simplify.
        if target_labels.issubset(issue_labels) or (not target_labels and issue_labels):
            matching_count += 1
    progress = update_quest_progress(
        quest.id, agent_id, matching_count, {"matching_issues": matching_count}
    )
    return progress
def check_issue_reduce_quest(
    quest: QuestDefinition,
    agent_id: str,
    previous_count: int,
    current_count: int,
) -> QuestProgress | None:
    """Track an issue_reduce quest: progress is how far the backlog dropped."""
    # A growing backlog never yields negative progress.
    achieved = previous_count - current_count
    if achieved < 0:
        achieved = 0
    return update_quest_progress(quest.id, agent_id, achieved, {"reduction": achieved})
def check_daily_run_quest(
    quest: QuestDefinition,
    agent_id: str,
    sessions_completed: int,
) -> QuestProgress | None:
    """Track a daily_run quest using the completed-session count as progress."""
    extra = {"sessions": sessions_completed}
    return update_quest_progress(quest.id, agent_id, sessions_completed, extra)
def evaluate_quest_progress(
    quest_id: str,
    agent_id: str,
    context: dict[str, Any],
) -> QuestProgress | None:
    """Evaluate quest progress based on quest type and context.
    Args:
        quest_id: The quest to evaluate
        agent_id: The agent to evaluate for
        context: Context data for evaluation (issues, metrics, etc.)
    Returns:
        Updated QuestProgress or None if evaluation failed
    """
    quest = get_quest_definition(quest_id)
    if not quest or not quest.enabled:
        return None
    progress = get_quest_progress(quest_id, agent_id)
    # Check cooldown for repeatable quests
    # On cooldown: return the stored progress untouched rather than re-scoring.
    if progress and _is_on_cooldown(progress, quest):
        return progress
    try:
        if quest.quest_type == QuestType.ISSUE_COUNT:
            closed_issues = context.get("closed_issues", [])
            return check_issue_count_quest(quest, agent_id, closed_issues)
        elif quest.quest_type == QuestType.ISSUE_REDUCE:
            prev_count = context.get("previous_issue_count", 0)
            curr_count = context.get("current_issue_count", 0)
            return check_issue_reduce_quest(quest, agent_id, prev_count, curr_count)
        elif quest.quest_type == QuestType.DAILY_RUN:
            sessions = context.get("sessions_completed", 0)
            return check_daily_run_quest(quest, agent_id, sessions)
        elif quest.quest_type == QuestType.CUSTOM:
            # Custom quests require manual completion
            return progress
        else:
            # DOCS_UPDATE / TEST_IMPROVE have no evaluator yet.
            logger.debug("Quest type %s not yet implemented", quest.quest_type)
            return progress
    except Exception as exc:
        # Evaluation is best-effort; callers get the last known progress.
        logger.warning("Quest evaluation failed for %s: %s", quest_id, exc)
        return progress
def auto_evaluate_all_quests(agent_id: str, context: dict[str, Any]) -> list[dict]:
    """Evaluate every enabled quest for *agent_id*, auto-claiming completions.

    Returns:
        The reward payloads for quests that completed during this pass.
    """
    awarded: list[dict] = []
    for quest in get_active_quests():
        result = evaluate_quest_progress(quest.id, agent_id, context)
        if result is None or result.status != QuestStatus.COMPLETED:
            continue
        # Auto-claim the reward
        payout = claim_quest_reward(quest.id, agent_id)
        if payout:
            awarded.append(payout)
    return awarded
def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
    """Get complete quest status for an agent.
    Includes disabled quests (the UI filters on the "enabled" flag). The
    per-quest keys here feed partials/quests_panel.html directly. Token and
    completion totals are derived from completion_count, so they survive the
    per-claim reset that repeatable quests undergo.
    """
    definitions = get_quest_definitions()
    quests_status = []
    total_rewards = 0
    completed_count = 0
    for quest_id, quest in definitions.items():
        progress = get_quest_progress(quest_id, agent_id)
        if not progress:
            # Listing a quest lazily creates a NOT_STARTED progress entry.
            progress = get_or_create_progress(quest_id, agent_id)
        is_on_cooldown = _is_on_cooldown(progress, quest) if quest.repeatable else False
        quest_info = {
            "quest_id": quest_id,
            "name": quest.name,
            "description": quest.description,
            "reward_tokens": quest.reward_tokens,
            "type": quest.quest_type.value,
            "enabled": quest.enabled,
            "repeatable": quest.repeatable,
            "status": progress.status.value,
            "current_value": progress.current_value,
            "target_value": progress.target_value,
            "completion_count": progress.completion_count,
            "on_cooldown": is_on_cooldown,
            "cooldown_hours_remaining": 0,
        }
        if is_on_cooldown and progress.last_completed_at:
            try:
                last = datetime.fromisoformat(progress.last_completed_at)
                cooldown_end = last + timedelta(hours=quest.cooldown_hours)
                hours_remaining = (cooldown_end - datetime.now(UTC)).total_seconds() / 3600
                quest_info["cooldown_hours_remaining"] = round(max(0, hours_remaining), 1)
            except (ValueError, TypeError):
                # Unparsable timestamp: leave remaining hours at 0.
                pass
        quests_status.append(quest_info)
        total_rewards += progress.completion_count * quest.reward_tokens
        completed_count += progress.completion_count
    return {
        "agent_id": agent_id,
        "quests": quests_status,
        "total_tokens_earned": total_rewards,
        "total_quests_completed": completed_count,
        "active_quests_count": len([q for q in quests_status if q["enabled"]]),
    }
def reset_quest_progress(quest_id: str | None = None, agent_id: str | None = None) -> int:
    """Reset quest progress. Useful for testing.

    Args:
        quest_id: Specific quest to reset, or None for all
        agent_id: Specific agent to reset, or None for all

    Returns:
        Number of progress entries reset
    """
    global _quest_progress

    def _matches(key: str) -> bool:
        # Keys are stored as "<agent_id>:<quest_id>".
        key_agent, key_quest = key.split(":", 1)
        return (quest_id is None or key_quest == quest_id) and (
            agent_id is None or key_agent == agent_id
        )

    # Collect first, then delete, so we never mutate the dict while iterating.
    doomed = [key for key in _quest_progress if _matches(key)]
    for key in doomed:
        del _quest_progress[key]
    return len(doomed)
def get_quest_leaderboard() -> list[dict[str, Any]]:
    """Rank agents by total quest-reward tokens earned (descending)."""
    per_agent: dict[str, dict[str, Any]] = {}

    for progress in _quest_progress.values():
        # Ensure an accumulator row exists for this agent.
        stats = per_agent.setdefault(
            progress.agent_id,
            {
                "agent_id": progress.agent_id,
                "total_completions": 0,
                "total_tokens": 0,
                "quests_completed": set(),
            },
        )
        definition = get_quest_definition(progress.quest_id)
        if definition:
            stats["total_completions"] += progress.completion_count
            stats["total_tokens"] += progress.completion_count * definition.reward_tokens
            if progress.completion_count > 0:
                stats["quests_completed"].add(definition.id)

    board = [
        {
            "agent_id": stats["agent_id"],
            "total_completions": stats["total_completions"],
            "total_tokens": stats["total_tokens"],
            "unique_quests_completed": len(stats["quests_completed"]),
        }
        for stats in per_agent.values()
    ]
    # Highest earners first.
    board.sort(key=lambda entry: entry["total_tokens"], reverse=True)
    return board
# Initialize on module load so quest definitions are available as soon as
# the module is imported (callers can query quests without an explicit setup step).
load_quest_config()

View File

@@ -13,11 +13,121 @@
<div class="mood" id="mood-text">focused</div>
</div>
<div id="connection-dot"></div>
<button id="info-btn" class="info-button" aria-label="About The Matrix" title="About The Matrix">
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round">
<circle cx="12" cy="12" r="10"></circle>
<line x1="12" y1="16" x2="12" y2="12"></line>
<line x1="12" y1="8" x2="12.01" y2="8"></line>
</svg>
</button>
<button id="fund-btn" class="fund-button" aria-label="Fund Session" title="Fund Session">
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round">
<path d="M12 2v20M2 12h20"></path>
</svg>
</button>
<div id="speech-area">
<div class="bubble" id="speech-bubble"></div>
</div>
</div>
<!-- Fund Session Modal: slide-in panel explaining sats and collecting a funding amount.
     Opened/closed by the fund-btn handlers in the inline script. -->
<div id="fund-modal" class="fund-modal">
<div class="fund-modal-content">
<button id="fund-close" class="fund-close" aria-label="Close">
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round">
<line x1="18" y1="6" x2="6" y2="18"></line>
<line x1="6" y1="6" x2="18" y2="18"></line>
</svg>
</button>
<h2>Fund Session</h2>
<!-- Explainer: what a satoshi is -->
<section class="fund-info">
<h3>⚡ What are Sats?</h3>
<p><strong>Sats</strong> (satoshis) are the smallest unit of Bitcoin—like cents to a dollar. There are 100 million sats in 1 bitcoin. They enable tiny payments perfect for AI interactions.</p>
</section>
<!-- Explainer: what funding buys the user -->
<section class="fund-info">
<h3>🛠️ Why Fund Your Session?</h3>
<p>Your sats power the Workshop AI agents. When you fund a session:</p>
<ul>
<li>Timmy and the agent swarm can process your requests</li>
<li>You get priority access to compute resources</li>
<li>Agents are compensated for their work</li>
</ul>
</section>
<!-- Explainer: ballpark pricing per interaction type -->
<section class="fund-info">
<h3>💰 Approximate Costs</h3>
<div class="cost-table">
<div class="cost-row">
<span>Simple chat message</span>
<span class="cost-value">~10-50 sats</span>
</div>
<div class="cost-row">
<span>Code generation task</span>
<span class="cost-value">~100-500 sats</span>
</div>
<div class="cost-row">
<span>Complex multi-agent job</span>
<span class="cost-value">~1,000-5,000 sats</span>
</div>
</div>
<p class="cost-note">Costs vary based on model and complexity. Unused sats remain in your balance.</p>
</section>
<!-- Amount input + submit; validated in handleFundSubmit (min 100 sats) -->
<div class="fund-actions">
<div class="fund-input-group">
<label for="fund-amount">Amount (sats)</label>
<input type="number" id="fund-amount" class="fund-input" placeholder="1000" min="100" step="100">
</div>
<button id="fund-submit" class="fund-submit-btn">Fund Session</button>
</div>
<div class="fund-footer">
<span>⚡ Lightning Network · No subscriptions · Pay as you go</span>
</div>
</div>
<!-- Clicking the dimmed backdrop also closes the modal -->
<div id="fund-backdrop" class="fund-backdrop"></div>
</div>
<!-- About Panel: slide-in explainer for The Matrix / Workshop / Lightning.
     Opened by the info-btn handlers in the inline script. -->
<div id="about-panel" class="about-panel">
<div class="about-panel-content">
<button id="about-close" class="about-close" aria-label="Close">
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round">
<line x1="18" y1="6" x2="6" y2="18"></line>
<line x1="6" y1="6" x2="18" y2="18"></line>
</svg>
</button>
<h2>Welcome to The Matrix</h2>
<section>
<h3>🌌 The Matrix</h3>
<p>The Matrix is a 3D visualization of Timmy's AI agent workspace. Enter the workshop to see Timmy at work—pondering the arcane arts of code, managing tasks, and orchestrating autonomous agents in real-time.</p>
</section>
<section>
<h3>🛠️ The Workshop</h3>
<p>The Workshop is where you interact directly with Timmy:</p>
<ul>
<li><strong>Submit Jobs</strong> — Create tasks, delegate work, and track progress</li>
<li><strong>Chat with Agents</strong> — Converse with Timmy and his swarm of specialized agents</li>
<li><strong>Fund Sessions</strong> — Power your work with satoshis via Lightning Network</li>
</ul>
</section>
<section>
<h3>⚡ Lightning & Sats</h3>
<p>The Matrix runs on Bitcoin. Sessions are funded with satoshis (sats) over the Lightning Network—enabling fast, cheap micropayments that keep Timmy energized and working for you. No subscriptions, no limits—pay as you go.</p>
</section>
<div class="about-footer">
<span>Sovereign AI · Soul on Bitcoin</span>
</div>
</div>
<!-- Clicking the dimmed backdrop also closes the panel -->
<div id="about-backdrop" class="about-backdrop"></div>
</div>
<script type="importmap">
{
"imports": {
@@ -74,6 +184,81 @@
});
stateReader.connect();
// --- Fund Session Modal ---
// Wiring for the slide-in funding panel: open/close controls plus a
// client-side minimum-amount check before (future) Lightning payment.
const fundBtn = document.getElementById("fund-btn");
const fundModal = document.getElementById("fund-modal");
const fundClose = document.getElementById("fund-close");
const fundBackdrop = document.getElementById("fund-backdrop");
const fundSubmit = document.getElementById("fund-submit");
const fundAmount = document.getElementById("fund-amount");

function openFundModal() {
  // Reveal the modal, lock background scrolling, then move focus into
  // the amount field once the opening transition has started.
  fundModal.classList.add("open");
  document.body.style.overflow = "hidden";
  setTimeout(() => fundAmount.focus(), 100);
}

function closeFundModal() {
  // Hide the modal and restore page scrolling.
  fundModal.classList.remove("open");
  document.body.style.overflow = "";
}

function handleFundSubmit() {
  const amount = parseInt(fundAmount.value, 10);
  // Reject NaN, zero, and anything under the 100-sat minimum.
  const isValid = Boolean(amount) && amount >= 100;
  if (!isValid) {
    alert("Please enter a valid amount (minimum 100 sats)");
    return;
  }
  // TODO: Integrate with Lightning payment API
  console.log("Funding session with", amount, "sats");
  alert("Lightning payment integration coming soon! Amount: " + amount + " sats");
  closeFundModal();
}

fundBtn.addEventListener("click", openFundModal);
fundClose.addEventListener("click", closeFundModal);
fundBackdrop.addEventListener("click", closeFundModal);
fundSubmit.addEventListener("click", handleFundSubmit);

// Pressing Enter inside the amount field submits immediately.
fundAmount.addEventListener("keypress", (event) => {
  if (event.key === "Enter") {
    handleFundSubmit();
  }
});
// --- About Panel ---
// Wiring for the informational slide-in panel, plus a shared Escape-key
// handler that dismisses whichever overlay is currently open.
const infoBtn = document.getElementById("info-btn");
const aboutPanel = document.getElementById("about-panel");
const aboutClose = document.getElementById("about-close");
const aboutBackdrop = document.getElementById("about-backdrop");

function openAboutPanel() {
  // Reveal the panel and prevent the page behind it from scrolling.
  aboutPanel.classList.add("open");
  document.body.style.overflow = "hidden";
}

function closeAboutPanel() {
  aboutPanel.classList.remove("open");
  document.body.style.overflow = "";
}

infoBtn.addEventListener("click", openAboutPanel);
aboutClose.addEventListener("click", closeAboutPanel);
aboutBackdrop.addEventListener("click", closeAboutPanel);

// Escape closes the fund modal first if it is open, otherwise the about panel.
document.addEventListener("keydown", (event) => {
  if (event.key !== "Escape") {
    return;
  }
  if (fundModal.classList.contains("open")) {
    closeFundModal();
  } else if (aboutPanel.classList.contains("open")) {
    closeAboutPanel();
  }
});
// --- Resize ---
window.addEventListener("resize", () => {
camera.aspect = window.innerWidth / window.innerHeight;

View File

@@ -87,3 +87,490 @@ canvas {
#connection-dot.connected {
background: #00b450;
}
/* Info button — round gold "i" icon, top-right of the overlay; opens the About panel */
.info-button {
position: absolute;
top: 14px;
right: 70px;
width: 28px;
height: 28px;
padding: 0;
background: rgba(10, 10, 20, 0.7);
border: 1px solid rgba(218, 165, 32, 0.4);
border-radius: 50%;
color: #daa520;
cursor: pointer;
pointer-events: auto;
transition: all 0.2s ease;
display: flex;
align-items: center;
justify-content: center;
}
.info-button:hover {
background: rgba(218, 165, 32, 0.15);
border-color: rgba(218, 165, 32, 0.7);
transform: scale(1.05);
}
.info-button svg {
width: 16px;
height: 16px;
}
/* Fund Session button — round green "+" icon next to the info button; opens the Fund modal */
.fund-button {
position: absolute;
top: 14px;
right: 36px;
width: 28px;
height: 28px;
padding: 0;
background: rgba(10, 10, 20, 0.7);
border: 1px solid rgba(0, 180, 80, 0.4);
border-radius: 50%;
color: #00b450;
cursor: pointer;
pointer-events: auto;
transition: all 0.2s ease;
display: flex;
align-items: center;
justify-content: center;
}
.fund-button:hover {
background: rgba(0, 180, 80, 0.15);
border-color: rgba(0, 180, 80, 0.7);
transform: scale(1.05);
}
.fund-button svg {
width: 16px;
height: 16px;
}
/* Fund Session Modal — full-screen fixed container; the .open class toggles
   visibility, and the inner content panel slides in from the right */
.fund-modal {
position: fixed;
top: 0;
left: 0;
width: 100%;
height: 100%;
z-index: 100;
pointer-events: none;
visibility: hidden;
opacity: 0;
transition: opacity 0.3s ease, visibility 0.3s ease;
}
.fund-modal.open {
pointer-events: auto;
visibility: visible;
opacity: 1;
}
/* Right-docked sliding panel; starts off-screen (translateX(100%)) */
.fund-modal-content {
position: absolute;
top: 0;
right: 0;
width: 420px;
max-width: 90%;
height: 100%;
background: rgba(10, 10, 20, 0.97);
border-left: 1px solid rgba(0, 180, 80, 0.3);
padding: 60px 24px 24px 24px;
overflow-y: auto;
transform: translateX(100%);
transition: transform 0.3s ease;
box-shadow: -4px 0 20px rgba(0, 0, 0, 0.5);
}
.fund-modal.open .fund-modal-content {
transform: translateX(0);
}
/* Circular close button in the panel's top-right corner */
.fund-close {
position: absolute;
top: 16px;
right: 16px;
width: 32px;
height: 32px;
padding: 0;
background: transparent;
border: 1px solid rgba(160, 160, 160, 0.3);
border-radius: 50%;
color: #aaa;
cursor: pointer;
transition: all 0.2s ease;
display: flex;
align-items: center;
justify-content: center;
}
.fund-close:hover {
background: rgba(255, 255, 255, 0.1);
border-color: rgba(0, 180, 80, 0.5);
color: #00b450;
}
.fund-close svg {
width: 18px;
height: 18px;
}
.fund-modal-content h2 {
font-size: 20px;
color: #00b450;
margin-bottom: 24px;
font-weight: 600;
}
/* Explanatory sections (What are Sats? / Why Fund? / Costs) */
.fund-info {
margin-bottom: 24px;
}
.fund-info h3 {
font-size: 14px;
color: #e0e0e0;
margin-bottom: 10px;
font-weight: 600;
}
.fund-info p {
font-size: 13px;
line-height: 1.6;
color: #aaa;
margin-bottom: 10px;
}
.fund-info ul {
list-style: none;
padding: 0;
margin: 0;
}
/* Custom green bullet rendered via ::before instead of list-style */
.fund-info li {
font-size: 13px;
line-height: 1.6;
color: #aaa;
margin-bottom: 8px;
padding-left: 16px;
position: relative;
}
.fund-info li::before {
content: "•";
position: absolute;
left: 0;
color: #00b450;
}
.fund-info li strong {
color: #ccc;
}
/* Cost table — label/value rows separated by hairline dividers */
.cost-table {
background: rgba(0, 0, 0, 0.3);
border: 1px solid rgba(0, 180, 80, 0.2);
border-radius: 8px;
padding: 12px;
margin-bottom: 10px;
}
.cost-row {
display: flex;
justify-content: space-between;
padding: 8px 0;
border-bottom: 1px solid rgba(255, 255, 255, 0.1);
font-size: 13px;
}
.cost-row:last-child {
border-bottom: none;
}
.cost-row span:first-child {
color: #aaa;
}
.cost-value {
color: #00b450;
font-weight: 600;
}
.cost-note {
font-size: 12px;
color: #666;
font-style: italic;
margin-top: 8px;
}
/* Fund actions — boxed amount input + submit button at the bottom of the modal */
.fund-actions {
margin-top: 32px;
padding: 20px;
background: rgba(0, 0, 0, 0.3);
border: 1px solid rgba(0, 180, 80, 0.2);
border-radius: 8px;
}
.fund-input-group {
margin-bottom: 16px;
}
.fund-input-group label {
display: block;
font-size: 13px;
color: #ccc;
margin-bottom: 6px;
}
/* 16px font size keeps mobile browsers from zooming in on focus */
.fund-input {
width: 100%;
padding: 10px 12px;
background: rgba(255, 255, 255, 0.05);
border: 1px solid rgba(0, 180, 80, 0.3);
border-radius: 6px;
color: #e0e0e0;
font-family: "Courier New", monospace;
font-size: 16px;
transition: all 0.2s ease;
}
.fund-input:focus {
outline: none;
border-color: rgba(0, 180, 80, 0.6);
background: rgba(255, 255, 255, 0.08);
}
.fund-input::placeholder {
color: #666;
}
/* Primary call-to-action; lifts slightly on hover, settles on press */
.fund-submit-btn {
width: 100%;
padding: 12px;
background: linear-gradient(135deg, rgba(0, 180, 80, 0.8), rgba(0, 140, 60, 0.9));
border: none;
border-radius: 6px;
color: #fff;
font-family: "Courier New", monospace;
font-size: 14px;
font-weight: 600;
cursor: pointer;
transition: all 0.2s ease;
text-transform: uppercase;
letter-spacing: 0.5px;
}
.fund-submit-btn:hover {
background: linear-gradient(135deg, rgba(0, 200, 90, 0.9), rgba(0, 160, 70, 1));
transform: translateY(-1px);
box-shadow: 0 4px 12px rgba(0, 180, 80, 0.3);
}
.fund-submit-btn:active {
transform: translateY(0);
}
.fund-footer {
margin-top: 24px;
padding-top: 16px;
border-top: 1px solid rgba(160, 160, 160, 0.2);
font-size: 12px;
color: #666;
text-align: center;
}
/* Dimmed click-to-close layer behind the sliding panel */
.fund-backdrop {
position: absolute;
top: 0;
left: 0;
width: 100%;
height: 100%;
background: rgba(0, 0, 0, 0.5);
opacity: 0;
transition: opacity 0.3s ease;
}
.fund-modal.open .fund-backdrop {
opacity: 1;
}
/* About Panel — same slide-in pattern as the fund modal, gold accent theme */
.about-panel {
position: fixed;
top: 0;
right: 0;
width: 100%;
height: 100%;
z-index: 100;
pointer-events: none;
visibility: hidden;
opacity: 0;
transition: opacity 0.3s ease, visibility 0.3s ease;
}
.about-panel.open {
pointer-events: auto;
visibility: visible;
opacity: 1;
}
/* Right-docked sliding panel; starts off-screen (translateX(100%)) */
.about-panel-content {
position: absolute;
top: 0;
right: 0;
width: 380px;
max-width: 90%;
height: 100%;
background: rgba(10, 10, 20, 0.97);
border-left: 1px solid rgba(218, 165, 32, 0.3);
padding: 60px 24px 24px 24px;
overflow-y: auto;
transform: translateX(100%);
transition: transform 0.3s ease;
box-shadow: -4px 0 20px rgba(0, 0, 0, 0.5);
}
.about-panel.open .about-panel-content {
transform: translateX(0);
}
/* Circular close button in the panel's top-right corner */
.about-close {
position: absolute;
top: 16px;
right: 16px;
width: 32px;
height: 32px;
padding: 0;
background: transparent;
border: 1px solid rgba(160, 160, 160, 0.3);
border-radius: 50%;
color: #aaa;
cursor: pointer;
transition: all 0.2s ease;
display: flex;
align-items: center;
justify-content: center;
}
.about-close:hover {
background: rgba(255, 255, 255, 0.1);
border-color: rgba(218, 165, 32, 0.5);
color: #daa520;
}
.about-close svg {
width: 18px;
height: 18px;
}
.about-panel-content h2 {
font-size: 20px;
color: #daa520;
margin-bottom: 24px;
font-weight: 600;
}
.about-panel-content section {
margin-bottom: 24px;
}
.about-panel-content h3 {
font-size: 14px;
color: #e0e0e0;
margin-bottom: 10px;
font-weight: 600;
}
.about-panel-content p {
font-size: 13px;
line-height: 1.6;
color: #aaa;
margin-bottom: 10px;
}
.about-panel-content ul {
list-style: none;
padding: 0;
margin: 0;
}
/* Custom gold bullet rendered via ::before instead of list-style */
.about-panel-content li {
font-size: 13px;
line-height: 1.6;
color: #aaa;
margin-bottom: 8px;
padding-left: 16px;
position: relative;
}
.about-panel-content li::before {
content: "•";
position: absolute;
left: 0;
color: #daa520;
}
.about-panel-content li strong {
color: #ccc;
}
.about-footer {
margin-top: 32px;
padding-top: 16px;
border-top: 1px solid rgba(160, 160, 160, 0.2);
font-size: 12px;
color: #666;
text-align: center;
}
/* Dimmed click-to-close layer behind the sliding panel */
.about-backdrop {
position: absolute;
top: 0;
left: 0;
width: 100%;
height: 100%;
background: rgba(0, 0, 0, 0.5);
opacity: 0;
transition: opacity 0.3s ease;
}
.about-panel.open .about-backdrop {
opacity: 1;
}
/* Mobile adjustments — panels go full-width; overlay buttons shrink slightly */
@media (max-width: 480px) {
.about-panel-content,
.fund-modal-content {
width: 100%;
max-width: 100%;
padding: 56px 20px 20px 20px;
}
.info-button {
right: 66px;
width: 26px;
height: 26px;
}
.info-button svg {
width: 14px;
height: 14px;
}
.fund-button {
right: 32px;
width: 26px;
height: 26px;
}
.fund-button svg {
width: 14px;
height: 14px;
}
}

View File

@@ -0,0 +1,288 @@
"""Tests for infrastructure.db_pool module."""
import sqlite3
import threading
import time
from pathlib import Path
import pytest
from infrastructure.db_pool import ConnectionPool
class TestConnectionPoolInit:
    """Test ConnectionPool initialization."""

    def test_init_with_string_path(self, tmp_path):
        """A plain string path is accepted and normalized to a Path."""
        raw = str(tmp_path / "test.db")
        assert ConnectionPool(raw)._db_path == Path(raw)

    def test_init_with_path_object(self, tmp_path):
        """A pathlib.Path is accepted as-is."""
        target = tmp_path / "test.db"
        assert ConnectionPool(target)._db_path == target

    def test_init_creates_thread_local(self, tmp_path):
        """The pool sets up thread-local storage for per-thread connections."""
        pool = ConnectionPool(tmp_path / "test.db")
        assert isinstance(getattr(pool, "_local", None), threading.local)
class TestGetConnection:
    """Test get_connection() method."""

    def test_get_connection_returns_valid_sqlite3_connection(self, tmp_path):
        """get_connection() hands back a usable sqlite3.Connection."""
        conn = ConnectionPool(tmp_path / "test.db").get_connection()
        assert isinstance(conn, sqlite3.Connection)
        assert conn.execute("SELECT 1").fetchone()[0] == 1

    def test_get_connection_creates_db_file(self, tmp_path):
        """Opening a connection materializes the database file (and parent dir)."""
        db_path = tmp_path / "subdir" / "test.db"
        assert not db_path.exists()
        ConnectionPool(db_path).get_connection()
        assert db_path.exists()

    def test_get_connection_sets_row_factory(self, tmp_path):
        """Connections come configured with sqlite3.Row for name-based access."""
        pool = ConnectionPool(tmp_path / "test.db")
        assert pool.get_connection().row_factory is sqlite3.Row

    def test_multiple_calls_same_thread_reuse_connection(self, tmp_path):
        """Within one thread the pool caches and reuses a single connection."""
        pool = ConnectionPool(tmp_path / "test.db")
        assert pool.get_connection() is pool.get_connection()

    def test_different_threads_get_different_connections(self, tmp_path):
        """Each thread receives its own private connection object."""
        pool = ConnectionPool(tmp_path / "test.db")
        seen = []

        def grab():
            seen.append(pool.get_connection())

        workers = [threading.Thread(target=grab) for _ in range(2)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        assert len(seen) == 2
        assert seen[0] is not seen[1]
class TestCloseConnection:
    """Test close_connection() method."""

    def test_close_connection_closes_sqlite_connection(self, tmp_path):
        """After close_connection(), the sqlite handle rejects further use."""
        pool = ConnectionPool(tmp_path / "test.db")
        conn = pool.get_connection()
        pool.close_connection()
        with pytest.raises(sqlite3.ProgrammingError):
            conn.execute("SELECT 1")

    def test_close_connection_cleans_up_thread_local(self, tmp_path):
        """close_connection() drops the cached connection from thread-local state."""
        pool = ConnectionPool(tmp_path / "test.db")
        pool.get_connection()
        assert getattr(pool._local, "conn", None) is not None
        pool.close_connection()
        # The attribute should be gone entirely, or explicitly None.
        assert getattr(pool._local, "conn", None) is None

    def test_close_connection_without_getting_connection_is_safe(self, tmp_path):
        """Closing before ever opening must be a harmless no-op."""
        ConnectionPool(tmp_path / "test.db").close_connection()

    def test_close_connection_multiple_calls_is_safe(self, tmp_path):
        """Repeated close_connection() calls must not raise."""
        pool = ConnectionPool(tmp_path / "test.db")
        pool.get_connection()
        pool.close_connection()
        pool.close_connection()
class TestContextManager:
    """Test the connection() context manager."""

    def test_connection_yields_valid_connection(self, tmp_path):
        """The context manager yields a working sqlite3.Connection."""
        pool = ConnectionPool(tmp_path / "test.db")
        with pool.connection() as conn:
            assert isinstance(conn, sqlite3.Connection)
            assert conn.execute("SELECT 42").fetchone()[0] == 42

    def test_connection_closes_on_exit(self, tmp_path):
        """Leaving the context closes the connection."""
        pool = ConnectionPool(tmp_path / "test.db")
        with pool.connection() as conn:
            pass
        with pytest.raises(sqlite3.ProgrammingError):
            conn.execute("SELECT 1")

    def test_connection_closes_on_exception(self, tmp_path):
        """The connection is closed even when the body raises."""
        pool = ConnectionPool(tmp_path / "test.db")
        leaked = None
        with pytest.raises(ValueError):
            with pool.connection() as conn:
                leaked = conn
                raise ValueError("Test exception")
        # The handle captured inside the context must now be unusable.
        with pytest.raises(sqlite3.ProgrammingError):
            leaked.execute("SELECT 1")

    def test_connection_context_manager_is_reusable(self, tmp_path):
        """connection() can be entered repeatedly on the same pool."""
        pool = ConnectionPool(tmp_path / "test.db")
        with pool.connection() as first:
            one = first.execute("SELECT 1").fetchone()[0]
        with pool.connection() as second:
            two = second.execute("SELECT 2").fetchone()[0]
        assert (one, two) == (1, 2)
class TestThreadSafety:
    """Test thread-safety of the connection pool."""

    def test_concurrent_access(self, tmp_path):
        """Multiple threads can use the pool concurrently without errors."""
        pool = ConnectionPool(tmp_path / "test.db")
        results = []
        errors = []

        def worker(worker_id):
            try:
                with pool.connection() as conn:
                    conn.execute("CREATE TABLE IF NOT EXISTS test (id INTEGER)")
                    conn.execute("INSERT INTO test VALUES (?)", (worker_id,))
                    conn.commit()
                    time.sleep(0.01)  # Small delay to increase contention
                results.append(worker_id)
            except Exception as e:
                errors.append(e)

        threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert len(errors) == 0, f"Errors occurred: {errors}"
        assert len(results) == 5

    def test_thread_isolation(self, tmp_path):
        """Each thread's own connection works independently against the shared DB.

        The write/read-back pair is serialized with a lock: all threads share a
        single database file, so without the lock another thread's DELETE/INSERT
        could land between this thread's write and its read, making the test
        flaky. The lock keeps the check deterministic while still exercising a
        distinct per-thread connection.
        """
        pool = ConnectionPool(tmp_path / "test.db")
        results = []
        db_lock = threading.Lock()

        def worker(worker_id):
            conn = pool.get_connection()
            with db_lock:
                conn.execute("CREATE TABLE IF NOT EXISTS isolation_test (thread_id INTEGER)")
                conn.execute("DELETE FROM isolation_test")  # Clear previous data
                conn.execute("INSERT INTO isolation_test VALUES (?)", (worker_id,))
                conn.commit()
                # Read back before releasing the lock so no other thread can
                # overwrite the row in between.
                result = conn.execute("SELECT thread_id FROM isolation_test").fetchone()[0]
            results.append((worker_id, result))
            pool.close_connection()

        threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Each thread should have written and read its own ID
        assert len(results) == 3
        for worker_id, read_id in results:
            assert worker_id == read_id, f"Thread {worker_id} read {read_id} instead"
class TestCloseAll:
    """Test close_all() method."""

    def test_close_all_closes_current_thread_connection(self, tmp_path):
        """close_all() shuts down the calling thread's connection."""
        pool = ConnectionPool(tmp_path / "test.db")
        handle = pool.get_connection()
        pool.close_all()
        with pytest.raises(sqlite3.ProgrammingError):
            handle.execute("SELECT 1")
class TestIntegration:
    """Integration tests for real-world usage patterns."""

    def test_basic_crud_operations(self, tmp_path):
        """Create/insert/select round-trips cleanly through the pool."""
        pool = ConnectionPool(tmp_path / "test.db")
        with pool.connection() as conn:
            conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
            conn.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
            conn.execute("INSERT INTO users (name) VALUES (?)", ("Bob",))
            conn.commit()
            rows = conn.execute("SELECT * FROM users ORDER BY id").fetchall()
        assert [row["name"] for row in rows] == ["Alice", "Bob"]

    def test_multiple_pools_different_databases(self, tmp_path):
        """Two pools over two files stay fully isolated from one another."""
        pool1 = ConnectionPool(tmp_path / "db1.db")
        pool2 = ConnectionPool(tmp_path / "db2.db")

        # Seed each database with a distinct value.
        for pool, value in ((pool1, 1), (pool2, 2)):
            with pool.connection() as conn:
                conn.execute("CREATE TABLE test (val INTEGER)")
                conn.execute("INSERT INTO test VALUES (?)", (value,))
                conn.commit()

        # Each pool must read back only its own value.
        for pool, expected in ((pool1, 1), (pool2, 2)):
            with pool.connection() as conn:
                assert conn.execute("SELECT val FROM test").fetchone()[0] == expected

View File

@@ -130,6 +130,13 @@ class TestAPIEndpoints:
r = client.get("/health/sovereignty")
assert r.status_code == 200
def test_health_snapshot(self, client):
    """/health/snapshot responds 200 with a recognized overall_status value."""
    resp = client.get("/health/snapshot")
    assert resp.status_code == 200
    payload = resp.json()
    # get() returns None when the key is missing, which fails the membership check.
    assert payload.get("overall_status") in ["green", "yellow", "red", "unknown"]
def test_queue_status(self, client):
r = client.get("/api/queue/status")
assert r.status_code == 200
@@ -186,6 +193,7 @@ class TestNo500:
"/health",
"/health/status",
"/health/sovereignty",
"/health/snapshot",
"/health/components",
"/agents/default/panel",
"/agents/default/history",

View File

@@ -0,0 +1,280 @@
"""Unit tests for timmy_serve.voice_tts.
Mocks pyttsx3 so tests run without audio hardware.
"""
import threading
from unittest.mock import MagicMock, patch
class TestVoiceTTSInit:
    """Test VoiceTTS initialization with/without pyttsx3."""

    def test_init_success(self):
        """When pyttsx3 is available, engine initializes with given rate/volume."""
        mock_pyttsx3 = MagicMock()
        mock_engine = MagicMock()
        mock_pyttsx3.init.return_value = mock_engine
        # Inject the fake pyttsx3 module so VoiceTTS binds to it on import/init.
        with patch.dict("sys.modules", {"pyttsx3": mock_pyttsx3}):
            from timmy_serve.voice_tts import VoiceTTS

            tts = VoiceTTS(rate=200, volume=0.8)
            assert tts.available is True
            assert tts._rate == 200
            assert tts._volume == 0.8
            # Both settings must have been forwarded to the engine.
            mock_engine.setProperty.assert_any_call("rate", 200)
            mock_engine.setProperty.assert_any_call("volume", 0.8)

    def test_init_import_failure(self):
        """When pyttsx3 import fails, VoiceTTS degrades gracefully."""
        # Mapping a module name to None in sys.modules makes `import pyttsx3`
        # raise ImportError, simulating a missing dependency.
        with patch.dict("sys.modules", {"pyttsx3": None}):
            # Force reimport by clearing cache
            import sys

            modules_to_clear = [k for k in sys.modules.keys() if "voice_tts" in k]
            for mod in modules_to_clear:
                del sys.modules[mod]
            from timmy_serve.voice_tts import VoiceTTS

            tts = VoiceTTS()
            assert tts.available is False
            assert tts._engine is None
class TestVoiceTTSSpeak:
    """Test VoiceTTS speak methods."""

    def test_speak_skips_when_not_available(self):
        """speak() should skip gracefully when TTS is not available."""
        from timmy_serve.voice_tts import VoiceTTS

        # __new__ bypasses __init__ so no real engine is constructed.
        tts = VoiceTTS.__new__(VoiceTTS)
        tts._engine = None
        tts._available = False
        tts._lock = threading.Lock()
        # Should not raise
        tts.speak("hello world")

    def test_speak_sync_skips_when_not_available(self):
        """speak_sync() should skip gracefully when TTS is not available."""
        from timmy_serve.voice_tts import VoiceTTS

        tts = VoiceTTS.__new__(VoiceTTS)
        tts._engine = None
        tts._available = False
        tts._lock = threading.Lock()
        # Should not raise
        tts.speak_sync("hello world")

    def test_speak_runs_in_background_thread(self):
        """speak() should run speech in a background thread."""
        from timmy_serve.voice_tts import VoiceTTS

        tts = VoiceTTS.__new__(VoiceTTS)
        tts._engine = MagicMock()
        tts._available = True
        tts._lock = threading.Lock()
        captured_threads = []
        original_thread = threading.Thread

        # Wrap threading.Thread so every thread speak() spawns is recorded;
        # the wrapper returns a real Thread so execution still proceeds.
        def capture_thread(*args, **kwargs):
            t = original_thread(*args, **kwargs)
            captured_threads.append(t)
            return t

        with patch.object(threading, "Thread", side_effect=capture_thread):
            tts.speak("test message")
        # Wait for threads to complete
        for t in captured_threads:
            t.join(timeout=1)
        tts._engine.say.assert_called_with("test message")
        tts._engine.runAndWait.assert_called_once()
class TestVoiceTTSProperties:
    """Test VoiceTTS property setters."""

    @staticmethod
    def _bare(engine, **attrs):
        """Build a VoiceTTS skeleton without running __init__."""
        from timmy_serve.voice_tts import VoiceTTS

        tts = VoiceTTS.__new__(VoiceTTS)
        tts._engine = engine
        for name, value in attrs.items():
            setattr(tts, name, value)
        return tts

    def test_set_rate_updates_property(self):
        """set_rate() stores the new rate and pushes it to the engine."""
        tts = self._bare(MagicMock(), _rate=175)
        tts.set_rate(220)
        assert tts._rate == 220
        tts._engine.setProperty.assert_called_with("rate", 220)

    def test_set_rate_without_engine(self):
        """set_rate() still records the rate when no engine is present."""
        tts = self._bare(None, _rate=175)
        tts.set_rate(220)
        assert tts._rate == 220

    def test_set_volume_clamped_to_max(self):
        """Volumes above 1.0 are clamped down to 1.0."""
        tts = self._bare(MagicMock(), _volume=0.9)
        tts.set_volume(1.5)
        assert tts._volume == 1.0
        tts._engine.setProperty.assert_called_with("volume", 1.0)

    def test_set_volume_clamped_to_min(self):
        """Volumes below 0.0 are clamped up to 0.0."""
        tts = self._bare(MagicMock(), _volume=0.9)
        tts.set_volume(-0.5)
        assert tts._volume == 0.0
        tts._engine.setProperty.assert_called_with("volume", 0.0)

    def test_set_volume_within_range(self):
        """In-range volumes are stored and forwarded unchanged."""
        tts = self._bare(MagicMock(), _volume=0.9)
        tts.set_volume(0.5)
        assert tts._volume == 0.5
        tts._engine.setProperty.assert_called_with("volume", 0.5)
class TestVoiceTTSGetVoices:
    """Test VoiceTTS get_voices() method."""

    @staticmethod
    def _mock_voice(voice_id, name, languages=None):
        """Create a pyttsx3-style voice object for the mocked engine."""
        voice = MagicMock()
        voice.id = voice_id
        voice.name = name
        if languages is None:
            # Simulate drivers whose voice objects omit the attribute.
            del voice.languages
        else:
            voice.languages = languages
        return voice

    def test_get_voices_returns_empty_list_when_no_engine(self):
        """With no engine, get_voices() yields an empty list."""
        from timmy_serve.voice_tts import VoiceTTS

        tts = VoiceTTS.__new__(VoiceTTS)
        tts._engine = None
        assert tts.get_voices() == []

    def test_get_voices_returns_formatted_voice_list(self):
        """Voices are returned as dicts carrying id, name, and languages."""
        from timmy_serve.voice_tts import VoiceTTS

        tts = VoiceTTS.__new__(VoiceTTS)
        tts._engine = MagicMock()
        tts._engine.getProperty.return_value = [
            self._mock_voice("com.apple.voice.compact.en-US.Samantha", "Samantha", ["en-US"]),
            self._mock_voice("com.apple.voice.compact.en-GB.Daniel", "Daniel", ["en-GB"]),
        ]

        voices = tts.get_voices()

        assert [v["id"] for v in voices] == [
            "com.apple.voice.compact.en-US.Samantha",
            "com.apple.voice.compact.en-GB.Daniel",
        ]
        assert [v["name"] for v in voices] == ["Samantha", "Daniel"]
        assert [v["languages"] for v in voices] == [["en-US"], ["en-GB"]]

    def test_get_voices_handles_missing_languages_attr(self):
        """A voice lacking a languages attribute maps to an empty list."""
        from timmy_serve.voice_tts import VoiceTTS

        tts = VoiceTTS.__new__(VoiceTTS)
        tts._engine = MagicMock()
        tts._engine.getProperty.return_value = [self._mock_voice("voice1", "Default Voice")]

        voices = tts.get_voices()

        assert len(voices) == 1
        assert voices[0]["languages"] == []

    def test_get_voices_handles_exception(self):
        """Engine failures are swallowed and produce an empty list."""
        from timmy_serve.voice_tts import VoiceTTS

        tts = VoiceTTS.__new__(VoiceTTS)
        tts._engine = MagicMock()
        tts._engine.getProperty.side_effect = RuntimeError("engine error")
        assert tts.get_voices() == []
class TestVoiceTTSSetVoice:
    """Behaviour of VoiceTTS.set_voice()."""

    def test_set_voice_updates_property(self):
        """With an engine present, set_voice() forwards to setProperty."""
        from timmy_serve.voice_tts import VoiceTTS

        tts = VoiceTTS.__new__(VoiceTTS)
        tts._engine = MagicMock()
        voice_id = "com.apple.voice.compact.en-US.Samantha"
        tts.set_voice(voice_id)
        tts._engine.setProperty.assert_called_with("voice", voice_id)

    def test_set_voice_skips_when_no_engine(self):
        """Without an engine, set_voice() is a silent no-op."""
        from timmy_serve.voice_tts import VoiceTTS

        tts = VoiceTTS.__new__(VoiceTTS)
        tts._engine = None
        tts.set_voice("some_voice_id")  # must not raise
class TestVoiceTTSAvailableProperty:
    """Behaviour of the VoiceTTS.available property."""

    def test_available_returns_true_when_initialized(self):
        """`available` mirrors a True `_available` flag."""
        from timmy_serve.voice_tts import VoiceTTS

        tts = VoiceTTS.__new__(VoiceTTS)
        tts._available = True
        assert tts.available is True

    def test_available_returns_false_when_not_initialized(self):
        """`available` mirrors a False `_available` flag."""
        from timmy_serve.voice_tts import VoiceTTS

        tts = VoiceTTS.__new__(VoiceTTS)
        tts._available = False
        assert tts.available is False

View File

@@ -0,0 +1,401 @@
"""Tests for health_snapshot module."""
from __future__ import annotations
import json
import sys
from pathlib import Path
from unittest.mock import patch
# Add timmy_automations to path for imports
sys.path.insert(
0, str(Path(__file__).resolve().parent.parent.parent / "timmy_automations" / "daily_run")
)
from datetime import UTC
import health_snapshot as hs
class TestLoadConfig:
    """Configuration loading via hs.load_config()."""

    def test_loads_default_config(self):
        """Defaults expose the expected top-level keys."""
        config = hs.load_config()
        for key in ("gitea_api", "repo_slug", "critical_labels", "flakiness_lookback_cycles"):
            assert key in config

    def test_environment_overrides(self, monkeypatch):
        """TIMMY_* environment variables win over defaults."""
        monkeypatch.setenv("TIMMY_GITEA_API", "http://test:3000/api/v1")
        monkeypatch.setenv("TIMMY_REPO_SLUG", "test/repo")
        config = hs.load_config()
        assert config["gitea_api"] == "http://test:3000/api/v1"
        assert config["repo_slug"] == "test/repo"
class TestGetToken:
    """Token resolution via hs.get_token()."""

    def test_returns_config_token(self):
        """An inline `token` entry is returned verbatim."""
        assert hs.get_token({"token": "test-token-123"}) == "test-token-123"

    def test_reads_from_file(self, tmp_path, monkeypatch):
        """With no inline token, fall back to the `token_file` contents."""
        token_file = tmp_path / "gitea_token"
        token_file.write_text("file-token-456")
        assert hs.get_token({"token_file": str(token_file)}) == "file-token-456"

    def test_returns_none_when_no_token(self):
        """A missing inline token and an unreadable file yield None."""
        assert hs.get_token({"token_file": "/nonexistent/path"}) is None
class TestCISignal:
    """Construction of the CISignal dataclass."""

    def test_default_details(self):
        """`details` defaults to an empty dict."""
        assert hs.CISignal(status="pass", message="CI passing").details == {}

    def test_with_details(self):
        """An explicit `details` mapping is stored as given."""
        signal = hs.CISignal(status="pass", message="CI passing", details={"sha": "abc123"})
        assert signal.details["sha"] == "abc123"
class TestIssueSignal:
    """Construction of the IssueSignal dataclass."""

    def test_default_issues_list(self):
        """`issues` defaults to an empty list."""
        assert hs.IssueSignal(count=0, p0_count=0, p1_count=0).issues == []

    def test_with_issues(self):
        """An explicit issue list is stored as given."""
        sample = [{"number": 1, "title": "Test"}]
        signal = hs.IssueSignal(count=1, p0_count=1, p1_count=0, issues=sample)
        assert len(signal.issues) == 1
class TestFlakinessSignal:
    """Construction of the FlakinessSignal dataclass."""

    def test_calculated_fields(self):
        """Every constructor argument lands on the matching field."""
        signal = hs.FlakinessSignal(
            status="healthy",
            recent_failures=2,
            recent_cycles=20,
            failure_rate=0.1,
            message="Low flakiness",
        )
        assert (signal.status, signal.recent_failures, signal.failure_rate) == (
            "healthy",
            2,
            0.1,
        )
class TestHealthSnapshot:
    """Serialization behaviour of the HealthSnapshot dataclass."""

    @staticmethod
    def _snapshot(issues):
        """Build a snapshot whose only varying part is the issue signal."""
        return hs.HealthSnapshot(
            timestamp="2026-01-01T00:00:00+00:00",
            overall_status="green",
            ci=hs.CISignal(status="pass", message="CI passing"),
            issues=issues,
            flakiness=hs.FlakinessSignal(
                status="healthy",
                recent_failures=0,
                recent_cycles=10,
                failure_rate=0.0,
                message="All good",
            ),
            tokens=hs.TokenEconomySignal(status="balanced", message="Balanced"),
        )

    def test_to_dict_structure(self):
        """to_dict() exposes timestamp, status and all four signal groups."""
        data = self._snapshot(hs.IssueSignal(count=0, p0_count=0, p1_count=0)).to_dict()
        assert data["timestamp"] == "2026-01-01T00:00:00+00:00"
        assert data["overall_status"] == "green"
        for group in ("ci", "issues", "flakiness", "tokens"):
            assert group in data

    def test_to_dict_limits_issues(self):
        """to_dict() truncates the embedded issue list to 5 entries."""
        many = [{"number": i, "title": f"Issue {i}"} for i in range(10)]
        signal = hs.IssueSignal(count=10, p0_count=5, p1_count=5, issues=many)
        data = self._snapshot(signal).to_dict()
        assert len(data["issues"]["issues"]) == 5
class TestCalculateOverallStatus:
    """Mapping of individual signals onto the overall traffic-light status."""

    @staticmethod
    def _flakiness(status="healthy", failures=0, cycles=10, rate=0.0, message="All good"):
        """Shorthand for a FlakinessSignal; defaults describe a healthy one."""
        return hs.FlakinessSignal(
            status=status,
            recent_failures=failures,
            recent_cycles=cycles,
            failure_rate=rate,
            message=message,
        )

    def test_green_when_all_healthy(self):
        """All-clear signals produce green."""
        status = hs.calculate_overall_status(
            hs.CISignal(status="pass", message="CI passing"),
            hs.IssueSignal(count=0, p0_count=0, p1_count=0),
            self._flakiness(),
        )
        assert status == "green"

    def test_red_when_ci_fails(self):
        """A failing CI signal alone forces red."""
        status = hs.calculate_overall_status(
            hs.CISignal(status="fail", message="CI failed"),
            hs.IssueSignal(count=0, p0_count=0, p1_count=0),
            self._flakiness(),
        )
        assert status == "red"

    def test_red_when_p0_issues(self):
        """Any open P0 issue forces red."""
        status = hs.calculate_overall_status(
            hs.CISignal(status="pass", message="CI passing"),
            hs.IssueSignal(count=1, p0_count=1, p1_count=0),
            self._flakiness(),
        )
        assert status == "red"

    def test_yellow_when_p1_issues(self):
        """Open P1 issues (without P0) degrade status to yellow."""
        status = hs.calculate_overall_status(
            hs.CISignal(status="pass", message="CI passing"),
            hs.IssueSignal(count=1, p0_count=0, p1_count=1),
            self._flakiness(),
        )
        assert status == "yellow"

    def test_yellow_when_flakiness_degraded(self):
        """Degraded flakiness alone degrades status to yellow."""
        status = hs.calculate_overall_status(
            hs.CISignal(status="pass", message="CI passing"),
            hs.IssueSignal(count=0, p0_count=0, p1_count=0),
            self._flakiness("degraded", 5, 20, 0.25, "Moderate flakiness"),
        )
        assert status == "yellow"

    def test_red_when_flakiness_critical(self):
        """Critical flakiness alone forces red."""
        status = hs.calculate_overall_status(
            hs.CISignal(status="pass", message="CI passing"),
            hs.IssueSignal(count=0, p0_count=0, p1_count=0),
            self._flakiness("critical", 10, 20, 0.5, "High flakiness"),
        )
        assert status == "red"
class TestCheckFlakiness:
    """Behaviour of hs.check_flakiness()."""

    def test_no_data_returns_unknown(self, tmp_path, monkeypatch):
        """With no retro cycle log, the signal is `unknown`."""
        monkeypatch.setattr(hs, "REPO_ROOT", tmp_path)
        signal = hs.check_flakiness({"flakiness_lookback_cycles": 20})
        assert signal.status == "unknown"
        assert signal.message == "No cycle data available"

    def test_calculates_failure_rate(self, tmp_path, monkeypatch):
        """Failure rate is derived from the recorded cycle outcomes."""
        monkeypatch.setattr(hs, "REPO_ROOT", tmp_path)
        retro_dir = tmp_path / ".loop" / "retro"
        retro_dir.mkdir(parents=True)
        outcomes = [True, True, False, True, False]  # 2 failures out of 5
        lines = [
            json.dumps({"success": ok, "cycle": n})
            for n, ok in enumerate(outcomes, start=1)
        ]
        (retro_dir / "cycles.jsonl").write_text("\n".join(lines))
        signal = hs.check_flakiness({"flakiness_lookback_cycles": 20})
        assert signal.recent_cycles == 5
        assert signal.recent_failures == 2
        assert signal.failure_rate == 0.4
        assert signal.status == "critical"  # 40% exceeds the 30% threshold
class TestCheckTokenEconomy:
    """Behaviour of hs.check_token_economy()."""

    def test_no_data_returns_unknown(self, tmp_path, monkeypatch):
        """With no ledger file, the signal is `unknown`."""
        monkeypatch.setattr(hs, "REPO_ROOT", tmp_path)
        assert hs.check_token_economy({}).status == "unknown"

    def test_calculates_balanced(self, tmp_path, monkeypatch):
        """Recent mint/burn totals are summed and judged balanced."""
        monkeypatch.setattr(hs, "REPO_ROOT", tmp_path)
        loop_dir = tmp_path / ".loop"
        loop_dir.mkdir(parents=True)
        from datetime import datetime

        now = datetime.now(UTC).isoformat()
        lines = [
            json.dumps({"timestamp": now, "delta": 10}),
            json.dumps({"timestamp": now, "delta": -5}),
        ]
        (loop_dir / "token_economy.jsonl").write_text("\n".join(lines))
        signal = hs.check_token_economy({})
        assert signal.status == "balanced"
        assert signal.recent_mint == 10
        assert signal.recent_burn == 5
class TestGiteaClient:
    """Construction and header handling of hs.GiteaClient."""

    @staticmethod
    def _config():
        """Fresh config dict per test so no state leaks between cases."""
        return {"gitea_api": "http://test:3000/api/v1", "repo_slug": "test/repo"}

    def test_initialization(self):
        """Config values and the token are copied onto the client."""
        client = hs.GiteaClient(self._config(), "token123")
        assert client.api_base == "http://test:3000/api/v1"
        assert client.repo_slug == "test/repo"
        assert client.token == "token123"

    def test_headers_with_token(self):
        """A token adds an Authorization header alongside Accept."""
        headers = hs.GiteaClient(self._config(), "token123")._headers()
        assert headers["Authorization"] == "token token123"
        assert headers["Accept"] == "application/json"

    def test_headers_without_token(self):
        """No token means no Authorization header."""
        headers = hs.GiteaClient(self._config(), None)._headers()
        assert "Authorization" not in headers
        assert headers["Accept"] == "application/json"
class TestGenerateSnapshot:
    """End-to-end snapshot generation with the Gitea client stubbed out."""

    def test_returns_snapshot(self):
        """generate_snapshot() yields a fully-populated HealthSnapshot."""
        config = hs.load_config()
        with (
            patch.object(hs.GiteaClient, "is_available", return_value=False),
            patch.object(hs.GiteaClient, "__init__", return_value=None),
        ):
            snapshot = hs.generate_snapshot(config, None)
        assert isinstance(snapshot, hs.HealthSnapshot)
        assert snapshot.overall_status in ["green", "yellow", "red", "unknown"]
        for part in (snapshot.ci, snapshot.issues, snapshot.flakiness, snapshot.tokens):
            assert part is not None

View File

@@ -0,0 +1,524 @@
"""Tests for token_rules module."""
from __future__ import annotations
import sys
from pathlib import Path
from unittest.mock import patch
import pytest
# Add timmy_automations to path for imports
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "timmy_automations"))
from utils import token_rules as tr
class TestTokenEvent:
    """Delta arithmetic on the TokenEvent dataclass."""

    @staticmethod
    def _event(reward, penalty):
        """Build a throwaway event with the given reward/penalty pair."""
        return tr.TokenEvent(
            name="test",
            description="Test event",
            reward=reward,
            penalty=penalty,
            category="test",
        )

    def test_delta_calculation_reward(self):
        """A pure reward yields a positive delta."""
        assert self._event(10, 0).delta == 10

    def test_delta_calculation_penalty(self):
        """A pure penalty yields a negative delta."""
        assert self._event(0, -5).delta == -5

    def test_delta_calculation_mixed(self):
        """Reward and penalty are netted together."""
        assert self._event(10, -3).delta == 7
class TestTokenRulesLoading:
    """How TokenRules loads (or falls back from) its YAML config."""

    def test_loads_from_yaml_file(self, tmp_path):
        """A well-formed YAML file populates version, events and gates."""
        yaml = pytest.importorskip("yaml")
        cfg = tmp_path / "token_rules.yaml"
        cfg.write_text(
            yaml.dump(
                {
                    "version": "1.0.0-test",
                    "events": {
                        "test_event": {
                            "description": "A test event",
                            "reward": 15,
                            "category": "test",
                        }
                    },
                    "gating_thresholds": {"test_op": 50},
                    "daily_limits": {"test": {"max_earn": 100, "max_spend": 10}},
                    "audit": {"log_all_transactions": False},
                }
            )
        )
        rules = tr.TokenRules(config_path=cfg)
        assert rules.get_config_version() == "1.0.0-test"
        assert rules.get_delta("test_event") == 15
        assert rules.get_gate_threshold("test_op") == 50

    def test_fallback_when_yaml_missing(self, tmp_path):
        """A missing file triggers the built-in fallback rule set."""
        rules = tr.TokenRules(config_path=tmp_path / "nonexistent.yaml")
        assert rules.get_config_version() == "fallback"
        # Spot-check a few events the fallback must define.
        assert rules.get_delta("pr_merged") == 10
        assert rules.get_delta("test_fixed") == 8
        assert rules.get_delta("automation_failure") == -2

    def test_fallback_when_yaml_not_installed(self, tmp_path):
        """Without PyYAML importable, fallback is used even if a file exists."""
        with patch.dict(sys.modules, {"yaml": None}):
            cfg = tmp_path / "token_rules.yaml"
            cfg.write_text("version: '1.0.0'")
            rules = tr.TokenRules(config_path=cfg)
            assert rules.get_config_version() == "fallback"
class TestTokenRulesGetDelta:
    """Behaviour of TokenRules.get_delta()."""

    def test_get_delta_existing_event(self, tmp_path):
        """Configured events report their reward or penalty as the delta."""
        yaml = pytest.importorskip("yaml")
        cfg = tmp_path / "token_rules.yaml"
        cfg.write_text(
            yaml.dump(
                {
                    "version": "1.0.0",
                    "events": {
                        "pr_merged": {"description": "PR merged", "reward": 10, "category": "merge"},
                        "automation_failure": {"description": "Failure", "penalty": -2, "category": "ops"},
                    },
                }
            )
        )
        rules = tr.TokenRules(config_path=cfg)
        assert rules.get_delta("pr_merged") == 10
        assert rules.get_delta("automation_failure") == -2

    def test_get_delta_unknown_event(self, tmp_path):
        """Events absent from the config contribute a zero delta."""
        rules = tr.TokenRules(config_path=tmp_path / "nonexistent.yaml")
        assert rules.get_delta("unknown_event") == 0
class TestTokenRulesGetEvent:
    """Behaviour of TokenRules.get_event()."""

    def test_get_event_returns_full_config(self, tmp_path):
        """All configured fields are surfaced on the returned event."""
        yaml = pytest.importorskip("yaml")
        cfg = tmp_path / "token_rules.yaml"
        cfg.write_text(
            yaml.dump(
                {
                    "version": "1.0.0",
                    "events": {
                        "pr_merged": {
                            "description": "PR merged successfully",
                            "reward": 10,
                            "category": "merge",
                            "gate_threshold": 0,
                        }
                    },
                }
            )
        )
        event = tr.TokenRules(config_path=cfg).get_event("pr_merged")
        assert event is not None
        assert event.name == "pr_merged"
        assert event.description == "PR merged successfully"
        assert event.reward == 10
        assert event.category == "merge"
        assert event.gate_threshold == 0

    def test_get_event_unknown_returns_none(self, tmp_path):
        """Unknown event names produce None."""
        rules = tr.TokenRules(config_path=tmp_path / "nonexistent.yaml")
        assert rules.get_event("unknown") is None
class TestTokenRulesListEvents:
    """Behaviour of TokenRules.list_events()."""

    @staticmethod
    def _rules(tmp_path):
        """Build rules with three events spread over two categories."""
        yaml = pytest.importorskip("yaml")
        cfg = tmp_path / "token_rules.yaml"
        cfg.write_text(
            yaml.dump(
                {
                    "version": "1.0.0",
                    "events": {
                        "event_a": {"description": "A", "reward": 5, "category": "cat1"},
                        "event_b": {"description": "B", "reward": 10, "category": "cat2"},
                        "event_c": {"description": "C", "reward": 15, "category": "cat1"},
                    },
                }
            )
        )
        return tr.TokenRules(config_path=cfg)

    def test_list_all_events(self, tmp_path):
        """Without a filter, every configured event is listed."""
        events = self._rules(tmp_path).list_events()
        assert len(events) == 3
        assert {e.name for e in events} >= {"event_a", "event_b", "event_c"}

    def test_list_events_by_category(self, tmp_path):
        """A category filter keeps only matching events."""
        events = self._rules(tmp_path).list_events(category="cat1")
        assert len(events) == 2
        assert all(e.category == "cat1" for e in events)
class TestTokenRulesGating:
    """Gate-threshold lookups and checks."""

    @staticmethod
    def _rules(tmp_path, config_data):
        """Write `config_data` to a YAML file and load rules from it."""
        yaml = pytest.importorskip("yaml")
        cfg = tmp_path / "token_rules.yaml"
        cfg.write_text(yaml.dump(config_data))
        return tr.TokenRules(config_path=cfg)

    def test_check_gate_with_threshold(self, tmp_path):
        """Balances at or above the threshold pass; lower ones fail."""
        rules = self._rules(
            tmp_path,
            {"version": "1.0.0", "events": {}, "gating_thresholds": {"pr_merge": 50}},
        )
        assert rules.check_gate("pr_merge", current_tokens=100) is True
        assert rules.check_gate("pr_merge", current_tokens=50) is True
        assert rules.check_gate("pr_merge", current_tokens=49) is False
        assert rules.check_gate("pr_merge", current_tokens=0) is False

    def test_check_gate_no_threshold(self, tmp_path):
        """Operations with no configured threshold are always allowed."""
        rules = self._rules(
            tmp_path,
            {"version": "1.0.0", "events": {}, "gating_thresholds": {}},
        )
        assert rules.check_gate("unknown_op", current_tokens=0) is True
        assert rules.check_gate("unknown_op", current_tokens=-100) is True

    def test_get_gate_threshold(self, tmp_path):
        """get_gate_threshold() returns the raw value, or None if unset."""
        rules = self._rules(
            tmp_path,
            {"version": "1.0.0", "gating_thresholds": {"pr_merge": 50, "sensitive_op": 100}},
        )
        assert rules.get_gate_threshold("pr_merge") == 50
        assert rules.get_gate_threshold("sensitive_op") == 100
        assert rules.get_gate_threshold("unknown") is None
class TestTokenRulesDailyLimits:
    """Daily earn/spend limit lookups."""

    @staticmethod
    def _rules(tmp_path, config_data):
        """Write `config_data` to a YAML file and load rules from it."""
        yaml = pytest.importorskip("yaml")
        cfg = tmp_path / "token_rules.yaml"
        cfg.write_text(yaml.dump(config_data))
        return tr.TokenRules(config_path=cfg)

    def test_get_daily_limits(self, tmp_path):
        """Limits per category come back as structured objects."""
        rules = self._rules(
            tmp_path,
            {
                "version": "1.0.0",
                "daily_limits": {
                    "triage": {"max_earn": 100, "max_spend": 0},
                    "merge": {"max_earn": 50, "max_spend": 10},
                },
            },
        )
        triage = rules.get_daily_limits("triage")
        assert triage is not None
        assert (triage.max_earn, triage.max_spend) == (100, 0)
        merge = rules.get_daily_limits("merge")
        assert merge is not None
        assert (merge.max_earn, merge.max_spend) == (50, 10)

    def test_get_daily_limits_unknown(self, tmp_path):
        """Categories with no configured limits yield None."""
        rules = self._rules(tmp_path, {"version": "1.0.0", "daily_limits": {}})
        assert rules.get_daily_limits("unknown") is None
class TestTokenRulesComputeTransaction:
    """End-to-end transaction computation (deltas, gates, daily limits)."""

    @staticmethod
    def _rules(tmp_path, config_data):
        """Write `config_data` to a YAML file and load rules from it."""
        yaml = pytest.importorskip("yaml")
        cfg = tmp_path / "token_rules.yaml"
        cfg.write_text(yaml.dump(config_data))
        return tr.TokenRules(config_path=cfg)

    def test_compute_successful_transaction(self, tmp_path):
        """A known event applies its delta and reports the new balance."""
        rules = self._rules(
            tmp_path,
            {
                "version": "1.0.0",
                "events": {
                    "pr_merged": {"description": "PR merged", "reward": 10, "category": "merge"}
                },
            },
        )
        result = rules.compute_transaction("pr_merged", current_tokens=100)
        assert result["event"] == "pr_merged"
        assert result["delta"] == 10
        assert result["category"] == "merge"
        assert result["allowed"] is True
        assert result["new_balance"] == 110
        assert result["limit_reached"] is False

    def test_compute_unknown_event(self, tmp_path):
        """Unknown events are rejected and leave the balance untouched."""
        rules = tr.TokenRules(config_path=tmp_path / "nonexistent.yaml")
        result = rules.compute_transaction("unknown_event", current_tokens=50)
        assert result["event"] == "unknown_event"
        assert result["delta"] == 0
        assert result["allowed"] is False
        assert result["reason"] == "unknown_event"
        assert result["new_balance"] == 50

    def test_compute_with_gate_check(self, tmp_path):
        """Per-event gate thresholds decide whether the event is allowed."""
        rules = self._rules(
            tmp_path,
            {
                "version": "1.0.0",
                "events": {
                    "sensitive_op": {
                        "description": "Sensitive",
                        "reward": 50,
                        "category": "sensitive",
                        "gate_threshold": 100,
                    }
                },
            },
        )
        # Balance clears the 100-token gate.
        assert rules.compute_transaction("sensitive_op", current_tokens=150)["allowed"] is True
        # Balance falls short of the gate.
        blocked = rules.compute_transaction("sensitive_op", current_tokens=50)
        assert blocked["allowed"] is False
        assert "gate_reason" in blocked

    def test_compute_with_daily_limits(self, tmp_path):
        """Per-category daily earn limits block transactions exceeding them."""
        rules = self._rules(
            tmp_path,
            {
                "version": "1.0.0",
                "events": {
                    "triage_action": {
                        "description": "Triage",
                        "reward": 20,
                        "category": "triage",
                    }
                },
                "daily_limits": {"triage": {"max_earn": 50, "max_spend": 0}},
            },
        )
        # 20 earned so far + 20 more stays within the 50 cap.
        ok = rules.compute_transaction(
            "triage_action", current_tokens=100, current_daily_earned={"triage": 20}
        )
        assert ok["allowed"] is True
        assert ok["limit_reached"] is False
        # 40 earned so far + 20 more would exceed the 50 cap.
        over = rules.compute_transaction(
            "triage_action", current_tokens=100, current_daily_earned={"triage": 40}
        )
        assert over["allowed"] is False
        assert over["limit_reached"] is True
        assert "limit_reason" in over
class TestTokenRulesCategories:
    """Category enumeration."""

    def test_get_categories(self, tmp_path):
        """get_categories() returns the unique set of event categories."""
        yaml = pytest.importorskip("yaml")
        cfg = tmp_path / "token_rules.yaml"
        cfg.write_text(
            yaml.dump(
                {
                    "version": "1.0.0",
                    "events": {
                        "event_a": {"description": "A", "reward": 5, "category": "cat1"},
                        "event_b": {"description": "B", "reward": 10, "category": "cat2"},
                        "event_c": {"description": "C", "reward": 15, "category": "cat1"},
                    },
                }
            )
        )
        categories = tr.TokenRules(config_path=cfg).get_categories()
        assert sorted(categories) == ["cat1", "cat2"]
class TestTokenRulesAudit:
    """Audit-flag handling."""

    @staticmethod
    def _auditable(tmp_path, flag):
        """Load rules whose audit flag is `flag` and report is_auditable()."""
        yaml = pytest.importorskip("yaml")
        cfg = tmp_path / "token_rules.yaml"
        cfg.write_text(yaml.dump({"version": "1.0.0", "audit": {"log_all_transactions": flag}}))
        return tr.TokenRules(config_path=cfg).is_auditable()

    def test_is_auditable_true(self, tmp_path):
        """log_all_transactions=True makes the rules auditable."""
        assert self._auditable(tmp_path, True) is True

    def test_is_auditable_false(self, tmp_path):
        """log_all_transactions=False disables auditing."""
        assert self._auditable(tmp_path, False) is False
class TestConvenienceFunctions:
    """Module-level wrappers, exercised against the fallback rule set."""

    def test_get_token_delta(self, tmp_path):
        """get_token_delta() resolves deltas from the (fallback) config."""
        with patch.object(tr.TokenRules, "CONFIG_PATH", tmp_path / "nonexistent.yaml"):
            assert tr.get_token_delta("pr_merged") == 10  # fallback value

    def test_check_operation_gate(self, tmp_path):
        """check_operation_gate() honours the fallback pr_merge gate of 0."""
        with patch.object(tr.TokenRules, "CONFIG_PATH", tmp_path / "nonexistent.yaml"):
            assert tr.check_operation_gate("pr_merge", current_tokens=0) is True
            assert tr.check_operation_gate("pr_merge", current_tokens=100) is True

    def test_compute_token_reward(self, tmp_path):
        """compute_token_reward() applies the fallback delta to the balance."""
        with patch.object(tr.TokenRules, "CONFIG_PATH", tmp_path / "nonexistent.yaml"):
            result = tr.compute_token_reward("pr_merged", current_tokens=50)
            assert result["event"] == "pr_merged"
            assert result["delta"] == 10
            assert result["new_balance"] == 60

    def test_list_token_events(self, tmp_path):
        """list_token_events() exposes name/description/delta/category dicts."""
        with patch.object(tr.TokenRules, "CONFIG_PATH", tmp_path / "nonexistent.yaml"):
            events = tr.list_token_events()
            assert len(events) >= 3  # the fallback defines at least three events
            for event in events:
                for key in ("name", "description", "delta", "category"):
                    assert key in event

View File

@@ -0,0 +1,343 @@
"""Tests for weekly_narrative.py script."""
from __future__ import annotations
import json
import sys
from datetime import UTC, datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
# Add timmy_automations to path for imports
sys.path.insert(
0, str(Path(__file__).resolve().parent.parent.parent / "timmy_automations" / "daily_run")
)
import weekly_narrative as wn
class TestParseTimestamp:
    """Behaviour of wn.parse_ts()."""

    def test_parse_iso_with_z(self):
        """ISO timestamps with a trailing Z are parsed."""
        parsed = wn.parse_ts("2026-03-21T12:00:00Z")
        assert parsed is not None
        assert (parsed.year, parsed.month, parsed.day) == (2026, 3, 21)

    def test_parse_iso_with_offset(self):
        """ISO timestamps with an explicit offset are parsed."""
        parsed = wn.parse_ts("2026-03-21T12:00:00+00:00")
        assert parsed is not None
        assert parsed.year == 2026

    def test_parse_empty_string(self):
        """An empty string parses to None."""
        assert wn.parse_ts("") is None

    def test_parse_invalid_string(self):
        """Garbage input parses to None."""
        assert wn.parse_ts("not-a-timestamp") is None
class TestCollectCyclesData:
    """Behaviour of wn.collect_cycles_data()."""

    def test_no_cycles_file(self, tmp_path):
        """A missing cycles.jsonl yields all-zero counters."""
        with patch.object(wn, "REPO_ROOT", tmp_path):
            result = wn.collect_cycles_data(datetime.now(UTC) - timedelta(days=7))
        assert result["total"] == 0
        assert result["successes"] == 0
        assert result["failures"] == 0

    def test_collect_recent_cycles(self, tmp_path):
        """Only cycles newer than the cutoff are counted."""
        retro_dir = tmp_path / ".loop" / "retro"
        retro_dir.mkdir(parents=True)
        now = datetime.now(UTC)
        rows = [
            {"timestamp": now.isoformat(), "success": True, "cycle": 1},
            {"timestamp": now.isoformat(), "success": False, "cycle": 2},
            # Ten days old: falls outside the seven-day window below.
            {"timestamp": (now - timedelta(days=10)).isoformat(), "success": True, "cycle": 3},
        ]
        (retro_dir / "cycles.jsonl").write_text(
            "".join(json.dumps(row) + "\n" for row in rows)
        )
        with patch.object(wn, "REPO_ROOT", tmp_path):
            result = wn.collect_cycles_data(now - timedelta(days=7))
        assert result["total"] == 2
        assert result["successes"] == 1
        assert result["failures"] == 1
class TestExtractThemes:
    """Behaviour of wn.extract_themes()."""

    @staticmethod
    def _issue(*label_names):
        """Build a minimal issue dict carrying the given label names."""
        return {"labels": [{"name": name} for name in label_names]}

    def test_extract_layer_labels(self):
        """`layer:` labels are collected with their counts."""
        themes = wn.extract_themes(
            [
                self._issue("layer:triage", "bug"),
                self._issue("layer:tests", "bug"),
                self._issue("layer:triage", "feature"),
            ]
        )
        assert len(themes["layers"]) == 2
        names = {layer["name"] for layer in themes["layers"]}
        assert {"triage", "tests"} <= names

    def test_extract_type_labels(self):
        """Plain type labels (bug/feature/...) are collected too."""
        themes = wn.extract_themes(
            [self._issue("bug"), self._issue("feature"), self._issue("bug")]
        )
        names = {entry["name"] for entry in themes["types"]}
        assert {"bug", "feature"} <= names

    def test_empty_issues(self):
        """No issues means empty layers, types and top_labels."""
        themes = wn.extract_themes([])
        assert themes["layers"] == []
        assert themes["types"] == []
        assert themes["top_labels"] == []
class TestExtractAgentContributions:
    """Behaviour of wn.extract_agent_contributions()."""

    def test_extract_assignees(self):
        """Issue assignees are counted per login."""
        issues = [
            {"assignee": {"login": "kimi"}},
            {"assignee": {"login": "hermes"}},
            {"assignee": {"login": "kimi"}},
        ]
        result = wn.extract_agent_contributions(issues, [], [])
        assert len(result["active_assignees"]) == 2
        # NOTE: the previous `# noqa: E741` here was spurious — E741 only flags
        # ambiguous single-letter names (l/O/I), which this binding is not.
        logins = {entry["login"] for entry in result["active_assignees"]}
        assert "kimi" in logins
        assert "hermes" in logins

    def test_extract_pr_authors(self):
        """Pull-request authors are counted per login."""
        prs = [
            {"user": {"login": "kimi"}},
            {"user": {"login": "claude"}},
            {"user": {"login": "kimi"}},
        ]
        result = wn.extract_agent_contributions([], prs, [])
        assert len(result["pr_authors"]) == 2

    def test_kimi_mentions_in_cycles(self):
        """Cycles mentioning Kimi in notes or reason are tallied."""
        cycles = [
            {"notes": "Kimi did great work", "reason": ""},
            {"notes": "", "reason": "Kimi timeout"},
            {"notes": "All good", "reason": ""},
        ]
        result = wn.extract_agent_contributions([], [], cycles)
        assert result["kimi_mentioned_cycles"] == 2
class TestAnalyzeTestShifts:
    """Behaviour of wn.analyze_test_shifts()."""

    def test_no_cycles(self):
        """With no cycle data, only an explanatory note is returned."""
        assert "note" in wn.analyze_test_shifts([])

    def test_test_metrics(self):
        """Passed/added test counts are summed across cycles."""
        cycles = [
            {"tests_passed": 100, "tests_added": 5},
            {"tests_passed": 150, "tests_added": 3},
        ]
        result = wn.analyze_test_shifts(cycles)
        assert result["total_tests_passed"] == 250
        assert result["total_tests_added"] == 8
class TestGenerateVibeSummary:
    """Behaviour of wn.generate_vibe_summary()."""

    @staticmethod
    def _vibe(cycles_data, issues_data):
        """Call the generator with empty placeholders for the other inputs."""
        return wn.generate_vibe_summary(
            cycles_data, issues_data, {}, {"layers": []}, {}, {}, {}
        )

    def test_productive_vibe(self):
        """High success rate plus closed issues reads as productive."""
        vibe = self._vibe(
            {"success_rate": 0.95, "successes": 10, "failures": 1},
            {"closed_count": 5},
        )
        assert vibe["overall"] == "productive"
        assert "strong week" in vibe["description"].lower()

    def test_struggling_vibe(self):
        """More failures than successes reads as struggling."""
        vibe = self._vibe(
            {"success_rate": 0.3, "successes": 3, "failures": 7},
            {"closed_count": 0},
        )
        assert vibe["overall"] == "struggling"

    def test_quiet_vibe(self):
        """No activity at all reads as quiet."""
        vibe = self._vibe(
            {"success_rate": 0.0, "successes": 0, "failures": 0},
            {"closed_count": 0},
        )
        assert vibe["overall"] == "quiet"
class TestGenerateMarkdownSummary:
    """Test markdown summary generation."""

    def test_includes_header(self):
        """Markdown includes header."""
        markdown = wn.generate_markdown_summary(
            {
                "period": {"start": "2026-03-14T00:00:00", "end": "2026-03-21T00:00:00"},
                "vibe": {"overall": "productive", "description": "Good week"},
                "activity": {
                    "cycles": {"total": 10, "successes": 9, "failures": 1},
                    "issues": {"closed": 5, "opened": 3},
                    "pull_requests": {"merged": 4, "opened": 2},
                },
            }
        )
        assert "# Weekly Narrative Summary" in markdown
        assert "productive" in markdown.lower()
        assert "10 total" in markdown or "10" in markdown

    def test_includes_focus_areas(self):
        """Markdown includes focus areas when present."""
        markdown = wn.generate_markdown_summary(
            {
                "period": {"start": "2026-03-14", "end": "2026-03-21"},
                "vibe": {
                    "overall": "productive",
                    "description": "Good week",
                    "focus_areas": ["triage (5 items)", "tests (3 items)"],
                },
                "activity": {
                    "cycles": {"total": 0, "successes": 0, "failures": 0},
                    "issues": {"closed": 0, "opened": 0},
                    "pull_requests": {"merged": 0, "opened": 0},
                },
            }
        )
        assert "Focus Areas" in markdown
        assert "triage" in markdown
class TestConfigLoading:
    """Test configuration loading."""

    def test_default_config(self, tmp_path):
        """Default config when manifest missing."""
        with patch.object(wn, "CONFIG_PATH", tmp_path / "nonexistent.json"):
            cfg = wn.load_automation_config()
        assert cfg["lookback_days"] == 7
        assert cfg["enabled"] is True

    def test_environment_override(self, tmp_path):
        """Environment variables override config."""
        with patch.dict("os.environ", {"TIMMY_WEEKLY_NARRATIVE_ENABLED": "false"}), \
             patch.object(wn, "CONFIG_PATH", tmp_path / "nonexistent.json"):
            cfg = wn.load_automation_config()
        assert cfg["enabled"] is False
class TestMain:
    """Test main function."""

    def test_disabled_exits_cleanly(self, tmp_path):
        """When disabled and no --force, exits cleanly."""
        with patch.object(wn, "REPO_ROOT", tmp_path), \
             patch.object(wn, "load_automation_config", return_value={"enabled": False}), \
             patch("sys.argv", ["weekly_narrative"]):
            assert wn.main() == 0

    def test_force_runs_when_disabled(self, tmp_path):
        """--force runs even when disabled."""
        # Minimal directory layout that main() expects.
        (tmp_path / ".loop" / "retro").mkdir(parents=True)
        stub_config = {
            "enabled": False,
            "lookback_days": 7,
            "gitea_api": "http://localhost:3000/api/v1",
            "repo_slug": "test/repo",
            "token_file": "~/.hermes/gitea_token",
        }
        offline_client = MagicMock()
        offline_client.is_available.return_value = False
        with patch.object(wn, "REPO_ROOT", tmp_path), \
             patch.object(wn, "load_automation_config", return_value=stub_config), \
             patch.object(wn, "GiteaClient", return_value=offline_client), \
             patch("sys.argv", ["weekly_narrative", "--force"]):
            # Should complete without error even though Gitea is unavailable.
            assert wn.main() == 0
class TestGiteaClient:
    """Test Gitea API client."""

    def test_is_available_when_unavailable(self):
        """is_available returns False when server down."""
        cfg = {"gitea_api": "http://localhost:99999", "repo_slug": "test/repo"}
        # Must report False rather than raise.
        assert wn.GiteaClient(cfg, None).is_available() is False

    def test_headers_with_token(self):
        """Headers include Authorization when token provided."""
        cfg = {"gitea_api": "http://localhost:3000", "repo_slug": "test/repo"}
        headers = wn.GiteaClient(cfg, "test-token")._headers()
        assert headers["Authorization"] == "token test-token"

    def test_headers_without_token(self):
        """Headers don't include Authorization when no token."""
        cfg = {"gitea_api": "http://localhost:3000", "repo_slug": "test/repo"}
        assert "Authorization" not in wn.GiteaClient(cfg, None)._headers()

View File

@@ -0,0 +1,489 @@
"""Unit tests for the quest system.
Tests quest definitions, progress tracking, completion detection,
and token rewards.
"""
from __future__ import annotations
import pytest
from timmy.quest_system import (
QuestDefinition,
QuestProgress,
QuestStatus,
QuestType,
_is_on_cooldown,
claim_quest_reward,
evaluate_quest_progress,
get_or_create_progress,
get_quest_definition,
get_quest_leaderboard,
load_quest_config,
reset_quest_progress,
update_quest_progress,
)
@pytest.fixture(autouse=True)
def clean_quest_state():
    """Reset the in-memory quest progress store around every test.

    autouse=True so each test in this module starts from a clean slate;
    the post-yield reset runs as pytest teardown.
    """
    reset_quest_progress()
    yield
    reset_quest_progress()
@pytest.fixture
def sample_issue_count_quest():
    """Create a sample issue_count quest definition.

    NOTE(review): no test in this module currently requests this fixture —
    confirm it is kept intentionally for future tests before removing.
    """
    return QuestDefinition(
        id="test_close_issues",
        name="Test Issue Closer",
        description="Close 3 test issues",
        reward_tokens=100,
        quest_type=QuestType.ISSUE_COUNT,
        enabled=True,
        repeatable=False,
        cooldown_hours=0,
        criteria={"target_count": 3, "issue_labels": ["test"]},
        notification_message="Test quest complete! Earned {tokens} tokens.",
    )
@pytest.fixture
def sample_daily_run_quest():
    """Create a sample daily_run quest definition.

    NOTE(review): no test in this module currently requests this fixture —
    confirm it is kept intentionally for future tests before removing.
    """
    return QuestDefinition(
        id="test_daily_run",
        name="Test Daily Runner",
        description="Complete 5 sessions",
        reward_tokens=250,
        quest_type=QuestType.DAILY_RUN,
        enabled=True,
        repeatable=True,
        cooldown_hours=24,
        criteria={"min_sessions": 5},
        notification_message="Daily run quest complete! Earned {tokens} tokens.",
    )
# ── Quest Definition Tests ───────────────────────────────────────────────
class TestQuestDefinition:
    """QuestDefinition.from_dict parsing."""

    def test_from_dict_minimal(self):
        # Only id/name supplied; everything else falls back to defaults.
        quest = QuestDefinition.from_dict({"id": "test_quest", "name": "Test Quest"})
        assert quest.id == "test_quest"
        assert quest.name == "Test Quest"
        assert quest.quest_type == QuestType.CUSTOM
        assert quest.enabled is True

    def test_from_dict_full(self):
        payload = {
            "id": "full_quest",
            "name": "Full Quest",
            "description": "A test quest",
            "reward_tokens": 500,
            "type": "issue_count",
            "enabled": False,
            "repeatable": True,
            "cooldown_hours": 12,
            "criteria": {"target_count": 5},
            "notification_message": "Done!",
        }
        quest = QuestDefinition.from_dict(payload)
        assert quest.id == "full_quest"
        assert quest.reward_tokens == 500
        assert quest.quest_type == QuestType.ISSUE_COUNT
        assert quest.enabled is False
        assert quest.repeatable is True
        assert quest.cooldown_hours == 12
# ── Quest Progress Tests ─────────────────────────────────────────────────
class TestQuestProgress:
    """QuestProgress construction and serialization."""

    def test_progress_creation(self):
        fresh = QuestProgress(
            quest_id="test_quest",
            agent_id="test_agent",
            status=QuestStatus.NOT_STARTED,
        )
        assert fresh.quest_id == "test_quest"
        assert fresh.agent_id == "test_agent"
        assert fresh.current_value == 0

    def test_progress_to_dict(self):
        serialized = QuestProgress(
            quest_id="test_quest",
            agent_id="test_agent",
            status=QuestStatus.IN_PROGRESS,
            current_value=2,
            target_value=5,
        ).to_dict()
        assert serialized["quest_id"] == "test_quest"
        assert serialized["status"] == "in_progress"
        assert serialized["current_value"] == 2
# ── Quest Loading Tests ──────────────────────────────────────────────────
class TestQuestLoading:
    """Quest config loading and definition lookup."""

    def test_load_quest_config(self):
        definitions, settings = load_quest_config()
        assert isinstance(definitions, dict)
        assert isinstance(settings, dict)

    def test_get_quest_definition_exists(self):
        # Lookup is config-backed: result is None when no config exists,
        # otherwise a QuestDefinition loaded from it.
        found = get_quest_definition("nonexistent")
        assert found is None or isinstance(found, QuestDefinition)

    def test_get_quest_definition_not_found(self):
        assert get_quest_definition("definitely_not_a_real_quest_12345") is None
# ── Quest Progress Management Tests ─────────────────────────────────────
class TestQuestProgressManagement:
    """Test quest progress creation and updates.

    Quest definitions are injected directly into the module-level
    ``_quest_definitions`` registry; the try/finally guarantees the injected
    entry is removed even when an assertion fails, so state cannot leak into
    other tests.
    """

    @staticmethod
    def _issue_quest(quest_id: str, name: str) -> QuestDefinition:
        """Build a minimal non-repeatable issue_count quest (target 3)."""
        return QuestDefinition(
            id=quest_id,
            name=name,
            description="Test quest",
            reward_tokens=100,
            quest_type=QuestType.ISSUE_COUNT,
            enabled=True,
            repeatable=False,
            cooldown_hours=0,
            criteria={"target_count": 3},
            notification_message="Done!",
        )

    def test_get_or_create_progress_new(self):
        """A fresh record starts NOT_STARTED with the quest's target value."""
        from timmy.quest_system import _quest_definitions

        _quest_definitions["progress_test"] = self._issue_quest(
            "progress_test", "Progress Test"
        )
        try:
            progress = get_or_create_progress("progress_test", "agent1")
            assert progress.quest_id == "progress_test"
            assert progress.agent_id == "agent1"
            assert progress.status == QuestStatus.NOT_STARTED
            assert progress.target_value == 3
        finally:
            del _quest_definitions["progress_test"]

    def test_update_quest_progress(self):
        """Updates track current_value; hitting the target flips to COMPLETED."""
        from timmy.quest_system import _quest_definitions

        _quest_definitions["update_test"] = self._issue_quest(
            "update_test", "Update Test"
        )
        try:
            # Create initial progress.
            progress = get_or_create_progress("update_test", "agent1")
            assert progress.current_value == 0
            # Partial update: value moves, status does not.
            updated = update_quest_progress("update_test", "agent1", 2)
            assert updated.current_value == 2
            assert updated.status == QuestStatus.NOT_STARTED
            # Reaching the target completes the quest and stamps the time.
            completed = update_quest_progress("update_test", "agent1", 3)
            assert completed.current_value == 3
            assert completed.status == QuestStatus.COMPLETED
            assert completed.completed_at != ""
        finally:
            del _quest_definitions["update_test"]
# ── Quest Evaluation Tests ───────────────────────────────────────────────
class TestQuestEvaluation:
    """Test evaluate_quest_progress for each quest type.

    Each test injects its quest into the module-level ``_quest_definitions``
    registry and removes it in ``finally`` so a failed assertion cannot leak
    the definition into other tests.
    """

    def test_evaluate_issue_count_quest(self):
        """issue_count quests count closed issues carrying a required label."""
        quest = QuestDefinition(
            id="eval_test",
            name="Eval Test",
            description="Test quest",
            reward_tokens=100,
            quest_type=QuestType.ISSUE_COUNT,
            enabled=True,
            repeatable=False,
            cooldown_hours=0,
            criteria={"target_count": 2, "issue_labels": ["test"]},
            notification_message="Done!",
        )
        from timmy.quest_system import _quest_definitions

        _quest_definitions["eval_test"] = quest
        try:
            closed_issues = [
                {"id": 1, "labels": [{"name": "test"}]},
                {"id": 2, "labels": [{"name": "test"}, {"name": "bug"}]},
                {"id": 3, "labels": [{"name": "other"}]},
            ]
            progress = evaluate_quest_progress(
                "eval_test", "agent1", {"closed_issues": closed_issues}
            )
            assert progress is not None
            assert progress.current_value == 2  # Two issues with 'test' label
        finally:
            del _quest_definitions["eval_test"]

    def test_evaluate_issue_reduce_quest(self):
        """issue_reduce quests measure the drop in open issue count."""
        quest = QuestDefinition(
            id="reduce_test",
            name="Reduce Test",
            description="Test quest",
            reward_tokens=200,
            quest_type=QuestType.ISSUE_REDUCE,
            enabled=True,
            repeatable=False,
            cooldown_hours=0,
            criteria={"target_reduction": 2},
            notification_message="Done!",
        )
        from timmy.quest_system import _quest_definitions

        _quest_definitions["reduce_test"] = quest
        try:
            context = {"previous_issue_count": 10, "current_issue_count": 7}
            progress = evaluate_quest_progress("reduce_test", "agent1", context)
            assert progress is not None
            assert progress.current_value == 3  # Reduced by 3
        finally:
            del _quest_definitions["reduce_test"]

    def test_evaluate_daily_run_quest(self):
        """daily_run quests complete once enough sessions are finished."""
        quest = QuestDefinition(
            id="daily_test",
            name="Daily Test",
            description="Test quest",
            reward_tokens=250,
            quest_type=QuestType.DAILY_RUN,
            enabled=True,
            repeatable=True,
            cooldown_hours=24,
            criteria={"min_sessions": 5},
            notification_message="Done!",
        )
        from timmy.quest_system import _quest_definitions

        _quest_definitions["daily_test"] = quest
        try:
            progress = evaluate_quest_progress(
                "daily_test", "agent1", {"sessions_completed": 5}
            )
            assert progress is not None
            assert progress.current_value == 5
            assert progress.status == QuestStatus.COMPLETED
        finally:
            del _quest_definitions["daily_test"]
# ── Quest Cooldown Tests ─────────────────────────────────────────────────
class TestQuestCooldown:
    """Cooldown gating for repeatable quests."""

    def test_is_on_cooldown_no_cooldown(self):
        repeatable_quest = QuestDefinition(
            id="cooldown_test",
            name="Cooldown Test",
            description="Test quest",
            reward_tokens=100,
            quest_type=QuestType.ISSUE_COUNT,
            enabled=True,
            repeatable=True,
            cooldown_hours=24,
            criteria={},
            notification_message="Done!",
        )
        claimed = QuestProgress(
            quest_id="cooldown_test",
            agent_id="agent1",
            status=QuestStatus.CLAIMED,
        )
        # Without a last_completed_at timestamp there is nothing to cool down from.
        assert _is_on_cooldown(claimed, repeatable_quest) is False
# ── Quest Reward Tests ───────────────────────────────────────────────────
class TestQuestReward:
    """Test reward claiming.

    The injected quest definition is removed in ``finally`` so a failed
    assertion cannot leak state into other tests.
    """

    def test_claim_quest_reward_not_completed(self):
        """Claiming a quest that is not COMPLETED must return None."""
        quest = QuestDefinition(
            id="reward_test",
            name="Reward Test",
            description="Test quest",
            reward_tokens=100,
            quest_type=QuestType.ISSUE_COUNT,
            enabled=True,
            repeatable=False,
            cooldown_hours=0,
            criteria={"target_count": 3},
            notification_message="Done!",
        )
        from timmy.quest_system import _quest_definitions, _quest_progress

        _quest_definitions["reward_test"] = quest
        try:
            # Create progress but don't complete it.
            _quest_progress["agent1:reward_test"] = get_or_create_progress(
                "reward_test", "agent1"
            )
            # Try to claim — should fail.
            assert claim_quest_reward("reward_test", "agent1") is None
        finally:
            del _quest_definitions["reward_test"]
# ── Leaderboard Tests ────────────────────────────────────────────────────
class TestQuestLeaderboard:
    """Test leaderboard aggregation.

    The injected quest definition is removed in ``finally`` so a failed
    assertion cannot leak state into other tests.
    """

    def test_get_quest_leaderboard_empty(self):
        """An empty progress store yields an empty leaderboard."""
        reset_quest_progress()
        assert get_quest_leaderboard() == []

    def test_get_quest_leaderboard_with_data(self):
        """Agents rank by total tokens (reward × completion_count)."""
        quest = QuestDefinition(
            id="leaderboard_test",
            name="Leaderboard Test",
            description="Test quest",
            reward_tokens=100,
            quest_type=QuestType.ISSUE_COUNT,
            enabled=True,
            repeatable=True,
            cooldown_hours=0,
            criteria={"target_count": 1},
            notification_message="Done!",
        )
        from timmy.quest_system import _quest_definitions, _quest_progress

        _quest_definitions["leaderboard_test"] = quest
        try:
            # agent1 completed the quest twice, agent2 once.
            _quest_progress["agent1:leaderboard_test"] = QuestProgress(
                quest_id="leaderboard_test",
                agent_id="agent1",
                status=QuestStatus.NOT_STARTED,
                completion_count=2,
            )
            _quest_progress["agent2:leaderboard_test"] = QuestProgress(
                quest_id="leaderboard_test",
                agent_id="agent2",
                status=QuestStatus.NOT_STARTED,
                completion_count=1,
            )
            leaderboard = get_quest_leaderboard()
            assert len(leaderboard) == 2
            # agent1 should be first (more tokens).
            assert leaderboard[0]["agent_id"] == "agent1"
            assert leaderboard[0]["total_tokens"] == 200
            assert leaderboard[1]["agent_id"] == "agent2"
            assert leaderboard[1]["total_tokens"] == 100
        finally:
            del _quest_definitions["leaderboard_test"]
# ── Quest Reset Tests ─────────────────────────────────────────────────────
class TestQuestReset:
    """reset_quest_progress filtering by quest and by agent."""

    @staticmethod
    def _seed(*pairs):
        """Insert bare NOT_STARTED records keyed 'agent:quest'; return the store."""
        from timmy.quest_system import _quest_progress

        for agent, quest in pairs:
            _quest_progress[f"{agent}:{quest}"] = QuestProgress(
                quest_id=quest, agent_id=agent, status=QuestStatus.NOT_STARTED
            )
        return _quest_progress

    def test_reset_quest_progress_all(self):
        store = self._seed(("agent1", "quest1"), ("agent2", "quest2"))
        assert len(store) == 2
        assert reset_quest_progress() == 2
        assert len(store) == 0

    def test_reset_quest_progress_specific_quest(self):
        store = self._seed(("agent1", "quest1"), ("agent1", "quest2"))
        assert reset_quest_progress(quest_id="quest1") == 1
        assert "agent1:quest1" not in store
        assert "agent1:quest2" in store

    def test_reset_quest_progress_specific_agent(self):
        store = self._seed(("agent1", "quest1"), ("agent2", "quest1"))
        assert reset_quest_progress(agent_id="agent1") == 1
        assert "agent1:quest1" not in store
        assert "agent2:quest1" in store

View File

@@ -1,6 +1,9 @@
{
"version": "1.0.0",
"description": "Master manifest of all Timmy automations",
"_health_snapshot": {
"note": "Quick health check before coding — CI, P0/P1 issues, flakiness"
},
"last_updated": "2026-03-21",
"automations": [
{
@@ -228,6 +231,43 @@
"max_items": 5
},
"outputs": []
},
{
"id": "weekly_narrative",
"name": "Weekly Narrative Summary",
"description": "Generates a human-readable weekly summary of work themes, agent contributions, and token economy shifts",
"script": "timmy_automations/daily_run/weekly_narrative.py",
"category": "daily_run",
"enabled": true,
"trigger": "scheduled",
"schedule": "weekly",
"executable": "python3",
"config": {
"lookback_days": 7,
"output_file": ".loop/weekly_narrative.json",
"gitea_api": "http://localhost:3000/api/v1",
"repo_slug": "rockachopa/Timmy-time-dashboard"
},
"outputs": [
".loop/weekly_narrative.json",
".loop/weekly_narrative.md"
]
},
{
"id": "health_snapshot",
"name": "Health Snapshot",
"description": "Quick health check before coding — CI status, P0/P1 issues, test flakiness, token economy",
"script": "timmy_automations/daily_run/health_snapshot.py",
"category": "daily_run",
"enabled": true,
"trigger": "pre_cycle",
"executable": "python3",
"config": {
"critical_labels": ["P0", "P1", "priority/critical", "priority/high"],
"flakiness_lookback_cycles": 20,
"ci_timeout_seconds": 5
},
"outputs": []
}
]
}

View File

@@ -17,6 +17,10 @@
"manual": {
"description": "Run on-demand only",
"automations": ["agent_workspace", "kimi_bootstrap", "kimi_resume", "backfill_retro"]
},
"weekly": {
"description": "Run once per week (Sundays)",
"automations": ["weekly_narrative"]
}
},
"triggers": {

View File

@@ -0,0 +1,138 @@
# Token Rules — Agent reward/penalty configuration for automations
#
# This file defines the token economy for agent actions.
# Modify values here to adjust incentives without code changes.
#
# Used by: timmy_automations.utils.token_rules
version: "1.0.0"
description: "Token economy rules for agent automations"
# ── Events ─────────────────────────────────────────────────────────────────
# Each event type defines rewards/penalties and optional gating thresholds
events:
# Triage actions
triage_success:
description: "Successfully triaged an issue (scored and categorized)"
reward: 5
category: "triage"
deep_triage_refinement:
description: "LLM-driven issue refinement with acceptance criteria added"
reward: 20
category: "triage"
quarantine_candidate_found:
description: "Identified a repeat failure issue for quarantine"
reward: 10
category: "triage"
# Daily Run completions
daily_run_completed:
description: "Completed a daily run cycle successfully"
reward: 5
category: "daily_run"
golden_path_generated:
description: "Generated a coherent mini-session plan"
reward: 3
category: "daily_run"
weekly_narrative_created:
description: "Generated weekly summary of work themes"
reward: 15
category: "daily_run"
# PR merges
pr_merged:
description: "Successfully merged a pull request"
reward: 10
category: "merge"
# Gating: requires minimum tokens to perform
gate_threshold: 0
pr_merged_with_tests:
description: "Merged PR with all tests passing"
reward: 15
category: "merge"
gate_threshold: 0
# Test fixes
test_fixed:
description: "Fixed a failing test"
reward: 8
category: "test"
test_added:
description: "Added new test coverage"
reward: 5
category: "test"
critical_bug_fixed:
description: "Fixed a critical bug on main"
reward: 25
category: "test"
# General operations
automation_run:
description: "Ran any automation (resource usage)"
penalty: -1
category: "operation"
automation_failure:
description: "Automation failed or produced error"
penalty: -2
category: "operation"
cycle_retro_logged:
description: "Logged structured retrospective data"
reward: 5
category: "operation"
pre_commit_passed:
description: "Pre-commit checks passed"
reward: 2
category: "operation"
pre_commit_failed:
description: "Pre-commit checks failed"
penalty: -1
category: "operation"
# ── Gating Thresholds ──────────────────────────────────────────────────────
# Minimum token balances required for sensitive operations
gating_thresholds:
pr_merge: 0
sensitive_config_change: 50
agent_workspace_create: 10
deep_triage_run: 0
# ── Daily Limits ───────────────────────────────────────────────────────────
# Maximum tokens that can be earned/spent per category per day
daily_limits:
triage:
max_earn: 100
max_spend: 0
daily_run:
max_earn: 50
max_spend: 0
merge:
max_earn: 100
max_spend: 0
test:
max_earn: 100
max_spend: 0
operation:
max_earn: 50
max_spend: 50
# ── Audit Settings ─────────────────────────────────────────────────────────
# Settings for token audit and inspection
audit:
log_all_transactions: true
log_retention_days: 30
inspectable_by: ["orchestrator", "auditor", "timmy"]

View File

@@ -0,0 +1,619 @@
#!/usr/bin/env python3
"""Quick health snapshot before coding — checks CI, issues, flakiness.
A fast status check that shows major red/green signals before deeper work.
Runs in a few seconds and produces a concise summary.
Run: python3 timmy_automations/daily_run/health_snapshot.py
Env: GITEA_API, GITEA_TOKEN, REPO_SLUG
Refs: #710
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
# ── Configuration ─────────────────────────────────────────────────────────
# Repo root: three levels up from timmy_automations/daily_run/health_snapshot.py.
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
# Baseline settings; load_config() copies this dict and overlays env overrides.
DEFAULT_CONFIG = {
    "gitea_api": "http://localhost:3000/api/v1",
    "repo_slug": "rockachopa/Timmy-time-dashboard",
    "token_file": "~/.hermes/gitea_token",
    "critical_labels": ["P0", "P1", "priority/critical", "priority/high"],
    "flakiness_lookback_cycles": 20,
    "ci_timeout_seconds": 5,
}
def load_config() -> dict:
    """Load configuration with fallback to defaults.

    Starts from a copy of DEFAULT_CONFIG, then overlays any non-empty
    TIMMY_* environment variables.
    """
    config = DEFAULT_CONFIG.copy()
    env = os.environ
    overrides = {
        "gitea_api": env.get("TIMMY_GITEA_API"),
        "repo_slug": env.get("TIMMY_REPO_SLUG"),
        "token": env.get("TIMMY_GITEA_TOKEN"),
    }
    config.update({key: value for key, value in overrides.items() if value})
    return config
def get_token(config: dict) -> str | None:
    """Return the Gitea API token, or None when none is configured.

    Lookup order: explicit "token" entry in config, then the repo-local
    .timmy_gitea_token file, then the legacy token file from config.
    """
    if "token" in config:
        return config["token"]
    project_root = Path(__file__).resolve().parent.parent.parent
    local_token = project_root / ".timmy_gitea_token"
    if local_token.exists():
        return local_token.read_text().strip()
    legacy_token = Path(config["token_file"]).expanduser()
    if legacy_token.exists():
        return legacy_token.read_text().strip()
    return None
# ── Gitea API Client ──────────────────────────────────────────────────────
class GiteaClient:
    """Simple Gitea API client with graceful degradation.

    All requests go through urllib; availability is probed once and cached
    per instance so repeated checks do not re-hit the server.
    """
    def __init__(self, config: dict, token: str | None):
        # config supplies "gitea_api" (base URL) and "repo_slug" ("owner/repo").
        self.api_base = config["gitea_api"].rstrip("/")
        self.repo_slug = config["repo_slug"]
        self.token = token
        # Cached availability probe result; None means "not checked yet".
        self._available: bool | None = None
    def _headers(self) -> dict:
        # Token auth is optional — requests go out anonymously without one.
        headers = {"Accept": "application/json"}
        if self.token:
            headers["Authorization"] = f"token {self.token}"
        return headers
    def _api_url(self, path: str) -> str:
        # Repo-scoped endpoint, e.g. "issues" -> <api>/repos/<slug>/issues.
        return f"{self.api_base}/repos/{self.repo_slug}/{path}"
    def is_available(self) -> bool:
        """Check if Gitea API is reachable (result cached for this instance)."""
        if self._available is not None:
            return self._available
        try:
            req = Request(
                f"{self.api_base}/version",
                headers=self._headers(),
                method="GET",
            )
            with urlopen(req, timeout=3) as resp:
                self._available = resp.status == 200
                return self._available
        except (HTTPError, URLError, TimeoutError):
            self._available = False
            return False
    def get(self, path: str, params: dict | None = None) -> list | dict:
        """Make a GET request to the Gitea API and return the decoded JSON."""
        url = self._api_url(path)
        if params:
            # NOTE(review): values are not URL-encoded; fine for the simple
            # int/str params used in this module — confirm before passing
            # anything containing reserved characters.
            query = "&".join(f"{k}={v}" for k, v in params.items())
            url = f"{url}?{query}"
        req = Request(url, headers=self._headers(), method="GET")
        with urlopen(req, timeout=10) as resp:
            return json.loads(resp.read())
    def get_paginated(self, path: str, params: dict | None = None) -> list:
        """Fetch all pages of a paginated endpoint."""
        all_items = []
        page = 1
        limit = 50
        while True:
            page_params = {"limit": limit, "page": page}
            if params:
                # Caller params win, including "limit" — see note below.
                page_params.update(params)
            batch = self.get(path, page_params)
            if not batch:
                break
            all_items.extend(batch)
            # NOTE(review): this compares against the default limit (50) even
            # when params overrides "limit", so a caller-supplied limit < 50
            # stops after the first page — confirm whether that is intended.
            if len(batch) < limit:
                break
            page += 1
        return all_items
# ── Data Models ───────────────────────────────────────────────────────────
@dataclass
class CISignal:
    """CI pipeline status signal for the most recent commit."""
    status: str  # "pass", "fail", "unknown", "unavailable"
    message: str  # human-readable one-liner for display
    details: dict[str, Any] = field(default_factory=dict)  # e.g. short sha, raw state
@dataclass
class IssueSignal:
    """Critical issues signal."""
    count: int  # total critical issues (P0 + P1 + other critical labels)
    p0_count: int
    p1_count: int
    issues: list[dict[str, Any]] = field(default_factory=list)  # number/title/url summaries
@dataclass
class FlakinessSignal:
    """Test flakiness/error rate signal derived from recent cycle retros."""
    status: str  # "healthy", "degraded", "critical", "unknown"
    recent_failures: int  # failed cycles in the lookback window
    recent_cycles: int  # cycles considered
    failure_rate: float  # recent_failures / recent_cycles
    message: str  # human-readable summary
@dataclass
class TokenEconomySignal:
    """Token economy temperature indicator over the last 24h of transactions."""
    status: str  # "balanced", "inflationary", "deflationary", "unknown"
    message: str  # human-readable summary
    recent_mint: int = 0  # tokens created in window
    recent_burn: int = 0  # tokens destroyed in window
@dataclass
class HealthSnapshot:
    """Complete health snapshot combining all individual signals."""
    timestamp: str  # ISO-8601 UTC generation time
    overall_status: str  # "green", "yellow", "red"
    ci: CISignal
    issues: IssueSignal
    flakiness: FlakinessSignal
    tokens: TokenEconomySignal
    def to_dict(self) -> dict[str, Any]:
        """Serialize to a JSON-friendly dict; the issue list is capped at 5."""
        return {
            "timestamp": self.timestamp,
            "overall_status": self.overall_status,
            "ci": {
                "status": self.ci.status,
                "message": self.ci.message,
                "details": self.ci.details,
            },
            "issues": {
                "count": self.issues.count,
                "p0_count": self.issues.p0_count,
                "p1_count": self.issues.p1_count,
                "issues": self.issues.issues[:5],  # Limit to 5
            },
            "flakiness": {
                "status": self.flakiness.status,
                "recent_failures": self.flakiness.recent_failures,
                "recent_cycles": self.flakiness.recent_cycles,
                "failure_rate": round(self.flakiness.failure_rate, 2),
                "message": self.flakiness.message,
            },
            "tokens": {
                "status": self.tokens.status,
                "message": self.tokens.message,
                "recent_mint": self.tokens.recent_mint,
                "recent_burn": self.tokens.recent_burn,
            },
        }
# ── Health Check Functions ────────────────────────────────────────────────
def check_ci_status(client: GiteaClient, config: dict) -> CISignal:
    """Check CI pipeline status from the most recent commit.

    Returns a CISignal whose status is "pass", "fail", "unknown", or
    "unavailable"; details carry the short sha and raw combined state.
    """
    try:
        # Only the newest commits are needed — fetch a single page of 5
        # directly; get_paginated would depend on its short-page heuristic
        # to avoid walking the whole history.
        commits = client.get("commits", {"limit": 5})
        if not commits:
            return CISignal(
                status="unknown",
                message="No recent commits found",
            )
        # Check the combined status of the most recent commit only.
        latest = commits[0]
        sha = latest.get("sha", "")
        try:
            statuses = client.get(f"commits/{sha}/status")
            state = statuses.get("state", "unknown")
        except (HTTPError, URLError) as exc:
            return CISignal(
                status="unknown",
                message=f"Could not fetch CI status: {exc}",
            )
        details = {"sha": sha[:8], "state": state}
        if state == "success":
            return CISignal(status="pass", message="CI passing", details=details)
        if state in ("failure", "error"):
            return CISignal(status="fail", message=f"CI failed ({state})", details=details)
        if state == "pending":
            return CISignal(status="unknown", message="CI pending", details=details)
        return CISignal(status="unknown", message=f"CI status: {state}", details=details)
    except (HTTPError, URLError) as exc:
        return CISignal(
            status="unavailable",
            message=f"CI check failed: {exc}",
        )
def check_critical_issues(client: GiteaClient, config: dict) -> IssueSignal:
    """Check for open P0/P1 issues.

    Labels containing "p0"/"critical" count as P0, "p1"/"high" as P1; any
    remaining label matching config["critical_labels"] counts as other
    critical. On API failure the check degrades to an empty signal but logs
    a warning instead of failing silently.
    """
    critical_labels = config.get("critical_labels", ["P0", "P1"])
    try:
        issues = client.get_paginated("issues", {"state": "open", "limit": 100})
        p0_issues = []
        p1_issues = []
        other_critical = []
        for issue in issues:
            labels = [lbl.get("name", "").lower() for lbl in issue.get("labels", [])]
            # Substring match so e.g. "priority/critical" registers as P0.
            is_p0 = any("p0" in lbl or "critical" in lbl for lbl in labels)
            is_p1 = any("p1" in lbl or "high" in lbl for lbl in labels)
            issue_summary = {
                "number": issue.get("number"),
                "title": issue.get("title", "Untitled")[:60],
                "url": issue.get("html_url", ""),
            }
            if is_p0:
                p0_issues.append(issue_summary)
            elif is_p1:
                p1_issues.append(issue_summary)
            elif any(cl.lower() in labels for cl in critical_labels):
                other_critical.append(issue_summary)
        all_critical = p0_issues + p1_issues + other_critical
        return IssueSignal(
            count=len(all_critical),
            p0_count=len(p0_issues),
            p1_count=len(p1_issues),
            issues=all_critical[:10],  # Limit stored issues
        )
    except (HTTPError, URLError) as exc:
        # Best-effort: surface the failure rather than silently reporting
        # zero critical issues, but still degrade gracefully.
        print(f"warning: could not fetch issues: {exc}", file=sys.stderr)
        return IssueSignal(
            count=0,
            p0_count=0,
            p1_count=0,
            issues=[],
        )
def check_flakiness(config: dict) -> FlakinessSignal:
    """Check test flakiness from cycle retrospective data."""
    retro_file = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
    lookback = config.get("flakiness_lookback_cycles", 20)

    def _unknown(msg: str) -> FlakinessSignal:
        # Shared shape for every "no data / unreadable" outcome.
        return FlakinessSignal(
            status="unknown",
            recent_failures=0,
            recent_cycles=0,
            failure_rate=0.0,
            message=msg,
        )

    if not retro_file.exists():
        return _unknown("No cycle data available")
    try:
        entries = []
        for line in retro_file.read_text().strip().splitlines():
            try:
                entries.append(json.loads(line))
            except json.JSONDecodeError:
                continue  # Skip malformed lines.
        # Only the most recent `lookback` cycles matter.
        recent = entries[-lookback:] if len(entries) > lookback else entries
        total_count = len(recent)
        if total_count == 0:
            return _unknown("No recent cycle data")
        failure_count = sum(1 for entry in recent if not entry.get("success", True))
        failure_rate = failure_count / total_count
        # Bucket the rate into a traffic-light status.
        if failure_rate < 0.1:
            status, message = "healthy", f"Low flakiness ({failure_rate:.0%})"
        elif failure_rate < 0.3:
            status, message = "degraded", f"Moderate flakiness ({failure_rate:.0%})"
        else:
            status, message = "critical", f"High flakiness ({failure_rate:.0%})"
        return FlakinessSignal(
            status=status,
            recent_failures=failure_count,
            recent_cycles=total_count,
            failure_rate=failure_rate,
            message=message,
        )
    except (OSError, ValueError) as exc:
        return _unknown(f"Could not read cycle data: {exc}")
def check_token_economy(config: dict) -> TokenEconomySignal:
    """Check token economy temperature from recent transactions."""
    # Simplified check — a full implementation would query the token ledger.
    ledger_file = REPO_ROOT / ".loop" / "token_economy.jsonl"
    if not ledger_file.exists():
        return TokenEconomySignal(
            status="unknown",
            message="No token economy data",
        )
    try:
        # Tally mint/burn over the last 24 hours of transactions.
        cutoff = datetime.now(timezone.utc) - timedelta(hours=24)
        minted = 0
        burned = 0
        for line in ledger_file.read_text().strip().splitlines():
            try:
                tx = json.loads(line)
                stamp = tx.get("timestamp", "1970-01-01").replace("Z", "+00:00")
                if datetime.fromisoformat(stamp) < cutoff:
                    continue
                delta = tx.get("delta", 0)
                if delta > 0:
                    minted += delta
                else:
                    burned += abs(delta)
            except (json.JSONDecodeError, ValueError):
                continue  # Skip malformed lines/timestamps.
        # Simple temperature heuristic: one side dominating 2:1 tips the scale.
        if minted > burned * 2:
            status, message = "inflationary", f"High mint activity (+{minted}/-{burned})"
        elif burned > minted * 2:
            status, message = "deflationary", f"High burn activity (+{minted}/-{burned})"
        else:
            status, message = "balanced", f"Balanced flow (+{minted}/-{burned})"
        return TokenEconomySignal(
            status=status,
            message=message,
            recent_mint=minted,
            recent_burn=burned,
        )
    except (OSError, ValueError) as exc:
        return TokenEconomySignal(
            status="unknown",
            message=f"Could not read token data: {exc}",
        )
def calculate_overall_status(
    ci: CISignal,
    issues: IssueSignal,
    flakiness: FlakinessSignal,
) -> str:
    """Roll the individual health signals up into one traffic-light color.

    Red wins over yellow, yellow over green; token economy is not consulted.
    """
    # Red: failing CI, any P0 issue, or critical flakiness.
    is_red = (
        ci.status == "fail"
        or issues.p0_count > 0
        or flakiness.status == "critical"
    )
    if is_red:
        return "red"
    # Yellow: CI state unknown, any P1 issue, or degraded flakiness.
    is_yellow = (
        ci.status == "unknown"
        or issues.p1_count > 0
        or flakiness.status == "degraded"
    )
    return "yellow" if is_yellow else "green"
# ── Main Functions ────────────────────────────────────────────────────────
def generate_snapshot(config: dict, token: str | None) -> HealthSnapshot:
    """Generate a complete health snapshot.

    Args:
        config: Loaded automation configuration.
        token: Optional Gitea API token (None for anonymous access).

    Returns:
        HealthSnapshot combining CI, issue, flakiness, and token signals.
    """
    client = GiteaClient(config, token)
    # Always run all checks (don't short-circuit)
    if client.is_available():
        ci = check_ci_status(client, config)
        issues = check_critical_issues(client, config)
    else:
        # Gitea is down: substitute placeholder CI/issue signals so the
        # local checks (flakiness, token economy) still run below.
        ci = CISignal(
            status="unavailable",
            message="Gitea unavailable",
        )
        issues = IssueSignal(count=0, p0_count=0, p1_count=0, issues=[])
    flakiness = check_flakiness(config)
    tokens = check_token_economy(config)
    # Token economy is informational only — it does not feed overall status.
    overall = calculate_overall_status(ci, issues, flakiness)
    return HealthSnapshot(
        timestamp=datetime.now(timezone.utc).isoformat(),
        overall_status=overall,
        ci=ci,
        issues=issues,
        flakiness=flakiness,
        tokens=tokens,
    )
def print_snapshot(snapshot: HealthSnapshot, verbose: bool = False) -> None:
    """Print a formatted, human-readable health snapshot to stdout.

    Args:
        snapshot: The snapshot to render.
        verbose: When True, also list up to 5 critical issues by number/title.
    """
    # Status emoji
    # Unknown statuses fall back to "" throughout.
    status_emoji = {"green": "🟢", "yellow": "🟡", "red": "🔴"}.get(
        snapshot.overall_status, ""
    )
    print("=" * 60)
    print(f"{status_emoji} HEALTH SNAPSHOT")
    print("=" * 60)
    print(f"Generated: {snapshot.timestamp}")
    print(f"Overall: {snapshot.overall_status.upper()}")
    print()
    # CI Status
    # NOTE(review): several emoji values below appear as empty strings in
    # this copy of the source (possibly stripped glyphs) — confirm against
    # the original file before editing these literals.
    ci_emoji = {"pass": "", "fail": "", "unknown": "⚠️", "unavailable": ""}.get(
        snapshot.ci.status, ""
    )
    print(f"{ci_emoji} CI: {snapshot.ci.message}")
    # Issues: red dot when any P0 exists, yellow when only P1s exist.
    if snapshot.issues.p0_count > 0:
        issue_emoji = "🔴"
    elif snapshot.issues.p1_count > 0:
        issue_emoji = "🟡"
    else:
        issue_emoji = ""
    print(f"{issue_emoji} Issues: {snapshot.issues.count} critical")
    if snapshot.issues.p0_count > 0:
        print(f" 🔴 P0: {snapshot.issues.p0_count}")
    if snapshot.issues.p1_count > 0:
        print(f" 🟡 P1: {snapshot.issues.p1_count}")
    # Flakiness
    flak_emoji = {"healthy": "", "degraded": "🟡", "critical": "🔴", "unknown": ""}.get(
        snapshot.flakiness.status, ""
    )
    print(f"{flak_emoji} Flakiness: {snapshot.flakiness.message}")
    # Token Economy
    token_emoji = {"balanced": "", "inflationary": "🟡", "deflationary": "🔵", "unknown": ""}.get(
        snapshot.tokens.status, ""
    )
    print(f"{token_emoji} Tokens: {snapshot.tokens.message}")
    # Verbose: show issue details
    if verbose and snapshot.issues.issues:
        print()
        print("Critical Issues:")
        for issue in snapshot.issues.issues[:5]:
            print(f" #{issue['number']}: {issue['title'][:50]}")
    # NOTE(review): footer assumed unconditional — indentation was lost in
    # this copy of the source; confirm it is not part of the verbose block.
    print()
    print("" * 60)
def parse_args() -> argparse.Namespace:
    """Parse CLI flags for the health snapshot tool."""
    parser = argparse.ArgumentParser(
        description="Quick health snapshot before coding",
    )
    parser.add_argument(
        "--json", "-j",
        action="store_true",
        help="Output as JSON",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Show verbose output including issue details",
    )
    parser.add_argument(
        "--quiet", "-q",
        action="store_true",
        help="Only show status line (no details)",
    )
    return parser.parse_args()
def main() -> int:
    """Main entry point for CLI.

    Returns:
        0 when overall status is green/yellow, 1 when red — so callers can
        gate scripts on the exit code.
    """
    args = parse_args()
    config = load_config()
    token = get_token(config)
    snapshot = generate_snapshot(config, token)
    if args.json:
        print(json.dumps(snapshot.to_dict(), indent=2))
    elif args.quiet:
        # One-line mode: just the traffic light and the status word.
        status_emoji = {"green": "🟢", "yellow": "🟡", "red": "🔴"}.get(
            snapshot.overall_status, ""
        )
        print(f"{status_emoji} {snapshot.overall_status.upper()}")
    else:
        print_snapshot(snapshot, verbose=args.verbose)
    # Exit with non-zero if red status
    return 0 if snapshot.overall_status != "red" else 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -22,6 +22,14 @@ from typing import Any
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
# ── Token Economy Integration ──────────────────────────────────────────────
# Import token rules helpers for tracking Daily Run rewards
sys.path.insert(
0, str(Path(__file__).resolve().parent.parent)
)
from utils.token_rules import TokenRules, compute_token_reward
# ── Configuration ─────────────────────────────────────────────────────────
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
@@ -490,6 +498,43 @@ def parse_args() -> argparse.Namespace:
return p.parse_args()
def compute_daily_run_tokens(success: bool = True) -> dict[str, Any]:
    """Compute token rewards for Daily Run completion.

    Uses the centralized token_rules configuration to calculate
    rewards/penalties for automation actions.

    Args:
        success: Whether the Daily Run completed successfully

    Returns:
        Token transaction details: per-event transaction dicts, the summed
        "total_delta", and the "config_version" of the rules used.
    """
    rules = TokenRules()
    if success:
        # Daily run completed successfully
        # NOTE(review): current_tokens=0 evaluates any gating against a zero
        # balance — confirm that is intended for automation-side rewards.
        transaction = compute_token_reward("daily_run_completed", current_tokens=0)
        # Also compute golden path generation if agenda was created
        agenda_transaction = compute_token_reward("golden_path_generated", current_tokens=0)
        return {
            "daily_run": transaction,
            "golden_path": agenda_transaction,
            "total_delta": transaction.get("delta", 0) + agenda_transaction.get("delta", 0),
            "config_version": rules.get_config_version(),
        }
    else:
        # Automation failed
        transaction = compute_token_reward("automation_failure", current_tokens=0)
        return {
            "automation_failure": transaction,
            "total_delta": transaction.get("delta", 0),
            "config_version": rules.get_config_version(),
        }
def main() -> int:
args = parse_args()
config = load_config()
@@ -503,10 +548,13 @@ def main() -> int:
# Check Gitea availability
if not client.is_available():
error_msg = "[orchestrator] Error: Gitea API is not available"
# Compute failure tokens even when unavailable
tokens = compute_daily_run_tokens(success=False)
if args.json:
print(json.dumps({"error": error_msg}))
print(json.dumps({"error": error_msg, "tokens": tokens}))
else:
print(error_msg, file=sys.stderr)
print(f"[tokens] Failure penalty: {tokens['total_delta']}", file=sys.stderr)
return 1
# Fetch candidates and generate agenda
@@ -521,9 +569,12 @@ def main() -> int:
cycles = load_cycle_data()
day_summary = generate_day_summary(activity, cycles)
# Compute token rewards for successful completion
tokens = compute_daily_run_tokens(success=True)
# Output
if args.json:
output = {"agenda": agenda}
output = {"agenda": agenda, "tokens": tokens}
if day_summary:
output["day_summary"] = day_summary
print(json.dumps(output, indent=2))
@@ -531,6 +582,15 @@ def main() -> int:
print_agenda(agenda)
if day_summary and activity:
print_day_summary(day_summary, activity)
# Show token rewards
print("" * 60)
print("🪙 Token Rewards")
print("" * 60)
print(f"Daily Run completed: +{tokens['daily_run']['delta']} tokens")
if candidates:
print(f"Golden path generated: +{tokens['golden_path']['delta']} tokens")
print(f"Total: +{tokens['total_delta']} tokens")
print(f"Config version: {tokens['config_version']}")
return 0

View File

@@ -0,0 +1,745 @@
#!/usr/bin/env python3
"""Weekly narrative summary generator — human-readable loop analysis.
Analyzes the past week's activity across the development loop to produce
a narrative summary of:
- What changed (themes, areas of focus)
- How agents and Timmy contributed
- Any shifts in tests, triage, or token economy
The output is designed to be skimmable — a quick read that gives context
on the week's progress without drowning in metrics.
Run: python3 timmy_automations/daily_run/weekly_narrative.py [--json]
Env: See timmy_automations/config/automations.json for configuration
Refs: #719
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from collections import Counter
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
# ── Configuration ─────────────────────────────────────────────────────────
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
CONFIG_PATH = Path(__file__).parent.parent / "config" / "automations.json"
DEFAULT_CONFIG = {
"gitea_api": "http://localhost:3000/api/v1",
"repo_slug": "rockachopa/Timmy-time-dashboard",
"token_file": "~/.hermes/gitea_token",
"lookback_days": 7,
"output_file": ".loop/weekly_narrative.json",
"enabled": True,
}
# ── Data Loading ───────────────────────────────────────────────────────────
def load_automation_config() -> dict:
    """Load configuration for weekly_narrative from the automations manifest.

    Defaults are overlaid with the manifest entry (id "weekly_narrative"),
    then with environment-variable overrides.
    """
    cfg = DEFAULT_CONFIG.copy()
    if CONFIG_PATH.exists():
        try:
            manifest = json.loads(CONFIG_PATH.read_text())
            for entry in manifest.get("automations", []):
                if entry.get("id") != "weekly_narrative":
                    continue
                cfg.update(entry.get("config", {}))
                cfg["enabled"] = entry.get("enabled", True)
                break
        except (json.JSONDecodeError, OSError) as exc:
            print(f"[weekly_narrative] Warning: Could not load config: {exc}", file=sys.stderr)
    # Environment variable overrides
    env = os.environ
    if env.get("TIMMY_GITEA_API"):
        cfg["gitea_api"] = env.get("TIMMY_GITEA_API")
    if env.get("TIMMY_REPO_SLUG"):
        cfg["repo_slug"] = env.get("TIMMY_REPO_SLUG")
    if env.get("TIMMY_GITEA_TOKEN"):
        cfg["token"] = env.get("TIMMY_GITEA_TOKEN")
    if env.get("TIMMY_WEEKLY_NARRATIVE_ENABLED"):
        cfg["enabled"] = env.get("TIMMY_WEEKLY_NARRATIVE_ENABLED", "true").lower() == "true"
    return cfg
def get_token(config: dict) -> str | None:
"""Get Gitea token from environment or file."""
if "token" in config:
return config["token"]
token_file = Path(config["token_file"]).expanduser()
if token_file.exists():
return token_file.read_text().strip()
return None
def load_jsonl(path: Path) -> list[dict]:
    """Load a JSONL file, skipping lines that are not valid JSON."""
    if not path.exists():
        return []
    records = []
    for raw_line in path.read_text().strip().splitlines():
        try:
            parsed = json.loads(raw_line)
        except (json.JSONDecodeError, ValueError):
            continue
        records.append(parsed)
    return records
def parse_ts(ts_str: str) -> datetime | None:
"""Parse an ISO timestamp, tolerating missing tz."""
if not ts_str:
return None
try:
dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=UTC)
return dt
except (ValueError, TypeError):
return None
# ── Gitea API Client ───────────────────────────────────────────────────────
class GiteaClient:
    """Simple Gitea API client with graceful degradation."""

    def __init__(self, config: dict, token: str | None):
        # Base API URL without trailing slash, e.g. "http://host/api/v1".
        self.api_base = config["gitea_api"].rstrip("/")
        self.repo_slug = config["repo_slug"]
        self.token = token
        # Cached availability probe result (None = not yet probed).
        self._available: bool | None = None

    def _headers(self) -> dict:
        """Build request headers, adding token auth when configured."""
        headers = {"Accept": "application/json"}
        if self.token:
            headers["Authorization"] = f"token {self.token}"
        return headers

    def _api_url(self, path: str) -> str:
        """Return the absolute repo-scoped API URL for *path*."""
        return f"{self.api_base}/repos/{self.repo_slug}/{path}"

    def is_available(self) -> bool:
        """Check if Gitea API is reachable (result cached per client)."""
        if self._available is not None:
            return self._available
        try:
            req = Request(
                f"{self.api_base}/version",
                headers=self._headers(),
                method="GET",
            )
            with urlopen(req, timeout=5) as resp:
                self._available = resp.status == 200
                return self._available
        except (HTTPError, URLError, TimeoutError):
            self._available = False
            return False

    def get_paginated(self, path: str, params: dict | None = None) -> list:
        """Fetch all pages of a paginated endpoint.

        HTTPError/URLError are deliberately not caught here — the callers in
        this module catch them and degrade gracefully.
        """
        all_items = []
        page = 1
        limit = 50
        while True:
            url = self._api_url(path)
            # NOTE(review): callers pass "limit" inside *params* too, so the
            # query string carries two limit= values — confirm which one the
            # server honors; the early-exit check below uses the local 50.
            query_parts = [f"limit={limit}", f"page={page}"]
            if params:
                for key, val in params.items():
                    query_parts.append(f"{key}={val}")
            url = f"{url}?{'&'.join(query_parts)}"
            req = Request(url, headers=self._headers(), method="GET")
            with urlopen(req, timeout=15) as resp:
                batch = json.loads(resp.read())
            if not batch:
                break
            all_items.extend(batch)
            # A short page means we reached the end.
            if len(batch) < limit:
                break
            page += 1
        return all_items
# ── Data Collection ────────────────────────────────────────────────────────
def collect_cycles_data(since: datetime) -> dict:
    """Load cycle retrospective data recorded on or after *since*."""
    cycles_file = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
    if not cycles_file.exists():
        return {"cycles": [], "total": 0, "successes": 0, "failures": 0}
    # Keep only entries with a parseable timestamp inside the window.
    recent = [
        entry
        for entry in load_jsonl(cycles_file)
        if (ts := parse_ts(entry.get("timestamp", ""))) and ts >= since
    ]
    success_count = sum(1 for entry in recent if entry.get("success"))
    failure_count = len(recent) - success_count
    return {
        "cycles": recent,
        "total": len(recent),
        "successes": success_count,
        "failures": failure_count,
        "success_rate": round(success_count / len(recent), 2) if recent else 0,
    }
def collect_issues_data(client: GiteaClient, since: datetime) -> dict:
    """Collect issue activity from Gitea since *since*.

    Returns touched/closed/opened issue lists plus their counts; on API
    unavailability or request errors, an "error" key with empty lists.
    """
    if not client.is_available():
        return {"error": "Gitea unavailable", "issues": [], "closed": [], "opened": []}
    try:
        issues = client.get_paginated("issues", {"state": "all", "sort": "updated", "limit": 100})
    except (HTTPError, URLError) as exc:
        return {"error": str(exc), "issues": [], "closed": [], "opened": []}
    touched = []
    closed = []
    opened = []
    for issue in issues:
        updated_at = issue.get("updated_at", "")
        created_at = issue.get("created_at", "")
        updated = parse_ts(updated_at)
        created = parse_ts(created_at)
        # Only issues updated inside the window are classified; within those,
        # "closed" requires a closed_at in the window, otherwise a creation
        # date in the window counts the issue as newly opened.
        # NOTE(review): nesting reconstructed — indentation was lost in this
        # copy of the source; confirm the elif binds to the state check.
        if updated and updated >= since:
            touched.append(issue)
            if issue.get("state") == "closed":
                closed_at = issue.get("closed_at", "")
                closed_dt = parse_ts(closed_at)
                if closed_dt and closed_dt >= since:
                    closed.append(issue)
            elif created and created >= since:
                opened.append(issue)
    return {
        "issues": touched,
        "closed": closed,
        "opened": opened,
        "touched_count": len(touched),
        "closed_count": len(closed),
        "opened_count": len(opened),
    }
def collect_prs_data(client: GiteaClient, since: datetime) -> dict:
    """Collect PR activity from Gitea since *since*.

    Returns touched/merged/opened PR lists plus their counts; on API
    unavailability or request errors, an "error" key with empty lists.
    """
    if not client.is_available():
        return {"error": "Gitea unavailable", "prs": [], "merged": [], "opened": []}
    try:
        prs = client.get_paginated("pulls", {"state": "all", "sort": "updated", "limit": 100})
    except (HTTPError, URLError) as exc:
        return {"error": str(exc), "prs": [], "merged": [], "opened": []}
    touched = []
    merged = []
    opened = []
    for pr in prs:
        updated_at = pr.get("updated_at", "")
        created_at = pr.get("created_at", "")
        merged_at = pr.get("merged_at", "")
        updated = parse_ts(updated_at)
        created = parse_ts(created_at)
        merged_dt = parse_ts(merged_at) if merged_at else None
        # Only PRs updated inside the window are classified; within those,
        # merged-in-window wins, otherwise created-in-window counts as opened.
        # NOTE(review): nesting reconstructed — indentation was lost in this
        # copy of the source; confirm the elif binds to the merged check.
        if updated and updated >= since:
            touched.append(pr)
            if pr.get("merged") and merged_dt and merged_dt >= since:
                merged.append(pr)
            elif created and created >= since:
                opened.append(pr)
    return {
        "prs": touched,
        "merged": merged,
        "opened": opened,
        "touched_count": len(touched),
        "merged_count": len(merged),
        "opened_count": len(opened),
    }
def collect_triage_data(since: datetime) -> dict:
    """Load triage and introspection data.

    Args:
        since: Lower bound (inclusive) for triage entry timestamps.

    Returns:
        Dict with the recent triage entry count/list and the latest insights
        blob (empty dict when insights.json is missing or unreadable).
    """
    triage_file = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
    insights_file = REPO_ROOT / ".loop" / "retro" / "insights.json"
    triage_entries = load_jsonl(triage_file)
    # FIX: the original comprehension called parse_ts() twice per entry;
    # the walrus binding parses each timestamp once.
    recent_triage = [
        e for e in triage_entries
        if (ts := parse_ts(e.get("timestamp", ""))) and ts >= since
    ]
    insights = {}
    if insights_file.exists():
        try:
            insights = json.loads(insights_file.read_text())
        except (json.JSONDecodeError, OSError):
            # Best-effort: missing/corrupt insights just mean an empty blob.
            pass
    return {
        "triage_runs": len(recent_triage),
        "triage_entries": recent_triage,
        "latest_insights": insights,
    }
def collect_token_data(since: datetime) -> dict:
"""Load token economy data from the lightning ledger."""
# The ledger is in-memory but we can look for any persisted data
# For now, return placeholder that will be filled by the ledger module
return {
"note": "Token economy data is ephemeral — check dashboard for live metrics",
"balance_sats": 0, # Placeholder
"transactions_week": 0,
}
# ── Analysis Functions ─────────────────────────────────────────────────────
def extract_themes(issues: list[dict]) -> dict:
    """Extract labeling themes from a set of issues.

    Args:
        issues: Issue dicts as returned by the Gitea API; each may carry a
            "labels" list of {"name": ...} dicts.

    Returns:
        Dict with three lists of {"name", "count"} entries:
            - top_labels: up to 10 most common labels, excluding layer:/size:
            - layers: counts per "layer:*" label (prefix stripped)
            - types: counts for the known type labels (bug/feature/...)
    """
    # FIX: the return annotation previously claimed ``list[dict]`` but the
    # function has always returned a dict of three lists.
    label_counts: Counter = Counter()
    layer_counts: Counter = Counter()
    type_counts: Counter = Counter()
    known_types = ("bug", "feature", "refactor", "docs", "test", "chore")
    for issue in issues:
        for label in issue.get("labels", []):
            name = label.get("name", "")
            label_counts[name] += 1
            if name.startswith("layer:"):
                layer_counts[name.replace("layer:", "")] += 1
            if name in known_types:
                type_counts[name] += 1
    return {
        # Top themes: most common labels, with layer/size prefixes filtered
        # out after taking the top 10 (same ordering as before).
        "top_labels": [
            {"name": name, "count": count}
            for name, count in label_counts.most_common(10)
            if not name.startswith(("layer:", "size:"))
        ],
        "layers": [
            {"name": name, "count": count}
            for name, count in layer_counts.most_common()
        ],
        "types": [
            {"name": name, "count": count}
            for name, count in type_counts.most_common()
        ],
    }
def extract_agent_contributions(issues: list[dict], prs: list[dict], cycles: list[dict]) -> dict:
    """Summarize who contributed: issue assignees, PR authors, Kimi mentions."""
    # Tally issue assignees (only non-empty dict assignees count).
    assignee_counts = Counter()
    for item in issues:
        person = item.get("assignee")
        if person and isinstance(person, dict):
            assignee_counts[person.get("login", "unknown")] += 1
    # Tally PR authors the same way.
    author_counts = Counter()
    for pr in prs:
        author = pr.get("user")
        if author and isinstance(author, dict):
            author_counts[author.get("login", "unknown")] += 1
    # Count cycles whose notes or failure reason mention Kimi.
    kimi_mentions = sum(
        1
        for cycle in cycles
        if any("kimi" in cycle.get(field, "").lower() for field in ("notes", "reason"))
    )
    return {
        "active_assignees": [
            {"login": login, "issues_count": count}
            for login, count in assignee_counts.most_common()
        ],
        "pr_authors": [
            {"login": login, "prs_count": count}
            for login, count in author_counts.most_common()
        ],
        "kimi_mentioned_cycles": kimi_mentions,
    }
def analyze_test_shifts(cycles: list[dict]) -> dict:
    """Summarize test-related movement across the week's cycles."""
    if not cycles:
        return {"note": "No cycle data available"}
    passed = sum(c.get("tests_passed", 0) for c in cycles)
    added = sum(c.get("tests_added", 0) for c in cycles)
    # A cycle is test-focused if typed "test" or its notes mention tests.
    focused = sum(
        1
        for c in cycles
        if c.get("type") == "test" or "test" in c.get("notes", "").lower()
    )
    return {
        "total_tests_passed": passed,
        "total_tests_added": added,
        "avg_tests_per_cycle": round(passed / len(cycles), 1),
        "test_focused_cycles": focused,
    }
def analyze_triage_shifts(triage_data: dict) -> dict:
    """Summarize loop-introspection (triage) activity and recommendations."""
    insights = triage_data.get("latest_insights", {})
    recommendations = insights.get("recommendations", [])
    high_count = sum(1 for rec in recommendations if rec.get("severity") == "high")
    return {
        "triage_runs": triage_data.get("triage_runs", 0),
        "insights_generated": insights.get("generated_at") is not None,
        "high_priority_recommendations": high_count,
        # Surface at most the three most recent recommendations.
        "recent_recommendations": recommendations[:3] if recommendations else [],
    }
def generate_vibe_summary(
    cycles_data: dict,
    issues_data: dict,
    prs_data: dict,
    themes: dict,
    agent_contrib: dict,
    test_shifts: dict,
    triage_shifts: dict,
) -> dict:
    """Generate the human-readable 'vibe' summary."""
    success_rate = cycles_data.get("success_rate", 0)
    failures = cycles_data.get("failures", 0)
    closed_count = issues_data.get("closed_count", 0)
    merged_count = prs_data.get("merged_count", 0)

    # Pick the overall mood — check order matters (productive first).
    if success_rate >= 0.9 and closed_count > 0:
        vibe = "productive"
        vibe_description = "A strong week with solid delivery and healthy success rates."
    elif success_rate >= 0.7:
        vibe = "steady"
        vibe_description = "Steady progress with some bumps. Things are moving forward."
    elif failures > cycles_data.get("successes", 0):
        vibe = "struggling"
        vibe_description = "A challenging week with more failures than successes. Time to regroup."
    else:
        vibe = "quiet"
        vibe_description = "A lighter week with limited activity."

    # Focus areas: up to three busiest layers.
    focus_areas = [
        f"{layer['name']} ({layer['count']} items)"
        for layer in themes.get("layers", [])[:3]
    ]

    # Headline the most-assigned contributor, if anyone was active.
    assignees = agent_contrib.get("active_assignees", [])
    if assignees:
        lead = assignees[0]
        agent_summary = f"{lead['login']} led with {lead['issues_count']} assigned issues."
    else:
        agent_summary = ""

    # Notable events, with a quiet-week fallback line.
    notable = []
    if merged_count > 5:
        notable.append(f"{merged_count} PRs merged — high integration velocity")
    if triage_shifts.get("high_priority_recommendations", 0) > 0:
        notable.append("High-priority recommendations from loop introspection")
    if test_shifts.get("test_focused_cycles", 0) > 3:
        notable.append("Strong test coverage focus")
    if not notable:
        notable.append("Regular development flow")

    return {
        "overall": vibe,
        "description": vibe_description,
        "focus_areas": focus_areas,
        "agent_summary": agent_summary,
        "notable_events": notable,
    }
# ── Narrative Generation ───────────────────────────────────────────────────
def generate_narrative(
    cycles_data: dict,
    issues_data: dict,
    prs_data: dict,
    triage_data: dict,
    themes: dict,
    agent_contrib: dict,
    test_shifts: dict,
    triage_shifts: dict,
    token_data: dict,
    since: datetime,
    until: datetime,
) -> dict:
    """Generate the complete weekly narrative.

    Assembles the collected and analyzed pieces into the JSON document that
    is persisted to the output file and fed to the markdown renderer.

    Args:
        cycles_data/issues_data/prs_data/triage_data: collect_* outputs.
        themes/agent_contrib/test_shifts/triage_shifts: analyze/extract outputs.
        token_data: collect_token_data output (currently a placeholder).
        since/until: the reporting window bounds.

    Returns:
        The narrative dict (see keys below).
    """
    vibe = generate_vibe_summary(
        cycles_data, issues_data, prs_data, themes, agent_contrib, test_shifts, triage_shifts
    )
    return {
        "generated_at": datetime.now(UTC).isoformat(),
        "period": {
            "start": since.isoformat(),
            "end": until.isoformat(),
            # NOTE(review): hard-coded 7 even when --days overrides the
            # window length — confirm whether this should be (until - since).
            "days": 7,
        },
        "vibe": vibe,
        "activity": {
            "cycles": {
                "total": cycles_data.get("total", 0),
                "successes": cycles_data.get("successes", 0),
                "failures": cycles_data.get("failures", 0),
                "success_rate": cycles_data.get("success_rate", 0),
            },
            "issues": {
                "touched": issues_data.get("touched_count", 0),
                "closed": issues_data.get("closed_count", 0),
                "opened": issues_data.get("opened_count", 0),
            },
            "pull_requests": {
                "touched": prs_data.get("touched_count", 0),
                "merged": prs_data.get("merged_count", 0),
                "opened": prs_data.get("opened_count", 0),
            },
        },
        "themes": themes,
        "agents": agent_contrib,
        "test_health": test_shifts,
        "triage_health": triage_shifts,
        "token_economy": token_data,
    }
def generate_markdown_summary(narrative: dict) -> str:
    """Render the weekly narrative as skimmable markdown."""
    vibe = narrative.get("vibe", {})
    activity = narrative.get("activity", {})
    cycles = activity.get("cycles", {})
    issues = activity.get("issues", {})
    prs = activity.get("pull_requests", {})
    period = narrative["period"]

    # Header and headline numbers.
    out: list[str] = [
        "# Weekly Narrative Summary",
        "",
        f"**Period:** {period['start'][:10]} to {period['end'][:10]}",
        f"**Vibe:** {vibe.get('overall', 'unknown').title()}",
        "",
        f"{vibe.get('description', '')}",
        "",
        "## Activity Highlights",
        "",
        f"- **Development Cycles:** {cycles.get('total', 0)} total ({cycles.get('successes', 0)} success, {cycles.get('failures', 0)} failure)",
        f"- **Issues:** {issues.get('closed', 0)} closed, {issues.get('opened', 0)} opened",
        f"- **Pull Requests:** {prs.get('merged', 0)} merged, {prs.get('opened', 0)} opened",
        "",
    ]

    # Optional sections — each only appears when there is content.
    focus = vibe.get("focus_areas", [])
    if focus:
        out += ["## Focus Areas", ""]
        out += [f"- {area}" for area in focus]
        out.append("")

    agent_summary = vibe.get("agent_summary", "")
    if agent_summary:
        out += ["## Agent Activity", "", agent_summary, ""]

    notable = vibe.get("notable_events", [])
    if notable:
        out += ["## Notable Events", ""]
        out += [f"- {event}" for event in notable]
        out.append("")

    triage = narrative.get("triage_health", {})
    if triage.get("high_priority_recommendations", 0) > 0:
        out += [
            "## Triage Notes",
            "",
            f"⚠️ {triage['high_priority_recommendations']} high-priority recommendation(s) from loop introspection.",
            "",
        ]
        for rec in triage.get("recent_recommendations", [])[:2]:
            out.append(f"- **{rec.get('category', 'general')}:** {rec.get('finding', '')}")
        out.append("")

    return "\n".join(out)
# ── Main ───────────────────────────────────────────────────────────────────
def parse_args() -> argparse.Namespace:
    """Parse CLI flags for the weekly narrative generator."""
    parser = argparse.ArgumentParser(
        description="Generate weekly narrative summary of work and vibes",
    )
    parser.add_argument(
        "--json", "-j",
        action="store_true",
        help="Output as JSON instead of markdown",
    )
    parser.add_argument(
        "--output", "-o",
        type=str,
        default=None,
        help="Output file path (default from config)",
    )
    parser.add_argument(
        "--days",
        type=int,
        default=None,
        help="Override lookback days (default 7)",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Run even if disabled in config",
    )
    return parser.parse_args()
def main() -> int:
    """Main entry point: collect, analyze, persist, and print the narrative.

    Returns:
        0 always — the narrative is best-effort; a Gitea outage degrades to
        local-only data rather than failing the run.
    """
    args = parse_args()
    config = load_automation_config()
    # Check if enabled
    if not config.get("enabled", True) and not args.force:
        print("[weekly_narrative] Skipped — weekly narrative is disabled in config")
        print("[weekly_narrative] Use --force to run anyway")
        return 0
    # Determine lookback period
    days = args.days if args.days is not None else config.get("lookback_days", 7)
    until = datetime.now(UTC)
    since = until - timedelta(days=days)
    print(f"[weekly_narrative] Generating narrative for the past {days} days...")
    # Setup Gitea client
    token = get_token(config)
    client = GiteaClient(config, token)
    if not client.is_available():
        print("[weekly_narrative] Warning: Gitea API unavailable — will use local data only")
    # Collect data
    cycles_data = collect_cycles_data(since)
    issues_data = collect_issues_data(client, since)
    prs_data = collect_prs_data(client, since)
    triage_data = collect_triage_data(since)
    token_data = collect_token_data(since)
    # Analyze
    themes = extract_themes(issues_data.get("issues", []))
    agent_contrib = extract_agent_contributions(
        issues_data.get("issues", []),
        prs_data.get("prs", []),
        cycles_data.get("cycles", []),
    )
    test_shifts = analyze_test_shifts(cycles_data.get("cycles", []))
    triage_shifts = analyze_triage_shifts(triage_data)
    # Generate narrative
    narrative = generate_narrative(
        cycles_data,
        issues_data,
        prs_data,
        triage_data,
        themes,
        agent_contrib,
        test_shifts,
        triage_shifts,
        token_data,
        since,
        until,
    )
    # Determine output path
    output_path = args.output or config.get("output_file", ".loop/weekly_narrative.json")
    output_file = REPO_ROOT / output_path
    output_file.parent.mkdir(parents=True, exist_ok=True)
    # Write JSON output
    output_file.write_text(json.dumps(narrative, indent=2) + "\n")
    # Write markdown summary alongside JSON
    md_output_file = output_file.with_suffix(".md")
    md_output_file.write_text(generate_markdown_summary(narrative))
    # Print output
    if args.json:
        print(json.dumps(narrative, indent=2))
    else:
        # NOTE(review): the "Written to" lines are assumed to belong to the
        # non-JSON branch (keeping --json output pipe-clean) — indentation was
        # lost in this copy of the source; confirm.
        print()
        print(generate_markdown_summary(narrative))
        print(f"\n[weekly_narrative] Written to: {output_file}")
        print(f"[weekly_narrative] Markdown summary: {md_output_file}")
    return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,6 @@
"""Timmy Automations utilities.
Shared helper modules for automations.
"""
from __future__ import annotations

View File

@@ -0,0 +1,389 @@
"""Token rules helper — Compute token deltas for agent actions.
This module loads token economy configuration from YAML and provides
functions for automations to compute token rewards/penalties.
Usage:
from timmy_automations.utils.token_rules import TokenRules
rules = TokenRules()
delta = rules.get_delta("pr_merged")
print(f"PR merge reward: {delta}") # 10
# Check if agent can perform sensitive operation
can_merge = rules.check_gate("pr_merge", current_tokens=25)
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
@dataclass
class TokenEvent:
    """Represents a single token event configuration."""

    name: str  # event identifier, e.g. "pr_merged"
    description: str  # human-readable explanation of the event
    reward: int  # tokens granted for the event (>= 0)
    # Penalty is stored as a negative delta (the fallback defaults use -2);
    # assumes the YAML config also stores negatives — verify against config.
    penalty: int
    category: str  # grouping used for daily limits, e.g. "merge", "test"
    gate_threshold: int | None = None  # minimum balance to allow the event, if gated

    @property
    def delta(self) -> int:
        """Net token delta (reward + penalty)."""
        return self.reward + self.penalty
@dataclass
class TokenCategoryLimits:
    """Daily limits for a token category."""

    max_earn: int  # maximum tokens that may be earned per day in this category
    max_spend: int  # maximum tokens that may be spent per day in this category
class TokenRules:
"""Token economy rules loader and calculator.
Loads configuration from timmy_automations/config/token_rules.yaml
and provides methods to compute token deltas and check gating.
"""
CONFIG_PATH = Path(__file__).parent.parent / "config" / "token_rules.yaml"
def __init__(self, config_path: Path | None = None) -> None:
"""Initialize token rules from configuration file.
Args:
config_path: Optional override for config file location.
"""
self._config_path = config_path or self.CONFIG_PATH
self._events: dict[str, TokenEvent] = {}
self._gating: dict[str, int] = {}
self._daily_limits: dict[str, TokenCategoryLimits] = {}
self._audit: dict[str, Any] = {}
self._version: str = "unknown"
self._load_config()
def _load_config(self) -> None:
"""Load configuration from YAML file."""
# Graceful degradation if yaml not available or file missing
try:
import yaml
except ImportError:
# YAML not installed, use fallback defaults
self._load_fallback_defaults()
return
if not self._config_path.exists():
self._load_fallback_defaults()
return
try:
config = yaml.safe_load(self._config_path.read_text())
if not config:
self._load_fallback_defaults()
return
self._version = config.get("version", "unknown")
self._parse_events(config.get("events", {}))
self._parse_gating(config.get("gating_thresholds", {}))
self._parse_daily_limits(config.get("daily_limits", {}))
self._audit = config.get("audit", {})
except Exception:
# Any error loading config, use fallbacks
self._load_fallback_defaults()
def _load_fallback_defaults(self) -> None:
"""Load minimal fallback defaults if config unavailable."""
self._version = "fallback"
self._events = {
"pr_merged": TokenEvent(
name="pr_merged",
description="Successfully merged a pull request",
reward=10,
penalty=0,
category="merge",
gate_threshold=0,
),
"test_fixed": TokenEvent(
name="test_fixed",
description="Fixed a failing test",
reward=8,
penalty=0,
category="test",
),
"automation_failure": TokenEvent(
name="automation_failure",
description="Automation failed",
reward=0,
penalty=-2,
category="operation",
),
}
self._gating = {"pr_merge": 0}
self._daily_limits = {}
self._audit = {"log_all_transactions": True}
def _parse_events(self, events_config: dict) -> None:
"""Parse event configurations from YAML."""
for name, config in events_config.items():
if not isinstance(config, dict):
continue
self._events[name] = TokenEvent(
name=name,
description=config.get("description", ""),
reward=config.get("reward", 0),
penalty=config.get("penalty", 0),
category=config.get("category", "unknown"),
gate_threshold=config.get("gate_threshold"),
)
def _parse_gating(self, gating_config: dict) -> None:
"""Parse gating thresholds from YAML."""
for name, threshold in gating_config.items():
if isinstance(threshold, int):
self._gating[name] = threshold
def _parse_daily_limits(self, limits_config: dict) -> None:
"""Parse daily limits from YAML."""
for category, limits in limits_config.items():
if isinstance(limits, dict):
self._daily_limits[category] = TokenCategoryLimits(
max_earn=limits.get("max_earn", 0),
max_spend=limits.get("max_spend", 0),
)
def get_delta(self, event_name: str) -> int:
"""Get token delta for an event.
Args:
event_name: Name of the event (e.g., "pr_merged", "test_fixed")
Returns:
Net token delta (positive for reward, negative for penalty)
"""
event = self._events.get(event_name)
if event:
return event.delta
return 0
def get_event(self, event_name: str) -> TokenEvent | None:
"""Get full event configuration.
Args:
event_name: Name of the event
Returns:
TokenEvent object or None if not found
"""
return self._events.get(event_name)
def list_events(self, category: str | None = None) -> list[TokenEvent]:
"""List all configured events.
Args:
category: Optional category filter
Returns:
List of TokenEvent objects
"""
events = list(self._events.values())
if category:
events = [e for e in events if e.category == category]
return events
def check_gate(self, operation: str, current_tokens: int) -> bool:
"""Check if agent meets token threshold for an operation.
Args:
operation: Operation name (e.g., "pr_merge")
current_tokens: Agent's current token balance
Returns:
True if agent can perform the operation
"""
threshold = self._gating.get(operation)
if threshold is None:
return True # No gate defined, allow
return current_tokens >= threshold
def get_gate_threshold(self, operation: str) -> int | None:
"""Get the gating threshold for an operation.
Args:
operation: Operation name
Returns:
Threshold value or None if no gate defined
"""
return self._gating.get(operation)
def get_daily_limits(self, category: str) -> TokenCategoryLimits | None:
"""Get daily limits for a category.
Args:
category: Category name
Returns:
TokenCategoryLimits or None if not defined
"""
return self._daily_limits.get(category)
def compute_transaction(
self,
event_name: str,
current_tokens: int = 0,
current_daily_earned: dict[str, int] | None = None,
) -> dict[str, Any]:
"""Compute a complete token transaction.
This is the main entry point for agents to use. It returns
a complete transaction record with delta, gating check, and limits.
Args:
event_name: Name of the event
current_tokens: Agent's current token balance
current_daily_earned: Dict of category -> tokens earned today
Returns:
Transaction dict with:
- event: Event name
- delta: Token delta
- allowed: Whether operation is allowed (gating)
- new_balance: Projected new balance
- limit_reached: Whether daily limit would be exceeded
"""
event = self._events.get(event_name)
if not event:
return {
"event": event_name,
"delta": 0,
"allowed": False,
"reason": "unknown_event",
"new_balance": current_tokens,
"limit_reached": False,
}
delta = event.delta
new_balance = current_tokens + delta
# Check gating (for penalties, we don't check gates)
allowed = True
gate_reason = None
if delta > 0 and event.gate_threshold is not None: # Only check gates for positive operations with thresholds
allowed = current_tokens >= event.gate_threshold
if not allowed:
gate_reason = f"requires {event.gate_threshold} tokens"
# Check daily limits
limit_reached = False
limit_reason = None
if current_daily_earned and event.category in current_daily_earned:
limits = self._daily_limits.get(event.category)
if limits:
current_earned = current_daily_earned.get(event.category, 0)
if delta > 0 and current_earned + delta > limits.max_earn:
limit_reached = True
limit_reason = f"daily earn limit ({limits.max_earn}) reached"
result = {
"event": event_name,
"delta": delta,
"category": event.category,
"allowed": allowed and not limit_reached,
"new_balance": new_balance,
"limit_reached": limit_reached,
}
if gate_reason:
result["gate_reason"] = gate_reason
if limit_reason:
result["limit_reason"] = limit_reason
return result
def get_config_version(self) -> str:
"""Get the loaded configuration version."""
return self._version
def get_categories(self) -> list[str]:
"""Get list of all configured categories."""
categories = {e.category for e in self._events.values()}
return sorted(categories)
def is_auditable(self) -> bool:
"""Check if transactions should be logged for audit."""
return self._audit.get("log_all_transactions", True)
# Convenience functions for simple use cases
def get_token_delta(event_name: str) -> int:
    """Look up the net token delta for an event (convenience function).

    Instantiates a fresh ``TokenRules`` per call.

    Args:
        event_name: Name of the event

    Returns:
        Token delta (positive for reward, negative for penalty)
    """
    rules = TokenRules()
    return rules.get_delta(event_name)
def check_operation_gate(operation: str, current_tokens: int) -> bool:
    """Check whether an agent can perform an operation (convenience function).

    Instantiates a fresh ``TokenRules`` per call.

    Args:
        operation: Operation name
        current_tokens: Agent's current token balance

    Returns:
        True if the operation is allowed
    """
    rules = TokenRules()
    return rules.check_gate(operation, current_tokens)
def compute_token_reward(
    event_name: str,
    current_tokens: int = 0,
) -> dict[str, Any]:
    """Compute the full token transaction for an event (convenience function).

    Instantiates a fresh ``TokenRules`` per call.

    Args:
        event_name: Name of the event
        current_tokens: Agent's current token balance

    Returns:
        Transaction dict with delta, allowed status, new balance
    """
    rules = TokenRules()
    return rules.compute_transaction(event_name, current_tokens)
def list_token_events(category: str | None = None) -> list[dict[str, Any]]:
    """List all configured token events as plain dicts (convenience function).

    Args:
        category: Optional category filter

    Returns:
        List of event dicts with name, description, delta, category,
        and gate_threshold keys
    """
    summaries = []
    for ev in TokenRules().list_events(category):
        summaries.append(
            {
                "name": ev.name,
                "description": ev.description,
                "delta": ev.delta,
                "category": ev.category,
                "gate_threshold": ev.gate_threshold,
            }
        )
    return summaries