forked from Rockachopa/Timmy-time-dashboard
582 lines
18 KiB
Python
582 lines
18 KiB
Python
"""Token Quest System for agent rewards.
|
|
|
|
Provides quest definitions, progress tracking, completion detection,
|
|
and token awards for agent accomplishments.
|
|
|
|
Quests are defined in config/quests.yaml and loaded at runtime.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from datetime import UTC, datetime, timedelta
|
|
from enum import StrEnum
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
from config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Path to quest configuration
|
|
QUEST_CONFIG_PATH = Path(settings.repo_root) / "config" / "quests.yaml"
|
|
|
|
|
|
class QuestType(StrEnum):
|
|
"""Types of quests supported by the system."""
|
|
|
|
ISSUE_COUNT = "issue_count"
|
|
ISSUE_REDUCE = "issue_reduce"
|
|
DOCS_UPDATE = "docs_update"
|
|
TEST_IMPROVE = "test_improve"
|
|
DAILY_RUN = "daily_run"
|
|
CUSTOM = "custom"
|
|
|
|
|
|
class QuestStatus(StrEnum):
|
|
"""Status of a quest for an agent."""
|
|
|
|
NOT_STARTED = "not_started"
|
|
IN_PROGRESS = "in_progress"
|
|
COMPLETED = "completed"
|
|
CLAIMED = "claimed"
|
|
EXPIRED = "expired"
|
|
|
|
|
|
@dataclass
|
|
class QuestDefinition:
|
|
"""Definition of a quest from configuration."""
|
|
|
|
id: str
|
|
name: str
|
|
description: str
|
|
reward_tokens: int
|
|
quest_type: QuestType
|
|
enabled: bool
|
|
repeatable: bool
|
|
cooldown_hours: int
|
|
criteria: dict[str, Any]
|
|
notification_message: str
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: dict[str, Any]) -> QuestDefinition:
|
|
"""Create a QuestDefinition from a dictionary."""
|
|
return cls(
|
|
id=data["id"],
|
|
name=data.get("name", "Unnamed Quest"),
|
|
description=data.get("description", ""),
|
|
reward_tokens=data.get("reward_tokens", 0),
|
|
quest_type=QuestType(data.get("type", "custom")),
|
|
enabled=data.get("enabled", True),
|
|
repeatable=data.get("repeatable", False),
|
|
cooldown_hours=data.get("cooldown_hours", 0),
|
|
criteria=data.get("criteria", {}),
|
|
notification_message=data.get(
|
|
"notification_message", "Quest Complete! You earned {tokens} tokens."
|
|
),
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class QuestProgress:
|
|
"""Progress of a quest for a specific agent."""
|
|
|
|
quest_id: str
|
|
agent_id: str
|
|
status: QuestStatus
|
|
current_value: int = 0
|
|
target_value: int = 0
|
|
started_at: str = ""
|
|
completed_at: str = ""
|
|
claimed_at: str = ""
|
|
completion_count: int = 0
|
|
last_completed_at: str = ""
|
|
metadata: dict[str, Any] = field(default_factory=dict)
|
|
|
|
def to_dict(self) -> dict[str, Any]:
|
|
"""Convert to dictionary for serialization."""
|
|
return {
|
|
"quest_id": self.quest_id,
|
|
"agent_id": self.agent_id,
|
|
"status": self.status.value,
|
|
"current_value": self.current_value,
|
|
"target_value": self.target_value,
|
|
"started_at": self.started_at,
|
|
"completed_at": self.completed_at,
|
|
"claimed_at": self.claimed_at,
|
|
"completion_count": self.completion_count,
|
|
"last_completed_at": self.last_completed_at,
|
|
"metadata": self.metadata,
|
|
}
|
|
|
|
|
|
# In-memory storage for quest progress
|
|
_quest_progress: dict[str, QuestProgress] = {}
|
|
_quest_definitions: dict[str, QuestDefinition] = {}
|
|
_quest_settings: dict[str, Any] = {}
|
|
|
|
|
|
def _get_progress_key(quest_id: str, agent_id: str) -> str:
|
|
"""Generate a unique key for quest progress."""
|
|
return f"{agent_id}:{quest_id}"
|
|
|
|
|
|
def load_quest_config() -> tuple[dict[str, QuestDefinition], dict[str, Any]]:
|
|
"""Load quest definitions from quests.yaml.
|
|
|
|
Returns:
|
|
Tuple of (quest definitions dict, settings dict)
|
|
"""
|
|
global _quest_definitions, _quest_settings
|
|
|
|
if not QUEST_CONFIG_PATH.exists():
|
|
logger.warning("Quest config not found at %s", QUEST_CONFIG_PATH)
|
|
return {}, {}
|
|
|
|
try:
|
|
raw = QUEST_CONFIG_PATH.read_text()
|
|
config = yaml.safe_load(raw)
|
|
|
|
if not isinstance(config, dict):
|
|
logger.warning("Invalid quest config format")
|
|
return {}, {}
|
|
|
|
# Load quest definitions
|
|
quests_data = config.get("quests", {})
|
|
definitions = {}
|
|
for quest_id, quest_data in quests_data.items():
|
|
quest_data["id"] = quest_id
|
|
try:
|
|
definition = QuestDefinition.from_dict(quest_data)
|
|
definitions[quest_id] = definition
|
|
except (ValueError, KeyError) as exc:
|
|
logger.warning("Failed to load quest %s: %s", quest_id, exc)
|
|
|
|
# Load settings
|
|
_quest_settings = config.get("settings", {})
|
|
_quest_definitions = definitions
|
|
|
|
logger.debug("Loaded %d quest definitions", len(definitions))
|
|
return definitions, _quest_settings
|
|
|
|
except (OSError, yaml.YAMLError) as exc:
|
|
logger.warning("Failed to load quest config: %s", exc)
|
|
return {}, {}
|
|
|
|
|
|
def get_quest_definitions() -> dict[str, QuestDefinition]:
|
|
"""Get all quest definitions, loading if necessary."""
|
|
global _quest_definitions
|
|
if not _quest_definitions:
|
|
_quest_definitions, _ = load_quest_config()
|
|
return _quest_definitions
|
|
|
|
|
|
def get_quest_definition(quest_id: str) -> QuestDefinition | None:
|
|
"""Get a specific quest definition by ID."""
|
|
definitions = get_quest_definitions()
|
|
return definitions.get(quest_id)
|
|
|
|
|
|
def get_active_quests() -> list[QuestDefinition]:
|
|
"""Get all enabled quest definitions."""
|
|
definitions = get_quest_definitions()
|
|
return [q for q in definitions.values() if q.enabled]
|
|
|
|
|
|
def get_quest_progress(quest_id: str, agent_id: str) -> QuestProgress | None:
|
|
"""Get progress for a specific quest and agent."""
|
|
key = _get_progress_key(quest_id, agent_id)
|
|
return _quest_progress.get(key)
|
|
|
|
|
|
def get_or_create_progress(quest_id: str, agent_id: str) -> QuestProgress:
|
|
"""Get existing progress or create new for quest/agent."""
|
|
key = _get_progress_key(quest_id, agent_id)
|
|
if key not in _quest_progress:
|
|
quest = get_quest_definition(quest_id)
|
|
if not quest:
|
|
raise ValueError(f"Quest {quest_id} not found")
|
|
|
|
target = _get_target_value(quest)
|
|
_quest_progress[key] = QuestProgress(
|
|
quest_id=quest_id,
|
|
agent_id=agent_id,
|
|
status=QuestStatus.NOT_STARTED,
|
|
current_value=0,
|
|
target_value=target,
|
|
started_at=datetime.now(UTC).isoformat(),
|
|
)
|
|
return _quest_progress[key]
|
|
|
|
|
|
def _get_target_value(quest: QuestDefinition) -> int:
|
|
"""Extract target value from quest criteria."""
|
|
criteria = quest.criteria
|
|
if quest.quest_type == QuestType.ISSUE_COUNT:
|
|
return criteria.get("target_count", 1)
|
|
elif quest.quest_type == QuestType.ISSUE_REDUCE:
|
|
return criteria.get("target_reduction", 1)
|
|
elif quest.quest_type == QuestType.DAILY_RUN:
|
|
return criteria.get("min_sessions", 1)
|
|
elif quest.quest_type == QuestType.DOCS_UPDATE:
|
|
return criteria.get("min_files_changed", 1)
|
|
elif quest.quest_type == QuestType.TEST_IMPROVE:
|
|
return criteria.get("min_new_tests", 1)
|
|
return 1
|
|
|
|
|
|
def update_quest_progress(
|
|
quest_id: str,
|
|
agent_id: str,
|
|
current_value: int,
|
|
metadata: dict[str, Any] | None = None,
|
|
) -> QuestProgress:
|
|
"""Update progress for a quest."""
|
|
progress = get_or_create_progress(quest_id, agent_id)
|
|
progress.current_value = current_value
|
|
|
|
if metadata:
|
|
progress.metadata.update(metadata)
|
|
|
|
# Check if quest is now complete
|
|
if progress.current_value >= progress.target_value:
|
|
if progress.status not in (QuestStatus.COMPLETED, QuestStatus.CLAIMED):
|
|
progress.status = QuestStatus.COMPLETED
|
|
progress.completed_at = datetime.now(UTC).isoformat()
|
|
logger.info("Quest %s completed for agent %s", quest_id, agent_id)
|
|
|
|
return progress
|
|
|
|
|
|
def _is_on_cooldown(progress: QuestProgress, quest: QuestDefinition) -> bool:
|
|
"""Check if a repeatable quest is on cooldown."""
|
|
if not quest.repeatable or not progress.last_completed_at:
|
|
return False
|
|
|
|
if quest.cooldown_hours <= 0:
|
|
return False
|
|
|
|
try:
|
|
last_completed = datetime.fromisoformat(progress.last_completed_at)
|
|
cooldown_end = last_completed + timedelta(hours=quest.cooldown_hours)
|
|
return datetime.now(UTC) < cooldown_end
|
|
except (ValueError, TypeError):
|
|
return False
|
|
|
|
|
|
def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
|
|
"""Claim the token reward for a completed quest.
|
|
|
|
Returns:
|
|
Reward info dict if successful, None if not claimable
|
|
"""
|
|
progress = get_quest_progress(quest_id, agent_id)
|
|
if not progress:
|
|
return None
|
|
|
|
quest = get_quest_definition(quest_id)
|
|
if not quest:
|
|
return None
|
|
|
|
# Check if quest is completed but not yet claimed
|
|
if progress.status != QuestStatus.COMPLETED:
|
|
return None
|
|
|
|
# Check cooldown for repeatable quests
|
|
if _is_on_cooldown(progress, quest):
|
|
return None
|
|
|
|
try:
|
|
# Award tokens via ledger
|
|
from lightning.ledger import create_invoice_entry, mark_settled
|
|
|
|
# Create a mock invoice for the reward
|
|
invoice_entry = create_invoice_entry(
|
|
payment_hash=f"quest_{quest_id}_{agent_id}_{int(time.time())}",
|
|
amount_sats=quest.reward_tokens,
|
|
memo=f"Quest reward: {quest.name}",
|
|
source="quest_reward",
|
|
agent_id=agent_id,
|
|
)
|
|
|
|
# Mark as settled immediately (quest rewards are auto-settled)
|
|
mark_settled(invoice_entry.payment_hash, preimage=f"quest_{quest_id}")
|
|
|
|
# Update progress
|
|
progress.status = QuestStatus.CLAIMED
|
|
progress.claimed_at = datetime.now(UTC).isoformat()
|
|
progress.completion_count += 1
|
|
progress.last_completed_at = progress.claimed_at
|
|
|
|
# Reset for repeatable quests
|
|
if quest.repeatable:
|
|
progress.status = QuestStatus.NOT_STARTED
|
|
progress.current_value = 0
|
|
progress.completed_at = ""
|
|
progress.claimed_at = ""
|
|
|
|
notification = quest.notification_message.format(tokens=quest.reward_tokens)
|
|
|
|
return {
|
|
"quest_id": quest_id,
|
|
"agent_id": agent_id,
|
|
"tokens_awarded": quest.reward_tokens,
|
|
"notification": notification,
|
|
"completion_count": progress.completion_count,
|
|
}
|
|
|
|
except Exception as exc:
|
|
logger.error("Failed to award quest reward: %s", exc)
|
|
return None
|
|
|
|
|
|
def check_issue_count_quest(
|
|
quest: QuestDefinition,
|
|
agent_id: str,
|
|
closed_issues: list[dict],
|
|
) -> QuestProgress | None:
|
|
"""Check progress for issue_count type quest."""
|
|
criteria = quest.criteria
|
|
target_labels = set(criteria.get("issue_labels", []))
|
|
# target_count is available in criteria but not used directly here
|
|
|
|
# Count matching issues
|
|
matching_count = 0
|
|
for issue in closed_issues:
|
|
issue_labels = {label.get("name", "") for label in issue.get("labels", [])}
|
|
if target_labels.issubset(issue_labels) or (not target_labels and issue_labels):
|
|
matching_count += 1
|
|
|
|
progress = update_quest_progress(
|
|
quest.id, agent_id, matching_count, {"matching_issues": matching_count}
|
|
)
|
|
|
|
return progress
|
|
|
|
|
|
def check_issue_reduce_quest(
|
|
quest: QuestDefinition,
|
|
agent_id: str,
|
|
previous_count: int,
|
|
current_count: int,
|
|
) -> QuestProgress | None:
|
|
"""Check progress for issue_reduce type quest."""
|
|
# target_reduction available in quest.criteria but we track actual reduction
|
|
reduction = max(0, previous_count - current_count)
|
|
|
|
progress = update_quest_progress(quest.id, agent_id, reduction, {"reduction": reduction})
|
|
|
|
return progress
|
|
|
|
|
|
def check_daily_run_quest(
|
|
quest: QuestDefinition,
|
|
agent_id: str,
|
|
sessions_completed: int,
|
|
) -> QuestProgress | None:
|
|
"""Check progress for daily_run type quest."""
|
|
# min_sessions available in quest.criteria but we track actual sessions
|
|
progress = update_quest_progress(
|
|
quest.id, agent_id, sessions_completed, {"sessions": sessions_completed}
|
|
)
|
|
|
|
return progress
|
|
|
|
|
|
def evaluate_quest_progress(
|
|
quest_id: str,
|
|
agent_id: str,
|
|
context: dict[str, Any],
|
|
) -> QuestProgress | None:
|
|
"""Evaluate quest progress based on quest type and context.
|
|
|
|
Args:
|
|
quest_id: The quest to evaluate
|
|
agent_id: The agent to evaluate for
|
|
context: Context data for evaluation (issues, metrics, etc.)
|
|
|
|
Returns:
|
|
Updated QuestProgress or None if evaluation failed
|
|
"""
|
|
quest = get_quest_definition(quest_id)
|
|
if not quest or not quest.enabled:
|
|
return None
|
|
|
|
progress = get_quest_progress(quest_id, agent_id)
|
|
|
|
# Check cooldown for repeatable quests
|
|
if progress and _is_on_cooldown(progress, quest):
|
|
return progress
|
|
|
|
try:
|
|
if quest.quest_type == QuestType.ISSUE_COUNT:
|
|
closed_issues = context.get("closed_issues", [])
|
|
return check_issue_count_quest(quest, agent_id, closed_issues)
|
|
|
|
elif quest.quest_type == QuestType.ISSUE_REDUCE:
|
|
prev_count = context.get("previous_issue_count", 0)
|
|
curr_count = context.get("current_issue_count", 0)
|
|
return check_issue_reduce_quest(quest, agent_id, prev_count, curr_count)
|
|
|
|
elif quest.quest_type == QuestType.DAILY_RUN:
|
|
sessions = context.get("sessions_completed", 0)
|
|
return check_daily_run_quest(quest, agent_id, sessions)
|
|
|
|
elif quest.quest_type == QuestType.CUSTOM:
|
|
# Custom quests require manual completion
|
|
return progress
|
|
|
|
else:
|
|
logger.debug("Quest type %s not yet implemented", quest.quest_type)
|
|
return progress
|
|
|
|
except Exception as exc:
|
|
logger.warning("Quest evaluation failed for %s: %s", quest_id, exc)
|
|
return progress
|
|
|
|
|
|
def auto_evaluate_all_quests(agent_id: str, context: dict[str, Any]) -> list[dict]:
|
|
"""Evaluate all active quests for an agent and award rewards.
|
|
|
|
Returns:
|
|
List of reward info for newly completed quests
|
|
"""
|
|
rewards = []
|
|
active_quests = get_active_quests()
|
|
|
|
for quest in active_quests:
|
|
progress = evaluate_quest_progress(quest.id, agent_id, context)
|
|
if progress and progress.status == QuestStatus.COMPLETED:
|
|
# Auto-claim the reward
|
|
reward = claim_quest_reward(quest.id, agent_id)
|
|
if reward:
|
|
rewards.append(reward)
|
|
|
|
return rewards
|
|
|
|
|
|
def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
|
|
"""Get complete quest status for an agent."""
|
|
definitions = get_quest_definitions()
|
|
quests_status = []
|
|
total_rewards = 0
|
|
completed_count = 0
|
|
|
|
for quest_id, quest in definitions.items():
|
|
progress = get_quest_progress(quest_id, agent_id)
|
|
if not progress:
|
|
progress = get_or_create_progress(quest_id, agent_id)
|
|
|
|
is_on_cooldown = _is_on_cooldown(progress, quest) if quest.repeatable else False
|
|
|
|
quest_info = {
|
|
"quest_id": quest_id,
|
|
"name": quest.name,
|
|
"description": quest.description,
|
|
"reward_tokens": quest.reward_tokens,
|
|
"type": quest.quest_type.value,
|
|
"enabled": quest.enabled,
|
|
"repeatable": quest.repeatable,
|
|
"status": progress.status.value,
|
|
"current_value": progress.current_value,
|
|
"target_value": progress.target_value,
|
|
"completion_count": progress.completion_count,
|
|
"on_cooldown": is_on_cooldown,
|
|
"cooldown_hours_remaining": 0,
|
|
}
|
|
|
|
if is_on_cooldown and progress.last_completed_at:
|
|
try:
|
|
last = datetime.fromisoformat(progress.last_completed_at)
|
|
cooldown_end = last + timedelta(hours=quest.cooldown_hours)
|
|
hours_remaining = (cooldown_end - datetime.now(UTC)).total_seconds() / 3600
|
|
quest_info["cooldown_hours_remaining"] = round(max(0, hours_remaining), 1)
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
quests_status.append(quest_info)
|
|
total_rewards += progress.completion_count * quest.reward_tokens
|
|
completed_count += progress.completion_count
|
|
|
|
return {
|
|
"agent_id": agent_id,
|
|
"quests": quests_status,
|
|
"total_tokens_earned": total_rewards,
|
|
"total_quests_completed": completed_count,
|
|
"active_quests_count": len([q for q in quests_status if q["enabled"]]),
|
|
}
|
|
|
|
|
|
def reset_quest_progress(quest_id: str | None = None, agent_id: str | None = None) -> int:
|
|
"""Reset quest progress. Useful for testing.
|
|
|
|
Args:
|
|
quest_id: Specific quest to reset, or None for all
|
|
agent_id: Specific agent to reset, or None for all
|
|
|
|
Returns:
|
|
Number of progress entries reset
|
|
"""
|
|
global _quest_progress
|
|
count = 0
|
|
|
|
keys_to_reset = []
|
|
for key, _progress in _quest_progress.items():
|
|
key_agent, key_quest = key.split(":", 1)
|
|
if (quest_id is None or key_quest == quest_id) and (
|
|
agent_id is None or key_agent == agent_id
|
|
):
|
|
keys_to_reset.append(key)
|
|
|
|
for key in keys_to_reset:
|
|
del _quest_progress[key]
|
|
count += 1
|
|
|
|
return count
|
|
|
|
|
|
def get_quest_leaderboard() -> list[dict[str, Any]]:
|
|
"""Get a leaderboard of agents by quest completion."""
|
|
agent_stats: dict[str, dict[str, Any]] = {}
|
|
|
|
for _key, progress in _quest_progress.items():
|
|
agent_id = progress.agent_id
|
|
if agent_id not in agent_stats:
|
|
agent_stats[agent_id] = {
|
|
"agent_id": agent_id,
|
|
"total_completions": 0,
|
|
"total_tokens": 0,
|
|
"quests_completed": set(),
|
|
}
|
|
|
|
quest = get_quest_definition(progress.quest_id)
|
|
if quest:
|
|
agent_stats[agent_id]["total_completions"] += progress.completion_count
|
|
agent_stats[agent_id]["total_tokens"] += progress.completion_count * quest.reward_tokens
|
|
if progress.completion_count > 0:
|
|
agent_stats[agent_id]["quests_completed"].add(quest.id)
|
|
|
|
leaderboard = []
|
|
for stats in agent_stats.values():
|
|
leaderboard.append(
|
|
{
|
|
"agent_id": stats["agent_id"],
|
|
"total_completions": stats["total_completions"],
|
|
"total_tokens": stats["total_tokens"],
|
|
"unique_quests_completed": len(stats["quests_completed"]),
|
|
}
|
|
)
|
|
|
|
# Sort by total tokens (descending)
|
|
leaderboard.sort(key=lambda x: x["total_tokens"], reverse=True)
|
|
return leaderboard
|
|
|
|
|
|
# Initialize on module load
|
|
load_quest_config()
|