diff --git a/config/quests.yaml b/config/quests.yaml
new file mode 100644
index 00000000..d28740ec
--- /dev/null
+++ b/config/quests.yaml
@@ -0,0 +1,178 @@
+# ── Token Quest System Configuration ─────────────────────────────────────────
+#
+# Quests are special objectives that agents (and humans) can complete for
+# bonus tokens. Each quest has:
+# - id: Unique identifier
+# - name: Display name
+# - description: What the quest requires
+# - reward_tokens: Number of tokens awarded on completion
+# - criteria: Detection rules for completion
+# - enabled: Whether this quest is active
+# - repeatable: Whether this quest can be completed multiple times
+# - cooldown_hours: Minimum hours between completions (if repeatable)
+#
+# Quest Types:
+# - issue_count: Complete when N issues matching criteria are closed
+# - issue_reduce: Complete when open issue count drops by N
+# - docs_update: Complete when documentation files are updated
+# - test_improve: Complete when test coverage/cases improve
+# - daily_run: Complete Daily Run session objectives
+# - custom: Special quests with manual completion
+#
+# ── Active Quests ─────────────────────────────────────────────────────────────
+
+quests:
+ # ── Daily Run & Test Improvement Quests ───────────────────────────────────
+
+ close_flaky_tests:
+ id: close_flaky_tests
+ name: Flaky Test Hunter
+ description: Close 3 issues labeled "flaky-test"
+ reward_tokens: 150
+ type: issue_count
+ enabled: true
+ repeatable: true
+ cooldown_hours: 24
+ criteria:
+ issue_labels:
+ - flaky-test
+ target_count: 3
+ issue_state: closed
+ lookback_days: 7
+ notification_message: "Quest Complete! You closed 3 flaky-test issues and earned {tokens} tokens."
+
+ reduce_p1_issues:
+ id: reduce_p1_issues
+ name: Priority Firefighter
+ description: Reduce open P1 Daily Run issues by 2
+ reward_tokens: 200
+ type: issue_reduce
+ enabled: true
+ repeatable: true
+ cooldown_hours: 48
+ criteria:
+ issue_labels:
+ - layer:triage
+ - P1
+ target_reduction: 2
+ lookback_days: 3
+ notification_message: "Quest Complete! You reduced P1 issues by 2 and earned {tokens} tokens."
+
+ improve_test_coverage:
+ id: improve_test_coverage
+ name: Coverage Champion
+ description: Improve test coverage by 5% or add 10 new test cases
+ reward_tokens: 300
+ type: test_improve
+ enabled: true
+ repeatable: false
+ criteria:
+ coverage_increase_percent: 5
+ min_new_tests: 10
+ notification_message: "Quest Complete! You improved test coverage and earned {tokens} tokens."
+
+ complete_daily_run_session:
+ id: complete_daily_run_session
+ name: Daily Runner
+ description: Successfully complete 5 Daily Run sessions in a week
+ reward_tokens: 250
+ type: daily_run
+ enabled: true
+ repeatable: true
+ cooldown_hours: 168 # 1 week
+ criteria:
+ min_sessions: 5
+ lookback_days: 7
+ notification_message: "Quest Complete! You completed 5 Daily Run sessions and earned {tokens} tokens."
+
+ # ── Documentation & Maintenance Quests ────────────────────────────────────
+
+ improve_automation_docs:
+ id: improve_automation_docs
+ name: Documentation Hero
+ description: Improve documentation for automations (update 3+ doc files)
+ reward_tokens: 100
+ type: docs_update
+ enabled: true
+ repeatable: true
+ cooldown_hours: 72
+ criteria:
+ file_patterns:
+ - "docs/**/*.md"
+ - "**/README.md"
+ - "timmy_automations/**/*.md"
+ min_files_changed: 3
+ lookback_days: 7
+ notification_message: "Quest Complete! You improved automation docs and earned {tokens} tokens."
+
+ close_micro_fixes:
+ id: close_micro_fixes
+ name: Micro Fix Master
+ description: Close 5 issues labeled "layer:micro-fix"
+ reward_tokens: 125
+ type: issue_count
+ enabled: true
+ repeatable: true
+ cooldown_hours: 24
+ criteria:
+ issue_labels:
+ - layer:micro-fix
+ target_count: 5
+ issue_state: closed
+ lookback_days: 7
+ notification_message: "Quest Complete! You closed 5 micro-fix issues and earned {tokens} tokens."
+
+ # ── Special Achievements ──────────────────────────────────────────────────
+
+ first_contribution:
+ id: first_contribution
+ name: First Steps
+ description: Make your first contribution (close any issue)
+ reward_tokens: 50
+ type: issue_count
+ enabled: true
+ repeatable: false
+ criteria:
+ target_count: 1
+ issue_state: closed
+ lookback_days: 30
+ notification_message: "Welcome! You completed your first contribution and earned {tokens} tokens."
+
+ bug_squasher:
+ id: bug_squasher
+ name: Bug Squasher
+ description: Close 10 issues labeled "bug"
+ reward_tokens: 500
+ type: issue_count
+ enabled: true
+ repeatable: true
+ cooldown_hours: 168 # 1 week
+ criteria:
+ issue_labels:
+ - bug
+ target_count: 10
+ issue_state: closed
+ lookback_days: 7
+ notification_message: "Quest Complete! You squashed 10 bugs and earned {tokens} tokens."
+
+# ── Quest System Settings ───────────────────────────────────────────────────
+
+settings:
+ # Enable/disable quest notifications
+ notifications_enabled: true
+
+ # Maximum number of concurrent active quests per agent
+ max_concurrent_quests: 5
+
+ # Auto-detect quest completions on Daily Run metrics update
+ auto_detect_on_daily_run: true
+
+ # Gitea issue labels that indicate quest-related work
+ quest_work_labels:
+ - layer:triage
+ - layer:micro-fix
+ - layer:tests
+ - layer:economy
+ - flaky-test
+ - bug
+ - documentation
diff --git a/src/dashboard/app.py b/src/dashboard/app.py
index 56b92369..43c980fa 100644
--- a/src/dashboard/app.py
+++ b/src/dashboard/app.py
@@ -43,6 +43,7 @@ from dashboard.routes.memory import router as memory_router
from dashboard.routes.mobile import router as mobile_router
from dashboard.routes.models import api_router as models_api_router
from dashboard.routes.models import router as models_router
+from dashboard.routes.quests import router as quests_router
from dashboard.routes.spark import router as spark_router
from dashboard.routes.system import router as system_router
from dashboard.routes.tasks import router as tasks_router
@@ -627,6 +628,7 @@ app.include_router(world_router)
app.include_router(matrix_router)
app.include_router(tower_router)
app.include_router(daily_run_router)
+app.include_router(quests_router)
@app.websocket("/ws")
diff --git a/src/dashboard/routes/daily_run.py b/src/dashboard/routes/daily_run.py
index e60f1903..f24ea09d 100644
--- a/src/dashboard/routes/daily_run.py
+++ b/src/dashboard/routes/daily_run.py
@@ -365,6 +365,15 @@ async def daily_run_metrics_api(lookback_days: int = 7):
status_code=503,
)
+ # Check for quest completions based on Daily Run metrics
+ quest_rewards = []
+ try:
+ from dashboard.routes.quests import check_daily_run_quests
+
+ quest_rewards = await check_daily_run_quests(agent_id="system")
+ except Exception as exc:
+ logger.debug("Quest checking failed: %s", exc)
+
return JSONResponse(
{
"status": "ok",
@@ -389,6 +398,7 @@ async def daily_run_metrics_api(lookback_days: int = 7):
"previous": metrics.total_touched_previous,
},
"generated_at": metrics.generated_at,
+ "quest_rewards": quest_rewards,
}
)
diff --git a/src/dashboard/routes/quests.py b/src/dashboard/routes/quests.py
new file mode 100644
index 00000000..f3444761
--- /dev/null
+++ b/src/dashboard/routes/quests.py
@@ -0,0 +1,377 @@
+"""Quest system routes for agent token rewards.
+
+Provides API endpoints for:
+- Listing quests and their status
+- Claiming quest rewards
+- Getting quest leaderboard
+- Quest progress tracking
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import Any
+
+from fastapi import APIRouter, Request
+from fastapi.responses import HTMLResponse, JSONResponse
+from pydantic import BaseModel
+
+from dashboard.templating import templates
+from timmy.quest_system import (
+ QuestStatus,
+ auto_evaluate_all_quests,
+ claim_quest_reward,
+ evaluate_quest_progress,
+ get_active_quests,
+ get_agent_quests_status,
+ get_quest_definition,
+ get_quest_leaderboard,
+ load_quest_config,
+)
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/quests", tags=["quests"])
+
+
+class ClaimQuestRequest(BaseModel):
+    """Request to claim a quest reward.
+
+    Posted to /quests/api/claim; both fields are required.
+    """
+
+    # Agent receiving the tokens.
+    agent_id: str
+    # Quest whose completed reward is being claimed.
+    quest_id: str
+
+
+class EvaluateQuestRequest(BaseModel):
+    """Request to manually evaluate quest progress.
+
+    Posted to /quests/api/evaluate; both fields are required.
+    """
+
+    # Agent whose quest progress is evaluated.
+    agent_id: str
+    # Quest to evaluate.
+    quest_id: str
+
+
+# ---------------------------------------------------------------------------
+# API Endpoints
+# ---------------------------------------------------------------------------
+
+
+@router.get("/api/definitions")
+async def get_quest_definitions_api() -> JSONResponse:
+ """Get all quest definitions.
+
+ Returns:
+ JSON list of all quest definitions with their criteria.
+ """
+ definitions = get_active_quests()
+ return JSONResponse(
+ {
+ "quests": [
+ {
+ "id": q.id,
+ "name": q.name,
+ "description": q.description,
+ "reward_tokens": q.reward_tokens,
+ "type": q.quest_type.value,
+ "repeatable": q.repeatable,
+ "cooldown_hours": q.cooldown_hours,
+ "criteria": q.criteria,
+ }
+ for q in definitions
+ ]
+ }
+ )
+
+
+@router.get("/api/status/{agent_id}")
+async def get_agent_quest_status(agent_id: str) -> JSONResponse:
+    """Get quest status for a specific agent.
+
+    Args:
+        agent_id: Identifier of the agent to report on.
+
+    Returns:
+        Complete quest status including progress, completion counts,
+        and tokens earned.
+    """
+    # Delegates entirely to the quest system; no caching here.
+    status = get_agent_quests_status(agent_id)
+    return JSONResponse(status)
+
+
+@router.post("/api/claim")
+async def claim_quest_reward_api(request: ClaimQuestRequest) -> JSONResponse:
+    """Claim a quest reward for an agent.
+
+    The quest must be completed but not yet claimed.
+
+    Args:
+        request: Agent and quest identifiers for the claim.
+
+    Returns:
+        200 with the reward payload on success; 400 otherwise.
+    """
+    reward = claim_quest_reward(request.quest_id, request.agent_id)
+
+    # claim_quest_reward returns None for several distinct reasons
+    # (unknown quest, not completed, already claimed, on cooldown);
+    # they are deliberately collapsed into one 400 message here.
+    if not reward:
+        return JSONResponse(
+            {
+                "success": False,
+                "error": "Quest not completed, already claimed, or on cooldown",
+            },
+            status_code=400,
+        )
+
+    return JSONResponse(
+        {
+            "success": True,
+            "reward": reward,
+        }
+    )
+
+
+@router.post("/api/evaluate")
+async def evaluate_quest_api(request: EvaluateQuestRequest) -> JSONResponse:
+ """Manually evaluate quest progress with provided context.
+
+ This is useful for testing or when the quest completion
+ needs to be triggered manually.
+ """
+ quest = get_quest_definition(request.quest_id)
+ if not quest:
+ return JSONResponse(
+ {"success": False, "error": "Quest not found"},
+ status_code=404,
+ )
+
+ # Build evaluation context based on quest type
+ context = await _build_evaluation_context(quest)
+
+ progress = evaluate_quest_progress(request.quest_id, request.agent_id, context)
+
+ if not progress:
+ return JSONResponse(
+ {"success": False, "error": "Failed to evaluate quest"},
+ status_code=500,
+ )
+
+ # Auto-claim if completed
+ reward = None
+ if progress.status == QuestStatus.COMPLETED:
+ reward = claim_quest_reward(request.quest_id, request.agent_id)
+
+ return JSONResponse(
+ {
+ "success": True,
+ "progress": progress.to_dict(),
+ "reward": reward,
+ "completed": progress.status == QuestStatus.COMPLETED,
+ }
+ )
+
+
+@router.get("/api/leaderboard")
+async def get_leaderboard_api() -> JSONResponse:
+ """Get the quest completion leaderboard.
+
+ Returns agents sorted by total tokens earned.
+ """
+ leaderboard = get_quest_leaderboard()
+ return JSONResponse(
+ {
+ "leaderboard": leaderboard,
+ }
+ )
+
+
+@router.post("/api/reload")
+async def reload_quest_config_api() -> JSONResponse:
+    """Reload quest configuration from quests.yaml.
+
+    Useful for applying quest changes without restarting.
+
+    Returns:
+        Number of quests loaded and the parsed settings mapping.
+    """
+    # load_quest_config also refreshes the module-level caches in
+    # timmy.quest_system, so subsequent requests see the new config.
+    definitions, quest_settings = load_quest_config()
+    return JSONResponse(
+        {
+            "success": True,
+            "quests_loaded": len(definitions),
+            "settings": quest_settings,
+        }
+    )
+
+
+# ---------------------------------------------------------------------------
+# Dashboard UI Endpoints
+# ---------------------------------------------------------------------------
+
+
+@router.get("", response_class=HTMLResponse)
+async def quests_dashboard(request: Request) -> HTMLResponse:
+    """Main quests dashboard page."""
+    # NOTE(review): agent_id is hard-coded to "current_user" —
+    # presumably the template or client-side code resolves the real
+    # user; confirm before relying on per-user data here.
+    return templates.TemplateResponse(
+        request,
+        "quests.html",
+        {"agent_id": "current_user"},
+    )
+
+
+@router.get("/panel/{agent_id}", response_class=HTMLResponse)
+async def quests_panel(request: Request, agent_id: str) -> HTMLResponse:
+    """Quest panel for HTMX partial updates.
+
+    Args:
+        request: Incoming request (required by the template engine).
+        agent_id: Agent whose quest status is rendered.
+    """
+    status = get_agent_quests_status(agent_id)
+    # The partial only consumes these keys from the status payload.
+    return templates.TemplateResponse(
+        request,
+        "partials/quests_panel.html",
+        {
+            "agent_id": agent_id,
+            "quests": status["quests"],
+            "total_tokens": status["total_tokens_earned"],
+            "completed_count": status["total_quests_completed"],
+        },
+    )
+
+
+# ---------------------------------------------------------------------------
+# Internal Functions
+# ---------------------------------------------------------------------------
+
+
+async def _build_evaluation_context(quest) -> dict[str, Any]:
+    """Build evaluation context for a quest based on its type.
+
+    Args:
+        quest: Quest definition object exposing quest_type and criteria.
+
+    Returns:
+        Context dict consumed by evaluate_quest_progress; empty for
+        quest types that need no external data (e.g. custom quests).
+    """
+    context: dict[str, Any] = {}
+
+    # Compared via .value so this module need not import QuestType.
+    if quest.quest_type.value == "issue_count":
+        # Fetch closed issues with relevant labels
+        context["closed_issues"] = await _fetch_closed_issues(
+            quest.criteria.get("issue_labels", [])
+        )
+
+    elif quest.quest_type.value == "issue_reduce":
+        # Fetch current and previous issue counts
+        labels = quest.criteria.get("issue_labels", [])
+        context["current_issue_count"] = await _fetch_open_issue_count(labels)
+        context["previous_issue_count"] = await _fetch_previous_issue_count(
+            labels, quest.criteria.get("lookback_days", 7)
+        )
+
+    elif quest.quest_type.value == "daily_run":
+        # Fetch Daily Run metrics
+        metrics = await _fetch_daily_run_metrics()
+        context["sessions_completed"] = metrics.get("sessions_completed", 0)
+
+    return context
+
+
+async def _fetch_closed_issues(labels: list[str]) -> list[dict]:
+    """Fetch closed issues matching the given labels.
+
+    Args:
+        labels: Label names used as the Gitea query filter; an empty
+            list fetches closed issues without a label filter.
+
+    Returns:
+        Raw Gitea issue dicts, or [] when Gitea is unavailable or the
+        fetch fails (quest evaluation is best-effort).
+    """
+    try:
+        # Imported lazily — presumably to avoid a circular import with
+        # the daily_run routes module; confirm before hoisting.
+        from dashboard.routes.daily_run import GiteaClient, _load_config
+
+        config = _load_config()
+        token = _get_gitea_token(config)
+        client = GiteaClient(config, token)
+
+        if not client.is_available():
+            return []
+
+        # Build label filter
+        label_filter = ",".join(labels) if labels else ""
+
+        # NOTE(review): get_paginated looks synchronous — if it blocks
+        # on HTTP it stalls the event loop; confirm and consider
+        # offloading to a thread.
+        issues = client.get_paginated(
+            "issues",
+            {"state": "closed", "labels": label_filter, "limit": 100},
+        )
+
+        return issues
+    except Exception as exc:
+        logger.debug("Failed to fetch closed issues: %s", exc)
+        return []
+
+
+async def _fetch_open_issue_count(labels: list[str]) -> int:
+    """Fetch count of open issues with given labels.
+
+    Args:
+        labels: Label names for the Gitea filter; empty means no filter.
+
+    Returns:
+        Number of open issues returned by the paginated fetch, or 0
+        when Gitea is unavailable or the fetch fails.
+    """
+    try:
+        from dashboard.routes.daily_run import GiteaClient, _load_config
+
+        config = _load_config()
+        token = _get_gitea_token(config)
+        client = GiteaClient(config, token)
+
+        if not client.is_available():
+            return 0
+
+        label_filter = ",".join(labels) if labels else ""
+
+        issues = client.get_paginated(
+            "issues",
+            {"state": "open", "labels": label_filter, "limit": 100},
+        )
+
+        return len(issues)
+    except Exception as exc:
+        logger.debug("Failed to fetch open issue count: %s", exc)
+        return 0
+
+
+async def _fetch_previous_issue_count(labels: list[str], lookback_days: int) -> int:
+    """Fetch previous issue count (simplified - uses current for now).
+
+    NOTE(review): because this returns the *current* count,
+    issue_reduce quests always observe a reduction of 0 until
+    historical data is wired in; lookback_days is accepted but unused.
+    """
+    # This is a simplified implementation
+    # In production, you'd query historical data
+    return await _fetch_open_issue_count(labels)
+
+
+async def _fetch_daily_run_metrics() -> dict[str, Any]:
+    """Fetch Daily Run metrics.
+
+    Returns:
+        Dict with sessions_completed / sessions_previous; both zero
+        when metrics are unavailable (best-effort).
+    """
+    try:
+        from dashboard.routes.daily_run import _get_metrics
+
+        metrics = _get_metrics(lookback_days=7)
+        if metrics:
+            return {
+                "sessions_completed": metrics.sessions_completed,
+                "sessions_previous": metrics.sessions_previous,
+            }
+    except Exception as exc:
+        logger.debug("Failed to fetch Daily Run metrics: %s", exc)
+
+    return {"sessions_completed": 0, "sessions_previous": 0}
+
+
+def _get_gitea_token(config: dict) -> str | None:
+ """Get Gitea token from config."""
+ if "token" in config:
+ return config["token"]
+
+ from pathlib import Path
+
+ token_file = Path(config.get("token_file", "~/.hermes/gitea_token")).expanduser()
+ if token_file.exists():
+ return token_file.read_text().strip()
+
+ return None
+
+
+# ---------------------------------------------------------------------------
+# Daily Run Integration
+# ---------------------------------------------------------------------------
+
+
+async def check_daily_run_quests(agent_id: str = "system") -> list[dict]:
+ """Check and award Daily Run related quests.
+
+ Called by the Daily Run system when metrics are updated.
+
+ Returns:
+ List of rewards awarded
+ """
+ # Check if auto-detect is enabled
+ _, quest_settings = load_quest_config()
+ if not quest_settings.get("auto_detect_on_daily_run", True):
+ return []
+
+ # Build context from Daily Run metrics
+ metrics = await _fetch_daily_run_metrics()
+ context = {
+ "sessions_completed": metrics.get("sessions_completed", 0),
+ "sessions_previous": metrics.get("sessions_previous", 0),
+ }
+
+ # Add closed issues for issue_count quests
+ active_quests = get_active_quests()
+ for quest in active_quests:
+ if quest.quest_type.value == "issue_count":
+ labels = quest.criteria.get("issue_labels", [])
+ context["closed_issues"] = await _fetch_closed_issues(labels)
+ break # Only need to fetch once
+
+ # Evaluate all quests
+ rewards = auto_evaluate_all_quests(agent_id, context)
+
+ return rewards
diff --git a/src/dashboard/templates/partials/quests_panel.html b/src/dashboard/templates/partials/quests_panel.html
new file mode 100644
index 00000000..5540e92f
--- /dev/null
+++ b/src/dashboard/templates/partials/quests_panel.html
@@ -0,0 +1,80 @@
+{% from "macros.html" import panel %}
+
+
+
+
+
+
{{ total_tokens }}
+
Tokens Earned
+
+
+
+
+
{{ completed_count }}
+
Quests Completed
+
+
+
+
+
{{ quests|selectattr('enabled', 'equalto', true)|list|length }}
+
Active Quests
+
+
+
+
+
+
+ {% for quest in quests %}
+ {% if quest.enabled %}
+
+
+
{{ quest.description }}
+
+
+ {% if quest.status == 'completed' %}
+
+
Completed
+ {% elif quest.status == 'claimed' %}
+
+
Reward Claimed
+ {% elif quest.on_cooldown %}
+
+
+ Cooldown: {{ quest.cooldown_hours_remaining }}h remaining
+
+ {% else %}
+
+
{{ quest.current_value }} / {{ quest.target_value }}
+ {% endif %}
+
+
+
+ {{ quest.type }}
+ {% if quest.repeatable %}
+ ↻ Repeatable
+ {% endif %}
+ {% if quest.completion_count > 0 %}
+ Completed {{ quest.completion_count }} time{% if quest.completion_count != 1 %}s{% endif %}
+ {% endif %}
+
+
+ {% endif %}
+ {% endfor %}
+
+
+{% if not quests|selectattr('enabled', 'equalto', true)|list|length %}
+
+ No active quests available. Check back later or contact an administrator.
+
+{% endif %}
diff --git a/src/dashboard/templates/quests.html b/src/dashboard/templates/quests.html
new file mode 100644
index 00000000..68eb1262
--- /dev/null
+++ b/src/dashboard/templates/quests.html
@@ -0,0 +1,50 @@
+{% extends "base.html" %}
+
+{% block title %}Quests — Mission Control{% endblock %}
+
+{% block content %}
+
+
+
+
Token Quests
+
Complete quests to earn bonus tokens
+
+
+
+
+
+
+
+
+
+
+
+
Loading leaderboard...
+
+
+
+
+
+
+
+
Quests are special objectives that reward tokens upon completion.
+
+ - Complete Daily Run sessions
+ - Close flaky-test issues
+ - Reduce P1 issue backlog
+ - Improve documentation
+
+
+
+
+
+
+{% endblock %}
diff --git a/src/timmy/quest_system.py b/src/timmy/quest_system.py
new file mode 100644
index 00000000..ae7f6c02
--- /dev/null
+++ b/src/timmy/quest_system.py
@@ -0,0 +1,581 @@
+"""Token Quest System for agent rewards.
+
+Provides quest definitions, progress tracking, completion detection,
+and token awards for agent accomplishments.
+
+Quests are defined in config/quests.yaml and loaded at runtime.
+"""
+
+from __future__ import annotations
+
+import logging
+import time
+from dataclasses import dataclass, field
+from datetime import UTC, datetime, timedelta
+from enum import StrEnum
+from pathlib import Path
+from typing import Any
+
+import yaml
+
+from config import settings
+
+logger = logging.getLogger(__name__)
+
+# Path to quest configuration
+QUEST_CONFIG_PATH = Path(settings.repo_root) / "config" / "quests.yaml"
+
+
+class QuestType(StrEnum):
+    """Types of quests supported by the system.
+
+    Values mirror the ``type`` field in config/quests.yaml.
+    """
+
+    ISSUE_COUNT = "issue_count"    # close N issues matching labels
+    ISSUE_REDUCE = "issue_reduce"  # reduce open issue count by N
+    DOCS_UPDATE = "docs_update"    # update documentation files
+    TEST_IMPROVE = "test_improve"  # improve coverage / add tests
+    DAILY_RUN = "daily_run"        # complete Daily Run sessions
+    CUSTOM = "custom"              # manual completion only
+
+
+class QuestStatus(StrEnum):
+    """Status of a quest for an agent."""
+
+    NOT_STARTED = "not_started"  # no progress record activity yet
+    IN_PROGRESS = "in_progress"  # reserved; never assigned in this module
+    COMPLETED = "completed"      # target reached, reward unclaimed
+    CLAIMED = "claimed"          # reward paid out (non-repeatable quests)
+    EXPIRED = "expired"          # reserved; never assigned in this module
+
+
+@dataclass
+class QuestDefinition:
+    """Definition of a quest from configuration."""
+
+    id: str                    # unique quest identifier (YAML key)
+    name: str                  # display name
+    description: str           # human-readable requirement summary
+    reward_tokens: int         # tokens awarded on completion
+    quest_type: QuestType      # detection strategy for completion
+    enabled: bool              # disabled quests are skipped everywhere
+    repeatable: bool           # may be completed more than once
+    cooldown_hours: int        # min hours between repeat completions
+    criteria: dict[str, Any]   # type-specific detection parameters
+    notification_message: str  # completion text; {tokens} placeholder
+
+    @classmethod
+    def from_dict(cls, data: dict[str, Any]) -> QuestDefinition:
+        """Create a QuestDefinition from a dictionary.
+
+        Args:
+            data: Raw quest mapping from quests.yaml; must contain "id".
+
+        Returns:
+            A populated QuestDefinition with defaults for missing keys.
+
+        Raises:
+            KeyError: If "id" is missing.
+            ValueError: If "type" is not a valid QuestType value.
+        """
+        return cls(
+            id=data["id"],
+            name=data.get("name", "Unnamed Quest"),
+            description=data.get("description", ""),
+            reward_tokens=data.get("reward_tokens", 0),
+            quest_type=QuestType(data.get("type", "custom")),
+            enabled=data.get("enabled", True),
+            repeatable=data.get("repeatable", False),
+            cooldown_hours=data.get("cooldown_hours", 0),
+            criteria=data.get("criteria", {}),
+            notification_message=data.get(
+                "notification_message", "Quest Complete! You earned {tokens} tokens."
+            ),
+        )
+
+
+@dataclass
+class QuestProgress:
+    """Progress of a quest for a specific agent."""
+
+    quest_id: str
+    agent_id: str
+    status: QuestStatus
+    current_value: int = 0       # progress toward target_value
+    target_value: int = 0        # completion threshold from criteria
+    started_at: str = ""         # timestamps are ISO-8601 UTC strings
+    completed_at: str = ""
+    claimed_at: str = ""
+    completion_count: int = 0    # times the reward has been claimed
+    last_completed_at: str = ""  # drives repeatable-quest cooldowns
+    metadata: dict[str, Any] = field(default_factory=dict)
+
+    def to_dict(self) -> dict[str, Any]:
+        """Convert to dictionary for serialization.
+
+        ``status`` is serialized as its string value; timestamps pass
+        through as ISO strings (empty when unset).
+        """
+        return {
+            "quest_id": self.quest_id,
+            "agent_id": self.agent_id,
+            "status": self.status.value,
+            "current_value": self.current_value,
+            "target_value": self.target_value,
+            "started_at": self.started_at,
+            "completed_at": self.completed_at,
+            "claimed_at": self.claimed_at,
+            "completion_count": self.completion_count,
+            "last_completed_at": self.last_completed_at,
+            "metadata": self.metadata,
+        }
+
+
+# In-memory storage for quest progress
+_quest_progress: dict[str, QuestProgress] = {}
+_quest_definitions: dict[str, QuestDefinition] = {}
+_quest_settings: dict[str, Any] = {}
+
+
+def _get_progress_key(quest_id: str, agent_id: str) -> str:
+    """Generate a unique key for quest progress.
+
+    Progress is tracked per (agent, quest) pair in _quest_progress.
+    """
+    return f"{agent_id}:{quest_id}"
+
+
+def load_quest_config() -> tuple[dict[str, QuestDefinition], dict[str, Any]]:
+    """Load quest definitions from quests.yaml.
+
+    Also refreshes the module-level _quest_definitions and
+    _quest_settings caches as a side effect.
+
+    Returns:
+        Tuple of (quest definitions dict, settings dict); both empty
+        when the file is missing, unreadable, or malformed.
+    """
+    global _quest_definitions, _quest_settings
+
+    if not QUEST_CONFIG_PATH.exists():
+        logger.warning("Quest config not found at %s", QUEST_CONFIG_PATH)
+        return {}, {}
+
+    try:
+        raw = QUEST_CONFIG_PATH.read_text()
+        config = yaml.safe_load(raw)
+
+        if not isinstance(config, dict):
+            logger.warning("Invalid quest config format")
+            return {}, {}
+
+        # Load quest definitions
+        quests_data = config.get("quests", {})
+        definitions = {}
+        for quest_id, quest_data in quests_data.items():
+            # The YAML key is authoritative; it overwrites any inline id.
+            quest_data["id"] = quest_id
+            try:
+                definition = QuestDefinition.from_dict(quest_data)
+                definitions[quest_id] = definition
+            except (ValueError, KeyError) as exc:
+                # A bad quest entry is skipped; the rest still load.
+                logger.warning("Failed to load quest %s: %s", quest_id, exc)
+
+        # Load settings
+        _quest_settings = config.get("settings", {})
+        _quest_definitions = definitions
+
+        logger.debug("Loaded %d quest definitions", len(definitions))
+        return definitions, _quest_settings
+
+    except (OSError, yaml.YAMLError) as exc:
+        logger.warning("Failed to load quest config: %s", exc)
+        return {}, {}
+
+
+def get_quest_definitions() -> dict[str, QuestDefinition]:
+    """Get all quest definitions, loading if necessary.
+
+    Serves from the module-level cache; call load_quest_config() to
+    force a refresh.
+    """
+    global _quest_definitions
+    if not _quest_definitions:
+        _quest_definitions, _ = load_quest_config()
+    return _quest_definitions
+
+
+def get_quest_definition(quest_id: str) -> QuestDefinition | None:
+    """Get a specific quest definition by ID.
+
+    Returns None when the quest is unknown.
+    """
+    definitions = get_quest_definitions()
+    return definitions.get(quest_id)
+
+
+def get_active_quests() -> list[QuestDefinition]:
+    """Get all enabled quest definitions.
+
+    Disabled quests are excluded from evaluation and the API.
+    """
+    definitions = get_quest_definitions()
+    return [q for q in definitions.values() if q.enabled]
+
+
+def get_quest_progress(quest_id: str, agent_id: str) -> QuestProgress | None:
+    """Get progress for a specific quest and agent.
+
+    Returns None when no progress record exists yet.
+    """
+    key = _get_progress_key(quest_id, agent_id)
+    return _quest_progress.get(key)
+
+
+def get_or_create_progress(quest_id: str, agent_id: str) -> QuestProgress:
+    """Get existing progress or create new for quest/agent.
+
+    Raises:
+        ValueError: If quest_id does not match a known quest.
+    """
+    key = _get_progress_key(quest_id, agent_id)
+    if key not in _quest_progress:
+        quest = get_quest_definition(quest_id)
+        if not quest:
+            raise ValueError(f"Quest {quest_id} not found")
+
+        # The target is derived from the quest's type-specific criteria.
+        target = _get_target_value(quest)
+        _quest_progress[key] = QuestProgress(
+            quest_id=quest_id,
+            agent_id=agent_id,
+            status=QuestStatus.NOT_STARTED,
+            current_value=0,
+            target_value=target,
+            started_at=datetime.now(UTC).isoformat(),
+        )
+    return _quest_progress[key]
+
+
+def _get_target_value(quest: QuestDefinition) -> int:
+ """Extract target value from quest criteria."""
+ criteria = quest.criteria
+ if quest.quest_type == QuestType.ISSUE_COUNT:
+ return criteria.get("target_count", 1)
+ elif quest.quest_type == QuestType.ISSUE_REDUCE:
+ return criteria.get("target_reduction", 1)
+ elif quest.quest_type == QuestType.DAILY_RUN:
+ return criteria.get("min_sessions", 1)
+ elif quest.quest_type == QuestType.DOCS_UPDATE:
+ return criteria.get("min_files_changed", 1)
+ elif quest.quest_type == QuestType.TEST_IMPROVE:
+ return criteria.get("min_new_tests", 1)
+ return 1
+
+
+def update_quest_progress(
+    quest_id: str,
+    agent_id: str,
+    current_value: int,
+    metadata: dict[str, Any] | None = None,
+) -> QuestProgress:
+    """Update progress for a quest.
+
+    Args:
+        quest_id: Quest being updated.
+        agent_id: Agent the progress belongs to.
+        current_value: Absolute (not incremental) progress value.
+        metadata: Optional extra data merged into progress.metadata.
+
+    Returns:
+        The updated QuestProgress record.
+    """
+    progress = get_or_create_progress(quest_id, agent_id)
+    progress.current_value = current_value
+
+    if metadata:
+        progress.metadata.update(metadata)
+
+    # Check if quest is now complete
+    if progress.current_value >= progress.target_value:
+        # Guard so an already COMPLETED/CLAIMED quest is not re-stamped.
+        if progress.status not in (QuestStatus.COMPLETED, QuestStatus.CLAIMED):
+            progress.status = QuestStatus.COMPLETED
+            progress.completed_at = datetime.now(UTC).isoformat()
+            logger.info("Quest %s completed for agent %s", quest_id, agent_id)
+
+    return progress
+
+
+def _is_on_cooldown(progress: QuestProgress, quest: QuestDefinition) -> bool:
+    """Check if a repeatable quest is on cooldown.
+
+    Non-repeatable quests and quests never completed are never on
+    cooldown; a malformed timestamp is treated as "not on cooldown".
+    """
+    if not quest.repeatable or not progress.last_completed_at:
+        return False
+
+    if quest.cooldown_hours <= 0:
+        return False
+
+    try:
+        # last_completed_at is written by this module as an aware UTC
+        # isoformat string, so the comparison below is aware-vs-aware.
+        last_completed = datetime.fromisoformat(progress.last_completed_at)
+        cooldown_end = last_completed + timedelta(hours=quest.cooldown_hours)
+        return datetime.now(UTC) < cooldown_end
+    except (ValueError, TypeError):
+        return False
+
+
+def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
+    """Claim the token reward for a completed quest.
+
+    Awards tokens through the lightning ledger, stamps the progress
+    record, and (for repeatable quests) resets it for the next cycle.
+
+    Returns:
+        Reward info dict if successful, None if not claimable
+    """
+    progress = get_quest_progress(quest_id, agent_id)
+    if not progress:
+        return None
+
+    quest = get_quest_definition(quest_id)
+    if not quest:
+        return None
+
+    # Check if quest is completed but not yet claimed
+    if progress.status != QuestStatus.COMPLETED:
+        return None
+
+    # Check cooldown for repeatable quests
+    if _is_on_cooldown(progress, quest):
+        return None
+
+    try:
+        # Award tokens via ledger
+        from lightning.ledger import create_invoice_entry, mark_settled
+
+        # Create a mock invoice for the reward
+        # NOTE(review): int(time.time()) has 1-second resolution, so
+        # two claims in the same second could collide on payment_hash —
+        # confirm the ledger rejects duplicates.
+        invoice_entry = create_invoice_entry(
+            payment_hash=f"quest_{quest_id}_{agent_id}_{int(time.time())}",
+            amount_sats=quest.reward_tokens,
+            memo=f"Quest reward: {quest.name}",
+            source="quest_reward",
+            agent_id=agent_id,
+        )
+
+        # Mark as settled immediately (quest rewards are auto-settled)
+        mark_settled(invoice_entry.payment_hash, preimage=f"quest_{quest_id}")
+
+        # Update progress
+        progress.status = QuestStatus.CLAIMED
+        progress.claimed_at = datetime.now(UTC).isoformat()
+        progress.completion_count += 1
+        # Capture the claim time for cooldown tracking BEFORE the
+        # repeatable reset below clears claimed_at.
+        progress.last_completed_at = progress.claimed_at
+
+        # Reset for repeatable quests
+        if quest.repeatable:
+            progress.status = QuestStatus.NOT_STARTED
+            progress.current_value = 0
+            progress.completed_at = ""
+            progress.claimed_at = ""
+
+        notification = quest.notification_message.format(tokens=quest.reward_tokens)
+
+        return {
+            "quest_id": quest_id,
+            "agent_id": agent_id,
+            "tokens_awarded": quest.reward_tokens,
+            "notification": notification,
+            "completion_count": progress.completion_count,
+        }
+
+    except Exception as exc:
+        # Broad catch: a ledger failure must not crash quest evaluation;
+        # the quest stays COMPLETED so the claim can be retried.
+        logger.error("Failed to award quest reward: %s", exc)
+        return None
+
+
+def check_issue_count_quest(
+ quest: QuestDefinition,
+ agent_id: str,
+ closed_issues: list[dict],
+) -> QuestProgress | None:
+ """Check progress for issue_count type quest."""
+ criteria = quest.criteria
+ target_labels = set(criteria.get("issue_labels", []))
+ # target_count is available in criteria but not used directly here
+
+ # Count matching issues
+ matching_count = 0
+ for issue in closed_issues:
+ issue_labels = {label.get("name", "") for label in issue.get("labels", [])}
+ if target_labels.issubset(issue_labels) or (not target_labels and issue_labels):
+ matching_count += 1
+
+ progress = update_quest_progress(
+ quest.id, agent_id, matching_count, {"matching_issues": matching_count}
+ )
+
+ return progress
+
+
+def check_issue_reduce_quest(
+    quest: QuestDefinition,
+    agent_id: str,
+    previous_count: int,
+    current_count: int,
+) -> QuestProgress | None:
+    """Check progress for issue_reduce type quest.
+
+    Progress is the actual drop in open-issue count (clamped at 0);
+    completion is reached when it meets the quest's target value.
+    """
+    # target_reduction available in quest.criteria but we track actual reduction
+    reduction = max(0, previous_count - current_count)
+
+    progress = update_quest_progress(quest.id, agent_id, reduction, {"reduction": reduction})
+
+    return progress
+
+
+def check_daily_run_quest(
+    quest: QuestDefinition,
+    agent_id: str,
+    sessions_completed: int,
+) -> QuestProgress | None:
+    """Check progress for daily_run type quest.
+
+    Progress is the number of completed sessions; completion is
+    reached when it meets the quest's target value.
+    """
+    # min_sessions available in quest.criteria but we track actual sessions
+    progress = update_quest_progress(
+        quest.id, agent_id, sessions_completed, {"sessions": sessions_completed}
+    )
+
+    return progress
+
+
+def evaluate_quest_progress(
+    quest_id: str,
+    agent_id: str,
+    context: dict[str, Any],
+) -> QuestProgress | None:
+    """Evaluate quest progress based on quest type and context.
+
+    Args:
+        quest_id: The quest to evaluate
+        agent_id: The agent to evaluate for
+        context: Context data for evaluation (issues, metrics, etc.)
+
+    Returns:
+        Updated QuestProgress or None if evaluation failed
+    """
+    quest = get_quest_definition(quest_id)
+    if not quest or not quest.enabled:
+        return None
+
+    progress = get_quest_progress(quest_id, agent_id)
+
+    # Check cooldown for repeatable quests
+    if progress and _is_on_cooldown(progress, quest):
+        # Skip re-evaluation entirely while cooling down.
+        return progress
+
+    try:
+        if quest.quest_type == QuestType.ISSUE_COUNT:
+            closed_issues = context.get("closed_issues", [])
+            return check_issue_count_quest(quest, agent_id, closed_issues)
+
+        elif quest.quest_type == QuestType.ISSUE_REDUCE:
+            prev_count = context.get("previous_issue_count", 0)
+            curr_count = context.get("current_issue_count", 0)
+            return check_issue_reduce_quest(quest, agent_id, prev_count, curr_count)
+
+        elif quest.quest_type == QuestType.DAILY_RUN:
+            sessions = context.get("sessions_completed", 0)
+            return check_daily_run_quest(quest, agent_id, sessions)
+
+        elif quest.quest_type == QuestType.CUSTOM:
+            # Custom quests require manual completion
+            return progress
+
+        else:
+            # docs_update / test_improve detection is not implemented yet.
+            logger.debug("Quest type %s not yet implemented", quest.quest_type)
+            return progress
+
+    except Exception as exc:
+        # Best-effort: an evaluation failure leaves prior progress intact.
+        logger.warning("Quest evaluation failed for %s: %s", quest_id, exc)
+        return progress
+
+
+def auto_evaluate_all_quests(agent_id: str, context: dict[str, Any]) -> list[dict]:
+    """Evaluate all active quests for an agent and award rewards.
+
+    Args:
+        agent_id: The agent to evaluate quests for.
+        context: Shared evaluation context passed to every quest
+            (closed issues, issue counts, session counts, etc.).
+
+    Returns:
+        List of reward info for newly completed quests
+    """
+    rewards = []
+    active_quests = get_active_quests()
+
+    for quest in active_quests:
+        progress = evaluate_quest_progress(quest.id, agent_id, context)
+        if progress and progress.status == QuestStatus.COMPLETED:
+            # Auto-claim the reward; claim_quest_reward may return None
+            # (e.g. not actually claimable), in which case nothing is recorded.
+            reward = claim_quest_reward(quest.id, agent_id)
+            if reward:
+                rewards.append(reward)
+
+    return rewards
+
+
+def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
+    """Get complete quest status for an agent.
+
+    Builds a per-quest status entry (progress, cooldown, reward info) for
+    every known quest definition, creating fresh progress records where the
+    agent has none yet, plus aggregate totals.
+
+    Args:
+        agent_id: The agent to report on.
+
+    Returns:
+        Dict with keys: agent_id, quests (list of per-quest dicts),
+        total_tokens_earned, total_quests_completed, active_quests_count.
+    """
+    definitions = get_quest_definitions()
+    quests_status = []
+    total_rewards = 0
+    completed_count = 0
+
+    for quest_id, quest in definitions.items():
+        progress = get_quest_progress(quest_id, agent_id)
+        if not progress:
+            progress = get_or_create_progress(quest_id, agent_id)
+
+        # Cooldown only applies to repeatable quests.
+        is_on_cooldown = _is_on_cooldown(progress, quest) if quest.repeatable else False
+
+        quest_info = {
+            "quest_id": quest_id,
+            "name": quest.name,
+            "description": quest.description,
+            "reward_tokens": quest.reward_tokens,
+            "type": quest.quest_type.value,
+            "enabled": quest.enabled,
+            "repeatable": quest.repeatable,
+            "status": progress.status.value,
+            "current_value": progress.current_value,
+            "target_value": progress.target_value,
+            "completion_count": progress.completion_count,
+            "on_cooldown": is_on_cooldown,
+            "cooldown_hours_remaining": 0,
+        }
+
+        if is_on_cooldown and progress.last_completed_at:
+            try:
+                # Remaining cooldown = (last completion + cooldown window) - now.
+                # ValueError covers malformed ISO strings; TypeError covers
+                # subtracting a naive parsed datetime from the aware now(UTC).
+                last = datetime.fromisoformat(progress.last_completed_at)
+                cooldown_end = last + timedelta(hours=quest.cooldown_hours)
+                hours_remaining = (cooldown_end - datetime.now(UTC)).total_seconds() / 3600
+                quest_info["cooldown_hours_remaining"] = round(max(0, hours_remaining), 1)
+            except (ValueError, TypeError):
+                pass
+
+        quests_status.append(quest_info)
+        # Lifetime totals derive from completion_count, not current status.
+        total_rewards += progress.completion_count * quest.reward_tokens
+        completed_count += progress.completion_count
+
+    return {
+        "agent_id": agent_id,
+        "quests": quests_status,
+        "total_tokens_earned": total_rewards,
+        "total_quests_completed": completed_count,
+        "active_quests_count": len([q for q in quests_status if q["enabled"]]),
+    }
+
+
+def reset_quest_progress(quest_id: str | None = None, agent_id: str | None = None) -> int:
+    """Reset quest progress. Useful for testing.
+
+    Args:
+        quest_id: Specific quest to reset, or None for all
+        agent_id: Specific agent to reset, or None for all
+
+    Returns:
+        Number of progress entries reset
+    """
+    global _quest_progress
+    count = 0
+
+    # Progress keys are "agent_id:quest_id"; split on the first colon so a
+    # quest_id containing ":" would still parse correctly.
+    keys_to_reset = []
+    for key, _progress in _quest_progress.items():
+        key_agent, key_quest = key.split(":", 1)
+        if (quest_id is None or key_quest == quest_id) and (
+            agent_id is None or key_agent == agent_id
+        ):
+            keys_to_reset.append(key)
+
+    # Delete in a second pass to avoid mutating the dict while iterating it.
+    for key in keys_to_reset:
+        del _quest_progress[key]
+        count += 1
+
+    return count
+
+
+def get_quest_leaderboard() -> list[dict[str, Any]]:
+    """Get a leaderboard of agents by quest completion.
+
+    Aggregates completion counts and token totals across all in-memory
+    progress entries, then sorts agents by total tokens earned (descending).
+
+    Returns:
+        List of dicts with agent_id, total_completions, total_tokens,
+        and unique_quests_completed.
+    """
+    agent_stats: dict[str, dict[str, Any]] = {}
+
+    for _key, progress in _quest_progress.items():
+        agent_id = progress.agent_id
+        if agent_id not in agent_stats:
+            agent_stats[agent_id] = {
+                "agent_id": agent_id,
+                "total_completions": 0,
+                "total_tokens": 0,
+                # Set of quest ids completed at least once (for uniqueness).
+                "quests_completed": set(),
+            }
+
+        # Progress entries for quests with no current definition are skipped.
+        quest = get_quest_definition(progress.quest_id)
+        if quest:
+            agent_stats[agent_id]["total_completions"] += progress.completion_count
+            agent_stats[agent_id]["total_tokens"] += progress.completion_count * quest.reward_tokens
+            if progress.completion_count > 0:
+                agent_stats[agent_id]["quests_completed"].add(quest.id)
+
+    # Flatten to JSON-friendly dicts (sets are reduced to their size).
+    leaderboard = []
+    for stats in agent_stats.values():
+        leaderboard.append(
+            {
+                "agent_id": stats["agent_id"],
+                "total_completions": stats["total_completions"],
+                "total_tokens": stats["total_tokens"],
+                "unique_quests_completed": len(stats["quests_completed"]),
+            }
+        )
+
+    # Sort by total tokens (descending)
+    leaderboard.sort(key=lambda x: x["total_tokens"], reverse=True)
+    return leaderboard
+
+
+# Initialize on module load so quest definitions are available to importers
+# without an explicit setup call.
+load_quest_config()
diff --git a/tests/unit/test_quest_system.py b/tests/unit/test_quest_system.py
new file mode 100644
index 00000000..62980f81
--- /dev/null
+++ b/tests/unit/test_quest_system.py
@@ -0,0 +1,489 @@
+"""Unit tests for the quest system.
+
+Tests quest definitions, progress tracking, completion detection,
+and token rewards.
+"""
+
+from __future__ import annotations
+
+import pytest
+
+from timmy.quest_system import (
+ QuestDefinition,
+ QuestProgress,
+ QuestStatus,
+ QuestType,
+ _is_on_cooldown,
+ claim_quest_reward,
+ evaluate_quest_progress,
+ get_or_create_progress,
+ get_quest_definition,
+ get_quest_leaderboard,
+ load_quest_config,
+ reset_quest_progress,
+ update_quest_progress,
+)
+
+
+@pytest.fixture(autouse=True)
+def clean_quest_state():
+    """Reset quest progress between tests.
+
+    Autouse: clears the module-level progress store both before and after
+    every test so state cannot leak between tests.
+    """
+    reset_quest_progress()
+    yield
+    reset_quest_progress()
+
+
+@pytest.fixture
+def sample_issue_count_quest():
+    """Create a sample issue_count quest definition.
+
+    Non-repeatable, targets 3 closed issues labeled "test".
+    """
+    return QuestDefinition(
+        id="test_close_issues",
+        name="Test Issue Closer",
+        description="Close 3 test issues",
+        reward_tokens=100,
+        quest_type=QuestType.ISSUE_COUNT,
+        enabled=True,
+        repeatable=False,
+        cooldown_hours=0,
+        criteria={"target_count": 3, "issue_labels": ["test"]},
+        notification_message="Test quest complete! Earned {tokens} tokens.",
+    )
+
+
+@pytest.fixture
+def sample_daily_run_quest():
+    """Create a sample daily_run quest definition.
+
+    Repeatable with a 24h cooldown, requiring 5 completed sessions.
+    """
+    return QuestDefinition(
+        id="test_daily_run",
+        name="Test Daily Runner",
+        description="Complete 5 sessions",
+        reward_tokens=250,
+        quest_type=QuestType.DAILY_RUN,
+        enabled=True,
+        repeatable=True,
+        cooldown_hours=24,
+        criteria={"min_sessions": 5},
+        notification_message="Daily run quest complete! Earned {tokens} tokens.",
+    )
+
+
+# ── Quest Definition Tests ───────────────────────────────────────────────
+
+
+class TestQuestDefinition:
+    """Tests for QuestDefinition.from_dict parsing and defaults."""
+
+    def test_from_dict_minimal(self):
+        """Minimal dict gets CUSTOM type and enabled=True defaults."""
+        data = {"id": "test_quest", "name": "Test Quest"}
+        quest = QuestDefinition.from_dict(data)
+        assert quest.id == "test_quest"
+        assert quest.name == "Test Quest"
+        assert quest.quest_type == QuestType.CUSTOM
+        assert quest.enabled is True
+
+    def test_from_dict_full(self):
+        """A fully-specified dict maps every field, including the "type" key."""
+        data = {
+            "id": "full_quest",
+            "name": "Full Quest",
+            "description": "A test quest",
+            "reward_tokens": 500,
+            "type": "issue_count",
+            "enabled": False,
+            "repeatable": True,
+            "cooldown_hours": 12,
+            "criteria": {"target_count": 5},
+            "notification_message": "Done!",
+        }
+        quest = QuestDefinition.from_dict(data)
+        assert quest.id == "full_quest"
+        assert quest.reward_tokens == 500
+        assert quest.quest_type == QuestType.ISSUE_COUNT
+        assert quest.enabled is False
+        assert quest.repeatable is True
+        assert quest.cooldown_hours == 12
+
+
+# ── Quest Progress Tests ─────────────────────────────────────────────────
+
+
+class TestQuestProgress:
+    """Tests for the QuestProgress data model."""
+
+    def test_progress_creation(self):
+        """New progress starts with current_value == 0."""
+        progress = QuestProgress(
+            quest_id="test_quest",
+            agent_id="test_agent",
+            status=QuestStatus.NOT_STARTED,
+        )
+        assert progress.quest_id == "test_quest"
+        assert progress.agent_id == "test_agent"
+        assert progress.current_value == 0
+
+    def test_progress_to_dict(self):
+        """to_dict serializes the status enum to its string value."""
+        progress = QuestProgress(
+            quest_id="test_quest",
+            agent_id="test_agent",
+            status=QuestStatus.IN_PROGRESS,
+            current_value=2,
+            target_value=5,
+        )
+        data = progress.to_dict()
+        assert data["quest_id"] == "test_quest"
+        assert data["status"] == "in_progress"
+        assert data["current_value"] == 2
+
+
+# ── Quest Loading Tests ──────────────────────────────────────────────────
+
+
+class TestQuestLoading:
+    """Tests for quest config loading and definition lookup."""
+
+    def test_load_quest_config(self):
+        """load_quest_config returns (definitions, settings) dicts."""
+        definitions, settings = load_quest_config()
+        assert isinstance(definitions, dict)
+        assert isinstance(settings, dict)
+
+    def test_get_quest_definition_exists(self):
+        # Should return None for non-existent quest in fresh state
+        quest = get_quest_definition("nonexistent")
+        # The function returns from loaded config, which may have quests
+        # or be empty if config doesn't exist
+        assert quest is None or isinstance(quest, QuestDefinition)
+
+    def test_get_quest_definition_not_found(self):
+        """An id that can't exist in any config returns None."""
+        quest = get_quest_definition("definitely_not_a_real_quest_12345")
+        assert quest is None
+
+
+# ── Quest Progress Management Tests ─────────────────────────────────────
+
+
+class TestQuestProgressManagement:
+    """Tests for creating and updating quest progress records."""
+
+    def test_get_or_create_progress_new(self):
+        """New progress inherits target_value from the quest's criteria."""
+        # First create a quest definition
+        quest = QuestDefinition(
+            id="progress_test",
+            name="Progress Test",
+            description="Test quest",
+            reward_tokens=100,
+            quest_type=QuestType.ISSUE_COUNT,
+            enabled=True,
+            repeatable=False,
+            cooldown_hours=0,
+            criteria={"target_count": 3},
+            notification_message="Done!",
+        )
+
+        # Need to inject into the definitions dict
+        from timmy.quest_system import _quest_definitions
+
+        _quest_definitions["progress_test"] = quest
+
+        progress = get_or_create_progress("progress_test", "agent1")
+        assert progress.quest_id == "progress_test"
+        assert progress.agent_id == "agent1"
+        assert progress.status == QuestStatus.NOT_STARTED
+        assert progress.target_value == 3
+
+        # Clean up the injected definition so other tests see a pristine dict.
+        del _quest_definitions["progress_test"]
+
+    def test_update_quest_progress(self):
+        """Reaching target_value flips status to COMPLETED and stamps completed_at."""
+        quest = QuestDefinition(
+            id="update_test",
+            name="Update Test",
+            description="Test quest",
+            reward_tokens=100,
+            quest_type=QuestType.ISSUE_COUNT,
+            enabled=True,
+            repeatable=False,
+            cooldown_hours=0,
+            criteria={"target_count": 3},
+            notification_message="Done!",
+        )
+
+        from timmy.quest_system import _quest_definitions
+
+        _quest_definitions["update_test"] = quest
+
+        # Create initial progress
+        progress = get_or_create_progress("update_test", "agent1")
+        assert progress.current_value == 0
+
+        # Update progress
+        updated = update_quest_progress("update_test", "agent1", 2)
+        assert updated.current_value == 2
+        assert updated.status == QuestStatus.NOT_STARTED
+
+        # Complete the quest
+        completed = update_quest_progress("update_test", "agent1", 3)
+        assert completed.current_value == 3
+        assert completed.status == QuestStatus.COMPLETED
+        assert completed.completed_at != ""
+
+        del _quest_definitions["update_test"]
+
+
+# ── Quest Evaluation Tests ───────────────────────────────────────────────
+
+
+class TestQuestEvaluation:
+    """Tests for evaluate_quest_progress across quest types."""
+
+    def test_evaluate_issue_count_quest(self):
+        """Only issues carrying a matching label count toward progress."""
+        quest = QuestDefinition(
+            id="eval_test",
+            name="Eval Test",
+            description="Test quest",
+            reward_tokens=100,
+            quest_type=QuestType.ISSUE_COUNT,
+            enabled=True,
+            repeatable=False,
+            cooldown_hours=0,
+            criteria={"target_count": 2, "issue_labels": ["test"]},
+            notification_message="Done!",
+        )
+
+        from timmy.quest_system import _quest_definitions
+
+        _quest_definitions["eval_test"] = quest
+
+        # Simulate closed issues
+        closed_issues = [
+            {"id": 1, "labels": [{"name": "test"}]},
+            {"id": 2, "labels": [{"name": "test"}, {"name": "bug"}]},
+            {"id": 3, "labels": [{"name": "other"}]},
+        ]
+
+        context = {"closed_issues": closed_issues}
+        progress = evaluate_quest_progress("eval_test", "agent1", context)
+
+        assert progress is not None
+        assert progress.current_value == 2  # Two issues with 'test' label
+
+        del _quest_definitions["eval_test"]
+
+    def test_evaluate_issue_reduce_quest(self):
+        """Progress equals the drop between previous and current issue counts."""
+        quest = QuestDefinition(
+            id="reduce_test",
+            name="Reduce Test",
+            description="Test quest",
+            reward_tokens=200,
+            quest_type=QuestType.ISSUE_REDUCE,
+            enabled=True,
+            repeatable=False,
+            cooldown_hours=0,
+            criteria={"target_reduction": 2},
+            notification_message="Done!",
+        )
+
+        from timmy.quest_system import _quest_definitions
+
+        _quest_definitions["reduce_test"] = quest
+
+        context = {"previous_issue_count": 10, "current_issue_count": 7}
+        progress = evaluate_quest_progress("reduce_test", "agent1", context)
+
+        assert progress is not None
+        assert progress.current_value == 3  # Reduced by 3
+
+        del _quest_definitions["reduce_test"]
+
+    def test_evaluate_daily_run_quest(self):
+        """Hitting min_sessions completes a daily_run quest."""
+        quest = QuestDefinition(
+            id="daily_test",
+            name="Daily Test",
+            description="Test quest",
+            reward_tokens=250,
+            quest_type=QuestType.DAILY_RUN,
+            enabled=True,
+            repeatable=True,
+            cooldown_hours=24,
+            criteria={"min_sessions": 5},
+            notification_message="Done!",
+        )
+
+        from timmy.quest_system import _quest_definitions
+
+        _quest_definitions["daily_test"] = quest
+
+        context = {"sessions_completed": 5}
+        progress = evaluate_quest_progress("daily_test", "agent1", context)
+
+        assert progress is not None
+        assert progress.current_value == 5
+        assert progress.status == QuestStatus.COMPLETED
+
+        del _quest_definitions["daily_test"]
+
+
+# ── Quest Cooldown Tests ─────────────────────────────────────────────────
+
+
+class TestQuestCooldown:
+    """Tests for the _is_on_cooldown helper."""
+
+    def test_is_on_cooldown_no_cooldown(self):
+        """Progress with no last_completed_at is never on cooldown."""
+        quest = QuestDefinition(
+            id="cooldown_test",
+            name="Cooldown Test",
+            description="Test quest",
+            reward_tokens=100,
+            quest_type=QuestType.ISSUE_COUNT,
+            enabled=True,
+            repeatable=True,
+            cooldown_hours=24,
+            criteria={},
+            notification_message="Done!",
+        )
+
+        progress = QuestProgress(
+            quest_id="cooldown_test",
+            agent_id="agent1",
+            status=QuestStatus.CLAIMED,
+        )
+
+        # No last_completed_at means no cooldown
+        assert _is_on_cooldown(progress, quest) is False
+
+
+# ── Quest Reward Tests ───────────────────────────────────────────────────
+
+
+class TestQuestReward:
+    """Tests for claim_quest_reward preconditions."""
+
+    def test_claim_quest_reward_not_completed(self):
+        """Claiming an incomplete quest yields no reward (None)."""
+        quest = QuestDefinition(
+            id="reward_test",
+            name="Reward Test",
+            description="Test quest",
+            reward_tokens=100,
+            quest_type=QuestType.ISSUE_COUNT,
+            enabled=True,
+            repeatable=False,
+            cooldown_hours=0,
+            criteria={"target_count": 3},
+            notification_message="Done!",
+        )
+
+        from timmy.quest_system import _quest_definitions, _quest_progress
+
+        _quest_definitions["reward_test"] = quest
+
+        # Create progress but don't complete
+        progress = get_or_create_progress("reward_test", "agent1")
+        _quest_progress["agent1:reward_test"] = progress
+
+        # Try to claim - should fail
+        reward = claim_quest_reward("reward_test", "agent1")
+        assert reward is None
+
+        del _quest_definitions["reward_test"]
+
+
+# ── Leaderboard Tests ────────────────────────────────────────────────────
+
+
+class TestQuestLeaderboard:
+    """Tests for get_quest_leaderboard aggregation and ordering."""
+
+    def test_get_quest_leaderboard_empty(self):
+        """With no progress entries the leaderboard is empty."""
+        reset_quest_progress()
+        leaderboard = get_quest_leaderboard()
+        assert leaderboard == []
+
+    def test_get_quest_leaderboard_with_data(self):
+        """Agents are ranked by total tokens (completions x reward), descending."""
+        # Create and complete a quest for two agents
+        quest = QuestDefinition(
+            id="leaderboard_test",
+            name="Leaderboard Test",
+            description="Test quest",
+            reward_tokens=100,
+            quest_type=QuestType.ISSUE_COUNT,
+            enabled=True,
+            repeatable=True,
+            cooldown_hours=0,
+            criteria={"target_count": 1},
+            notification_message="Done!",
+        )
+
+        from timmy.quest_system import _quest_definitions, _quest_progress
+
+        _quest_definitions["leaderboard_test"] = quest
+
+        # Create progress for agent1 with 2 completions
+        progress1 = QuestProgress(
+            quest_id="leaderboard_test",
+            agent_id="agent1",
+            status=QuestStatus.NOT_STARTED,
+            completion_count=2,
+        )
+        _quest_progress["agent1:leaderboard_test"] = progress1
+
+        # Create progress for agent2 with 1 completion
+        progress2 = QuestProgress(
+            quest_id="leaderboard_test",
+            agent_id="agent2",
+            status=QuestStatus.NOT_STARTED,
+            completion_count=1,
+        )
+        _quest_progress["agent2:leaderboard_test"] = progress2
+
+        leaderboard = get_quest_leaderboard()
+
+        assert len(leaderboard) == 2
+        # agent1 should be first (more tokens)
+        assert leaderboard[0]["agent_id"] == "agent1"
+        assert leaderboard[0]["total_tokens"] == 200
+        assert leaderboard[1]["agent_id"] == "agent2"
+        assert leaderboard[1]["total_tokens"] == 100
+
+        del _quest_definitions["leaderboard_test"]
+
+
+# ── Quest Reset Tests ─────────────────────────────────────────────────────
+
+
+class TestQuestReset:
+    """Tests for reset_quest_progress filtering (all / by quest / by agent)."""
+
+    def test_reset_quest_progress_all(self):
+        """No filters clears every entry and reports the count."""
+        # Create some progress entries
+        progress1 = QuestProgress(
+            quest_id="quest1", agent_id="agent1", status=QuestStatus.NOT_STARTED
+        )
+        progress2 = QuestProgress(
+            quest_id="quest2", agent_id="agent2", status=QuestStatus.NOT_STARTED
+        )
+
+        from timmy.quest_system import _quest_progress
+
+        _quest_progress["agent1:quest1"] = progress1
+        _quest_progress["agent2:quest2"] = progress2
+
+        assert len(_quest_progress) == 2
+
+        count = reset_quest_progress()
+        assert count == 2
+        assert len(_quest_progress) == 0
+
+    def test_reset_quest_progress_specific_quest(self):
+        """quest_id filter removes only that quest's entries."""
+        progress1 = QuestProgress(
+            quest_id="quest1", agent_id="agent1", status=QuestStatus.NOT_STARTED
+        )
+        progress2 = QuestProgress(
+            quest_id="quest2", agent_id="agent1", status=QuestStatus.NOT_STARTED
+        )
+
+        from timmy.quest_system import _quest_progress
+
+        _quest_progress["agent1:quest1"] = progress1
+        _quest_progress["agent1:quest2"] = progress2
+
+        count = reset_quest_progress(quest_id="quest1")
+        assert count == 1
+        assert "agent1:quest1" not in _quest_progress
+        assert "agent1:quest2" in _quest_progress
+
+    def test_reset_quest_progress_specific_agent(self):
+        """agent_id filter removes only that agent's entries."""
+        progress1 = QuestProgress(
+            quest_id="quest1", agent_id="agent1", status=QuestStatus.NOT_STARTED
+        )
+        progress2 = QuestProgress(
+            quest_id="quest1", agent_id="agent2", status=QuestStatus.NOT_STARTED
+        )
+
+        from timmy.quest_system import _quest_progress
+
+        _quest_progress["agent1:quest1"] = progress1
+        _quest_progress["agent2:quest1"] = progress2
+
+        count = reset_quest_progress(agent_id="agent1")
+        assert count == 1
+        assert "agent1:quest1" not in _quest_progress
+        assert "agent2:quest1" in _quest_progress