"""Token Quest System for agent rewards. Provides quest definitions, progress tracking, completion detection, and token awards for agent accomplishments. Quests are defined in config/quests.yaml and loaded at runtime. """ from __future__ import annotations import logging import time from dataclasses import dataclass, field from datetime import UTC, datetime, timedelta from enum import StrEnum from pathlib import Path from typing import Any import yaml from config import settings logger = logging.getLogger(__name__) # Path to quest configuration QUEST_CONFIG_PATH = Path(settings.repo_root) / "config" / "quests.yaml" class QuestType(StrEnum): """Types of quests supported by the system.""" ISSUE_COUNT = "issue_count" ISSUE_REDUCE = "issue_reduce" DOCS_UPDATE = "docs_update" TEST_IMPROVE = "test_improve" DAILY_RUN = "daily_run" CUSTOM = "custom" class QuestStatus(StrEnum): """Status of a quest for an agent.""" NOT_STARTED = "not_started" IN_PROGRESS = "in_progress" COMPLETED = "completed" CLAIMED = "claimed" EXPIRED = "expired" @dataclass class QuestDefinition: """Definition of a quest from configuration.""" id: str name: str description: str reward_tokens: int quest_type: QuestType enabled: bool repeatable: bool cooldown_hours: int criteria: dict[str, Any] notification_message: str @classmethod def from_dict(cls, data: dict[str, Any]) -> QuestDefinition: """Create a QuestDefinition from a dictionary.""" return cls( id=data["id"], name=data.get("name", "Unnamed Quest"), description=data.get("description", ""), reward_tokens=data.get("reward_tokens", 0), quest_type=QuestType(data.get("type", "custom")), enabled=data.get("enabled", True), repeatable=data.get("repeatable", False), cooldown_hours=data.get("cooldown_hours", 0), criteria=data.get("criteria", {}), notification_message=data.get( "notification_message", "Quest Complete! You earned {tokens} tokens." ), ) @dataclass class QuestProgress: """Progress of a quest for a specific agent.""" quest_id: str agent_id: str status: QuestStatus current_value: int = 0 target_value: int = 0 started_at: str = "" completed_at: str = "" claimed_at: str = "" completion_count: int = 0 last_completed_at: str = "" metadata: dict[str, Any] = field(default_factory=dict) def to_dict(self) -> dict[str, Any]: """Convert to dictionary for serialization.""" return { "quest_id": self.quest_id, "agent_id": self.agent_id, "status": self.status.value, "current_value": self.current_value, "target_value": self.target_value, "started_at": self.started_at, "completed_at": self.completed_at, "claimed_at": self.claimed_at, "completion_count": self.completion_count, "last_completed_at": self.last_completed_at, "metadata": self.metadata, } # In-memory storage for quest progress _quest_progress: dict[str, QuestProgress] = {} _quest_definitions: dict[str, QuestDefinition] = {} _quest_settings: dict[str, Any] = {} def _get_progress_key(quest_id: str, agent_id: str) -> str: """Generate a unique key for quest progress.""" return f"{agent_id}:{quest_id}" def load_quest_config() -> tuple[dict[str, QuestDefinition], dict[str, Any]]: """Load quest definitions from quests.yaml. Returns: Tuple of (quest definitions dict, settings dict) """ global _quest_definitions, _quest_settings if not QUEST_CONFIG_PATH.exists(): logger.warning("Quest config not found at %s", QUEST_CONFIG_PATH) return {}, {} try: raw = QUEST_CONFIG_PATH.read_text() config = yaml.safe_load(raw) if not isinstance(config, dict): logger.warning("Invalid quest config format") return {}, {} # Load quest definitions quests_data = config.get("quests", {}) definitions = {} for quest_id, quest_data in quests_data.items(): quest_data["id"] = quest_id try: definition = QuestDefinition.from_dict(quest_data) definitions[quest_id] = definition except (ValueError, KeyError) as exc: logger.warning("Failed to load quest %s: %s", quest_id, exc) # Load settings _quest_settings = config.get("settings", {}) _quest_definitions = definitions logger.debug("Loaded %d quest definitions", len(definitions)) return definitions, _quest_settings except (OSError, yaml.YAMLError) as exc: logger.warning("Failed to load quest config: %s", exc) return {}, {} def get_quest_definitions() -> dict[str, QuestDefinition]: """Get all quest definitions, loading if necessary.""" global _quest_definitions if not _quest_definitions: _quest_definitions, _ = load_quest_config() return _quest_definitions def get_quest_definition(quest_id: str) -> QuestDefinition | None: """Get a specific quest definition by ID.""" definitions = get_quest_definitions() return definitions.get(quest_id) def get_active_quests() -> list[QuestDefinition]: """Get all enabled quest definitions.""" definitions = get_quest_definitions() return [q for q in definitions.values() if q.enabled] def get_quest_progress(quest_id: str, agent_id: str) -> QuestProgress | None: """Get progress for a specific quest and agent.""" key = _get_progress_key(quest_id, agent_id) return _quest_progress.get(key) def get_or_create_progress(quest_id: str, agent_id: str) -> QuestProgress: """Get existing progress or create new for quest/agent.""" key = _get_progress_key(quest_id, agent_id) if key not in _quest_progress: quest = get_quest_definition(quest_id) if not quest: raise ValueError(f"Quest {quest_id} not found") target = _get_target_value(quest) _quest_progress[key] = QuestProgress( quest_id=quest_id, agent_id=agent_id, status=QuestStatus.NOT_STARTED, current_value=0, target_value=target, started_at=datetime.now(UTC).isoformat(), ) return _quest_progress[key] def _get_target_value(quest: QuestDefinition) -> int: """Extract target value from quest criteria.""" criteria = quest.criteria if quest.quest_type == QuestType.ISSUE_COUNT: return criteria.get("target_count", 1) elif quest.quest_type == QuestType.ISSUE_REDUCE: return criteria.get("target_reduction", 1) elif quest.quest_type == QuestType.DAILY_RUN: return criteria.get("min_sessions", 1) elif quest.quest_type == QuestType.DOCS_UPDATE: return criteria.get("min_files_changed", 1) elif quest.quest_type == QuestType.TEST_IMPROVE: return criteria.get("min_new_tests", 1) return 1 def update_quest_progress( quest_id: str, agent_id: str, current_value: int, metadata: dict[str, Any] | None = None, ) -> QuestProgress: """Update progress for a quest.""" progress = get_or_create_progress(quest_id, agent_id) progress.current_value = current_value if metadata: progress.metadata.update(metadata) # Check if quest is now complete if progress.current_value >= progress.target_value: if progress.status not in (QuestStatus.COMPLETED, QuestStatus.CLAIMED): progress.status = QuestStatus.COMPLETED progress.completed_at = datetime.now(UTC).isoformat() logger.info("Quest %s completed for agent %s", quest_id, agent_id) return progress def _is_on_cooldown(progress: QuestProgress, quest: QuestDefinition) -> bool: """Check if a repeatable quest is on cooldown.""" if not quest.repeatable or not progress.last_completed_at: return False if quest.cooldown_hours <= 0: return False try: last_completed = datetime.fromisoformat(progress.last_completed_at) cooldown_end = last_completed + timedelta(hours=quest.cooldown_hours) return datetime.now(UTC) < cooldown_end except (ValueError, TypeError): return False def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None: """Claim the token reward for a completed quest. Returns: Reward info dict if successful, None if not claimable """ progress = get_quest_progress(quest_id, agent_id) if not progress: return None quest = get_quest_definition(quest_id) if not quest: return None # Check if quest is completed but not yet claimed if progress.status != QuestStatus.COMPLETED: return None # Check cooldown for repeatable quests if _is_on_cooldown(progress, quest): return None try: # Award tokens via ledger from lightning.ledger import create_invoice_entry, mark_settled # Create a mock invoice for the reward invoice_entry = create_invoice_entry( payment_hash=f"quest_{quest_id}_{agent_id}_{int(time.time())}", amount_sats=quest.reward_tokens, memo=f"Quest reward: {quest.name}", source="quest_reward", agent_id=agent_id, ) # Mark as settled immediately (quest rewards are auto-settled) mark_settled(invoice_entry.payment_hash, preimage=f"quest_{quest_id}") # Update progress progress.status = QuestStatus.CLAIMED progress.claimed_at = datetime.now(UTC).isoformat() progress.completion_count += 1 progress.last_completed_at = progress.claimed_at # Reset for repeatable quests if quest.repeatable: progress.status = QuestStatus.NOT_STARTED progress.current_value = 0 progress.completed_at = "" progress.claimed_at = "" notification = quest.notification_message.format(tokens=quest.reward_tokens) return { "quest_id": quest_id, "agent_id": agent_id, "tokens_awarded": quest.reward_tokens, "notification": notification, "completion_count": progress.completion_count, } except Exception as exc: logger.error("Failed to award quest reward: %s", exc) return None def check_issue_count_quest( quest: QuestDefinition, agent_id: str, closed_issues: list[dict], ) -> QuestProgress | None: """Check progress for issue_count type quest.""" criteria = quest.criteria target_labels = set(criteria.get("issue_labels", [])) # target_count is available in criteria but not used directly here # Count matching issues matching_count = 0 for issue in closed_issues: issue_labels = {label.get("name", "") for label in issue.get("labels", [])} if target_labels.issubset(issue_labels) or (not target_labels and issue_labels): matching_count += 1 progress = update_quest_progress( quest.id, agent_id, matching_count, {"matching_issues": matching_count} ) return progress def check_issue_reduce_quest( quest: QuestDefinition, agent_id: str, previous_count: int, current_count: int, ) -> QuestProgress | None: """Check progress for issue_reduce type quest.""" # target_reduction available in quest.criteria but we track actual reduction reduction = max(0, previous_count - current_count) progress = update_quest_progress(quest.id, agent_id, reduction, {"reduction": reduction}) return progress def check_daily_run_quest( quest: QuestDefinition, agent_id: str, sessions_completed: int, ) -> QuestProgress | None: """Check progress for daily_run type quest.""" # min_sessions available in quest.criteria but we track actual sessions progress = update_quest_progress( quest.id, agent_id, sessions_completed, {"sessions": sessions_completed} ) return progress def evaluate_quest_progress( quest_id: str, agent_id: str, context: dict[str, Any], ) -> QuestProgress | None: """Evaluate quest progress based on quest type and context. Args: quest_id: The quest to evaluate agent_id: The agent to evaluate for context: Context data for evaluation (issues, metrics, etc.) Returns: Updated QuestProgress or None if evaluation failed """ quest = get_quest_definition(quest_id) if not quest or not quest.enabled: return None progress = get_quest_progress(quest_id, agent_id) # Check cooldown for repeatable quests if progress and _is_on_cooldown(progress, quest): return progress try: if quest.quest_type == QuestType.ISSUE_COUNT: closed_issues = context.get("closed_issues", []) return check_issue_count_quest(quest, agent_id, closed_issues) elif quest.quest_type == QuestType.ISSUE_REDUCE: prev_count = context.get("previous_issue_count", 0) curr_count = context.get("current_issue_count", 0) return check_issue_reduce_quest(quest, agent_id, prev_count, curr_count) elif quest.quest_type == QuestType.DAILY_RUN: sessions = context.get("sessions_completed", 0) return check_daily_run_quest(quest, agent_id, sessions) elif quest.quest_type == QuestType.CUSTOM: # Custom quests require manual completion return progress else: logger.debug("Quest type %s not yet implemented", quest.quest_type) return progress except Exception as exc: logger.warning("Quest evaluation failed for %s: %s", quest_id, exc) return progress def auto_evaluate_all_quests(agent_id: str, context: dict[str, Any]) -> list[dict]: """Evaluate all active quests for an agent and award rewards. Returns: List of reward info for newly completed quests """ rewards = [] active_quests = get_active_quests() for quest in active_quests: progress = evaluate_quest_progress(quest.id, agent_id, context) if progress and progress.status == QuestStatus.COMPLETED: # Auto-claim the reward reward = claim_quest_reward(quest.id, agent_id) if reward: rewards.append(reward) return rewards def get_agent_quests_status(agent_id: str) -> dict[str, Any]: """Get complete quest status for an agent.""" definitions = get_quest_definitions() quests_status = [] total_rewards = 0 completed_count = 0 for quest_id, quest in definitions.items(): progress = get_quest_progress(quest_id, agent_id) if not progress: progress = get_or_create_progress(quest_id, agent_id) is_on_cooldown = _is_on_cooldown(progress, quest) if quest.repeatable else False quest_info = { "quest_id": quest_id, "name": quest.name, "description": quest.description, "reward_tokens": quest.reward_tokens, "type": quest.quest_type.value, "enabled": quest.enabled, "repeatable": quest.repeatable, "status": progress.status.value, "current_value": progress.current_value, "target_value": progress.target_value, "completion_count": progress.completion_count, "on_cooldown": is_on_cooldown, "cooldown_hours_remaining": 0, } if is_on_cooldown and progress.last_completed_at: try: last = datetime.fromisoformat(progress.last_completed_at) cooldown_end = last + timedelta(hours=quest.cooldown_hours) hours_remaining = (cooldown_end - datetime.now(UTC)).total_seconds() / 3600 quest_info["cooldown_hours_remaining"] = round(max(0, hours_remaining), 1) except (ValueError, TypeError): pass quests_status.append(quest_info) total_rewards += progress.completion_count * quest.reward_tokens completed_count += progress.completion_count return { "agent_id": agent_id, "quests": quests_status, "total_tokens_earned": total_rewards, "total_quests_completed": completed_count, "active_quests_count": len([q for q in quests_status if q["enabled"]]), } def reset_quest_progress(quest_id: str | None = None, agent_id: str | None = None) -> int: """Reset quest progress. Useful for testing. Args: quest_id: Specific quest to reset, or None for all agent_id: Specific agent to reset, or None for all Returns: Number of progress entries reset """ global _quest_progress count = 0 keys_to_reset = [] for key, _progress in _quest_progress.items(): key_agent, key_quest = key.split(":", 1) if (quest_id is None or key_quest == quest_id) and ( agent_id is None or key_agent == agent_id ): keys_to_reset.append(key) for key in keys_to_reset: del _quest_progress[key] count += 1 return count def get_quest_leaderboard() -> list[dict[str, Any]]: """Get a leaderboard of agents by quest completion.""" agent_stats: dict[str, dict[str, Any]] = {} for _key, progress in _quest_progress.items(): agent_id = progress.agent_id if agent_id not in agent_stats: agent_stats[agent_id] = { "agent_id": agent_id, "total_completions": 0, "total_tokens": 0, "quests_completed": set(), } quest = get_quest_definition(progress.quest_id) if quest: agent_stats[agent_id]["total_completions"] += progress.completion_count agent_stats[agent_id]["total_tokens"] += progress.completion_count * quest.reward_tokens if progress.completion_count > 0: agent_stats[agent_id]["quests_completed"].add(quest.id) leaderboard = [] for stats in agent_stats.values(): leaderboard.append( { "agent_id": stats["agent_id"], "total_completions": stats["total_completions"], "total_tokens": stats["total_tokens"], "unique_quests_completed": len(stats["quests_completed"]), } ) # Sort by total tokens (descending) leaderboard.sort(key=lambda x: x["total_tokens"], reverse=True) return leaderboard # Initialize on module load load_quest_config()