From a48f30fee41f49cfa1a14a1e9f2b0d0e4106839a Mon Sep 17 00:00:00 2001 From: "Claude (Opus 4.6)" Date: Tue, 24 Mar 2026 01:57:29 +0000 Subject: [PATCH] [claude] Add unit tests for quest_system.py (#1292) (#1309) --- tests/timmy/test_quest_system.py | 839 +++++++++++++++++++++++++++++++ 1 file changed, 839 insertions(+) create mode 100644 tests/timmy/test_quest_system.py diff --git a/tests/timmy/test_quest_system.py b/tests/timmy/test_quest_system.py new file mode 100644 index 00000000..3dc71686 --- /dev/null +++ b/tests/timmy/test_quest_system.py @@ -0,0 +1,839 @@ +"""Unit tests for timmy.quest_system.""" + +from __future__ import annotations + +from datetime import UTC, datetime, timedelta +from typing import Any +from unittest.mock import MagicMock, patch + +import pytest + +import timmy.quest_system as qs +from timmy.quest_system import ( + QuestDefinition, + QuestProgress, + QuestStatus, + QuestType, + _get_progress_key, + _get_target_value, + _is_on_cooldown, + check_daily_run_quest, + check_issue_count_quest, + check_issue_reduce_quest, + claim_quest_reward, + evaluate_quest_progress, + get_active_quests, + get_agent_quests_status, + get_or_create_progress, + get_quest_definition, + get_quest_definitions, + get_quest_leaderboard, + get_quest_progress, + load_quest_config, + reset_quest_progress, + update_quest_progress, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_quest( + quest_id: str = "test_quest", + quest_type: QuestType = QuestType.ISSUE_COUNT, + reward_tokens: int = 10, + enabled: bool = True, + repeatable: bool = False, + cooldown_hours: int = 0, + criteria: dict[str, Any] | None = None, +) -> QuestDefinition: + return QuestDefinition( + id=quest_id, + name=f"Quest {quest_id}", + description="Test quest", + reward_tokens=reward_tokens, + quest_type=quest_type, + enabled=enabled, + repeatable=repeatable, + cooldown_hours=cooldown_hours, + criteria=criteria or {"target_count": 3}, + notification_message="Quest Complete! You earned {tokens} tokens.", + ) + + +@pytest.fixture(autouse=True) +def clean_state(): + """Reset module-level state before and after each test.""" + reset_quest_progress() + qs._quest_definitions.clear() + qs._quest_settings.clear() + yield + reset_quest_progress() + qs._quest_definitions.clear() + qs._quest_settings.clear() + + +# --------------------------------------------------------------------------- +# QuestDefinition +# --------------------------------------------------------------------------- + +class TestQuestDefinition: + def test_from_dict_minimal(self): + data = {"id": "q1"} + defn = QuestDefinition.from_dict(data) + assert defn.id == "q1" + assert defn.name == "Unnamed Quest" + assert defn.reward_tokens == 0 + assert defn.quest_type == QuestType.CUSTOM + assert defn.enabled is True + assert defn.repeatable is False + assert defn.cooldown_hours == 0 + + def test_from_dict_full(self): + data = { + "id": "q2", + "name": "Full Quest", + "description": "A full quest", + "reward_tokens": 50, + "type": "issue_count", + "enabled": False, + "repeatable": True, + "cooldown_hours": 24, + "criteria": {"target_count": 5}, + "notification_message": "You earned {tokens}!", + } + defn = QuestDefinition.from_dict(data) + assert defn.id == "q2" + assert defn.name == "Full Quest" + assert defn.reward_tokens == 50 + assert defn.quest_type == QuestType.ISSUE_COUNT + assert defn.enabled is False + assert defn.repeatable is True + assert defn.cooldown_hours == 24 + assert defn.criteria == {"target_count": 5} + assert defn.notification_message == "You earned {tokens}!" + + def test_from_dict_invalid_type_raises(self): + data = {"id": "q3", "type": "not_a_real_type"} + with pytest.raises(ValueError): + QuestDefinition.from_dict(data) + + +# --------------------------------------------------------------------------- +# QuestProgress +# --------------------------------------------------------------------------- + +class TestQuestProgress: + def test_to_dict_roundtrip(self): + progress = QuestProgress( + quest_id="q1", + agent_id="agent_a", + status=QuestStatus.IN_PROGRESS, + current_value=2, + target_value=5, + started_at="2026-01-01T00:00:00", + metadata={"key": "val"}, + ) + d = progress.to_dict() + assert d["quest_id"] == "q1" + assert d["agent_id"] == "agent_a" + assert d["status"] == "in_progress" + assert d["current_value"] == 2 + assert d["target_value"] == 5 + assert d["metadata"] == {"key": "val"} + + def test_to_dict_defaults(self): + progress = QuestProgress( + quest_id="q1", + agent_id="agent_a", + status=QuestStatus.NOT_STARTED, + ) + d = progress.to_dict() + assert d["completion_count"] == 0 + assert d["started_at"] == "" + assert d["completed_at"] == "" + + +# --------------------------------------------------------------------------- +# _get_progress_key +# --------------------------------------------------------------------------- + +def test_get_progress_key(): + assert _get_progress_key("q1", "agent_a") == "agent_a:q1" + + +def test_get_progress_key_different_agents(): + key_a = _get_progress_key("q1", "agent_a") + key_b = _get_progress_key("q1", "agent_b") + assert key_a != key_b + + +# --------------------------------------------------------------------------- +# load_quest_config +# --------------------------------------------------------------------------- + +class TestLoadQuestConfig: + def test_missing_file_returns_empty(self, tmp_path): + missing = tmp_path / "nonexistent.yaml" + with patch.object(qs, "QUEST_CONFIG_PATH", missing): + defs, settings = load_quest_config() + assert defs == {} + assert settings == {} + + def test_valid_yaml_loads_quests(self, tmp_path): + config_path = tmp_path / "quests.yaml" + config_path.write_text( + """ +quests: + first_quest: + name: First Quest + description: Do stuff + reward_tokens: 25 + type: issue_count + enabled: true + repeatable: false + cooldown_hours: 0 + criteria: + target_count: 3 + notification_message: "Done! {tokens} tokens" +settings: + some_setting: true +""" + ) + with patch.object(qs, "QUEST_CONFIG_PATH", config_path): + defs, settings = load_quest_config() + + assert "first_quest" in defs + assert defs["first_quest"].name == "First Quest" + assert defs["first_quest"].reward_tokens == 25 + assert settings == {"some_setting": True} + + def test_invalid_yaml_returns_empty(self, tmp_path): + config_path = tmp_path / "quests.yaml" + config_path.write_text(":: not valid yaml ::") + with patch.object(qs, "QUEST_CONFIG_PATH", config_path): + defs, settings = load_quest_config() + assert defs == {} + assert settings == {} + + def test_non_dict_yaml_returns_empty(self, tmp_path): + config_path = tmp_path / "quests.yaml" + config_path.write_text("- item1\n- item2\n") + with patch.object(qs, "QUEST_CONFIG_PATH", config_path): + defs, settings = load_quest_config() + assert defs == {} + assert settings == {} + + def test_bad_quest_entry_is_skipped(self, tmp_path): + config_path = tmp_path / "quests.yaml" + config_path.write_text( + """ +quests: + good_quest: + name: Good + type: issue_count + reward_tokens: 10 + enabled: true + repeatable: false + cooldown_hours: 0 + criteria: {} + notification_message: "{tokens}" + bad_quest: + type: invalid_type_that_does_not_exist +""" + ) + with patch.object(qs, "QUEST_CONFIG_PATH", config_path): + defs, _ = load_quest_config() + assert "good_quest" in defs + assert "bad_quest" not in defs + + +# --------------------------------------------------------------------------- +# get_quest_definitions / get_quest_definition / get_active_quests +# --------------------------------------------------------------------------- + +class TestQuestLookup: + def setup_method(self): + q1 = _make_quest("q1", enabled=True) + q2 = _make_quest("q2", enabled=False) + qs._quest_definitions.update({"q1": q1, "q2": q2}) + + def test_get_quest_definitions_returns_all(self): + defs = get_quest_definitions() + assert "q1" in defs + assert "q2" in defs + + def test_get_quest_definition_found(self): + defn = get_quest_definition("q1") + assert defn is not None + assert defn.id == "q1" + + def test_get_quest_definition_not_found(self): + assert get_quest_definition("missing") is None + + def test_get_active_quests_only_enabled(self): + active = get_active_quests() + ids = [q.id for q in active] + assert "q1" in ids + assert "q2" not in ids + + +# --------------------------------------------------------------------------- +# _get_target_value +# --------------------------------------------------------------------------- + +class TestGetTargetValue: + def test_issue_count(self): + q = _make_quest(quest_type=QuestType.ISSUE_COUNT, criteria={"target_count": 7}) + assert _get_target_value(q) == 7 + + def test_issue_reduce(self): + q = _make_quest(quest_type=QuestType.ISSUE_REDUCE, criteria={"target_reduction": 5}) + assert _get_target_value(q) == 5 + + def test_daily_run(self): + q = _make_quest(quest_type=QuestType.DAILY_RUN, criteria={"min_sessions": 3}) + assert _get_target_value(q) == 3 + + def test_docs_update(self): + q = _make_quest(quest_type=QuestType.DOCS_UPDATE, criteria={"min_files_changed": 2}) + assert _get_target_value(q) == 2 + + def test_test_improve(self): + q = _make_quest(quest_type=QuestType.TEST_IMPROVE, criteria={"min_new_tests": 4}) + assert _get_target_value(q) == 4 + + def test_custom_defaults_to_one(self): + q = _make_quest(quest_type=QuestType.CUSTOM, criteria={}) + assert _get_target_value(q) == 1 + + def test_missing_criteria_key_defaults_to_one(self): + q = _make_quest(quest_type=QuestType.ISSUE_COUNT, criteria={}) + assert _get_target_value(q) == 1 + + +# --------------------------------------------------------------------------- +# get_or_create_progress / get_quest_progress +# --------------------------------------------------------------------------- + +class TestProgressCreation: + def setup_method(self): + qs._quest_definitions["q1"] = _make_quest("q1", criteria={"target_count": 5}) + + def test_creates_new_progress(self): + progress = get_or_create_progress("q1", "agent_a") + assert progress.quest_id == "q1" + assert progress.agent_id == "agent_a" + assert progress.status == QuestStatus.NOT_STARTED + assert progress.target_value == 5 + assert progress.current_value == 0 + + def test_returns_existing_progress(self): + p1 = get_or_create_progress("q1", "agent_a") + p1.current_value = 3 + p2 = get_or_create_progress("q1", "agent_a") + assert p2.current_value == 3 + assert p1 is p2 + + def test_raises_for_unknown_quest(self): + with pytest.raises(ValueError, match="Quest unknown not found"): + get_or_create_progress("unknown", "agent_a") + + def test_get_quest_progress_none_before_creation(self): + assert get_quest_progress("q1", "agent_a") is None + + def test_get_quest_progress_after_creation(self): + get_or_create_progress("q1", "agent_a") + progress = get_quest_progress("q1", "agent_a") + assert progress is not None + + +# --------------------------------------------------------------------------- +# update_quest_progress +# --------------------------------------------------------------------------- + +class TestUpdateQuestProgress: + def setup_method(self): + qs._quest_definitions["q1"] = _make_quest("q1", criteria={"target_count": 3}) + + def test_updates_current_value(self): + progress = update_quest_progress("q1", "agent_a", 2) + assert progress.current_value == 2 + assert progress.status == QuestStatus.NOT_STARTED + + def test_marks_completed_when_target_reached(self): + progress = update_quest_progress("q1", "agent_a", 3) + assert progress.status == QuestStatus.COMPLETED + assert progress.completed_at != "" + + def test_marks_completed_when_value_exceeds_target(self): + progress = update_quest_progress("q1", "agent_a", 10) + assert progress.status == QuestStatus.COMPLETED + + def test_does_not_re_complete_already_completed(self): + p = update_quest_progress("q1", "agent_a", 3) + first_completed_at = p.completed_at + p2 = update_quest_progress("q1", "agent_a", 5) + # should not change completed_at again + assert p2.completed_at == first_completed_at + + def test_does_not_re_complete_claimed_quest(self): + p = update_quest_progress("q1", "agent_a", 3) + p.status = QuestStatus.CLAIMED + p2 = update_quest_progress("q1", "agent_a", 5) + assert p2.status == QuestStatus.CLAIMED + + def test_updates_metadata(self): + progress = update_quest_progress("q1", "agent_a", 1, metadata={"info": "value"}) + assert progress.metadata["info"] == "value" + + def test_merges_metadata(self): + update_quest_progress("q1", "agent_a", 1, metadata={"a": 1}) + progress = update_quest_progress("q1", "agent_a", 2, metadata={"b": 2}) + assert progress.metadata["a"] == 1 + assert progress.metadata["b"] == 2 + + +# --------------------------------------------------------------------------- +# _is_on_cooldown +# --------------------------------------------------------------------------- + +class TestIsOnCooldown: + def test_non_repeatable_never_on_cooldown(self): + quest = _make_quest(repeatable=False, cooldown_hours=24) + progress = QuestProgress( + quest_id="q1", + agent_id="agent_a", + status=QuestStatus.CLAIMED, + last_completed_at=datetime.now(UTC).isoformat(), + ) + assert _is_on_cooldown(progress, quest) is False + + def test_no_last_completed_not_on_cooldown(self): + quest = _make_quest(repeatable=True, cooldown_hours=24) + progress = QuestProgress( + quest_id="q1", + agent_id="agent_a", + status=QuestStatus.NOT_STARTED, + last_completed_at="", + ) + assert _is_on_cooldown(progress, quest) is False + + def test_zero_cooldown_not_on_cooldown(self): + quest = _make_quest(repeatable=True, cooldown_hours=0) + progress = QuestProgress( + quest_id="q1", + agent_id="agent_a", + status=QuestStatus.CLAIMED, + last_completed_at=datetime.now(UTC).isoformat(), + ) + assert _is_on_cooldown(progress, quest) is False + + def test_recent_completion_is_on_cooldown(self): + quest = _make_quest(repeatable=True, cooldown_hours=24) + recent = datetime.now(UTC) - timedelta(hours=1) + progress = QuestProgress( + quest_id="q1", + agent_id="agent_a", + status=QuestStatus.NOT_STARTED, + last_completed_at=recent.isoformat(), + ) + assert _is_on_cooldown(progress, quest) is True + + def test_expired_cooldown_not_on_cooldown(self): + quest = _make_quest(repeatable=True, cooldown_hours=24) + old = datetime.now(UTC) - timedelta(hours=25) + progress = QuestProgress( + quest_id="q1", + agent_id="agent_a", + status=QuestStatus.NOT_STARTED, + last_completed_at=old.isoformat(), + ) + assert _is_on_cooldown(progress, quest) is False + + def test_invalid_last_completed_returns_false(self): + quest = _make_quest(repeatable=True, cooldown_hours=24) + progress = QuestProgress( + quest_id="q1", + agent_id="agent_a", + status=QuestStatus.NOT_STARTED, + last_completed_at="not-a-date", + ) + assert _is_on_cooldown(progress, quest) is False + + +# --------------------------------------------------------------------------- +# claim_quest_reward +# --------------------------------------------------------------------------- + +class TestClaimQuestReward: + def setup_method(self): + qs._quest_definitions["q1"] = _make_quest("q1", reward_tokens=25) + + def test_returns_none_if_no_progress(self): + assert claim_quest_reward("q1", "agent_a") is None + + def test_returns_none_if_not_completed(self): + get_or_create_progress("q1", "agent_a") + assert claim_quest_reward("q1", "agent_a") is None + + def test_returns_none_if_quest_not_found(self): + assert claim_quest_reward("nonexistent", "agent_a") is None + + def test_successful_claim(self): + progress = get_or_create_progress("q1", "agent_a") + progress.status = QuestStatus.COMPLETED + progress.completed_at = datetime.now(UTC).isoformat() + + mock_invoice = MagicMock() + mock_invoice.payment_hash = "quest_q1_agent_a_123" + + with ( + patch("timmy.quest_system.create_invoice_entry", return_value=mock_invoice), + patch("timmy.quest_system.mark_settled"), + ): + result = claim_quest_reward("q1", "agent_a") + + assert result is not None + assert result["tokens_awarded"] == 25 + assert result["quest_id"] == "q1" + assert result["agent_id"] == "agent_a" + assert result["completion_count"] == 1 + + def test_successful_claim_marks_claimed(self): + progress = get_or_create_progress("q1", "agent_a") + progress.status = QuestStatus.COMPLETED + progress.completed_at = datetime.now(UTC).isoformat() + + mock_invoice = MagicMock() + mock_invoice.payment_hash = "phash" + + with ( + patch("timmy.quest_system.create_invoice_entry", return_value=mock_invoice), + patch("timmy.quest_system.mark_settled"), + ): + claim_quest_reward("q1", "agent_a") + + assert progress.status == QuestStatus.CLAIMED + + def test_repeatable_quest_resets_after_claim(self): + qs._quest_definitions["rep"] = _make_quest( + "rep", repeatable=True, cooldown_hours=0, reward_tokens=10 + ) + progress = get_or_create_progress("rep", "agent_a") + progress.status = QuestStatus.COMPLETED + progress.completed_at = datetime.now(UTC).isoformat() + progress.current_value = 5 + + mock_invoice = MagicMock() + mock_invoice.payment_hash = "phash" + + with ( + patch("timmy.quest_system.create_invoice_entry", return_value=mock_invoice), + patch("timmy.quest_system.mark_settled"), + ): + result = claim_quest_reward("rep", "agent_a") + + assert result is not None + assert progress.status == QuestStatus.NOT_STARTED + assert progress.current_value == 0 + assert progress.completed_at == "" + + def test_on_cooldown_returns_none(self): + qs._quest_definitions["rep"] = _make_quest("rep", repeatable=True, cooldown_hours=24) + progress = get_or_create_progress("rep", "agent_a") + progress.status = QuestStatus.COMPLETED + recent = datetime.now(UTC) - timedelta(hours=1) + progress.last_completed_at = recent.isoformat() + + assert claim_quest_reward("rep", "agent_a") is None + + def test_ledger_error_returns_none(self): + progress = get_or_create_progress("q1", "agent_a") + progress.status = QuestStatus.COMPLETED + progress.completed_at = datetime.now(UTC).isoformat() + + with patch("timmy.quest_system.create_invoice_entry", side_effect=Exception("ledger error")): + result = claim_quest_reward("q1", "agent_a") + + assert result is None + + +# --------------------------------------------------------------------------- +# check_issue_count_quest +# --------------------------------------------------------------------------- + +class TestCheckIssueCountQuest: + def setup_method(self): + qs._quest_definitions["iq"] = _make_quest( + "iq", quest_type=QuestType.ISSUE_COUNT, criteria={"target_count": 2, "issue_labels": ["bug"]} + ) + + def test_counts_matching_issues(self): + issues = [ + {"labels": [{"name": "bug"}]}, + {"labels": [{"name": "bug"}, {"name": "priority"}]}, + {"labels": [{"name": "feature"}]}, # doesn't match + ] + progress = check_issue_count_quest( + qs._quest_definitions["iq"], "agent_a", issues + ) + assert progress.current_value == 2 + assert progress.status == QuestStatus.COMPLETED + + def test_empty_issues_returns_zero(self): + progress = check_issue_count_quest(qs._quest_definitions["iq"], "agent_a", []) + assert progress.current_value == 0 + + def test_no_labels_filter_counts_all_labeled(self): + q = _make_quest( + "nolabel", + quest_type=QuestType.ISSUE_COUNT, + criteria={"target_count": 1, "issue_labels": []}, + ) + qs._quest_definitions["nolabel"] = q + issues = [ + {"labels": [{"name": "bug"}]}, + {"labels": [{"name": "feature"}]}, + ] + progress = check_issue_count_quest(q, "agent_a", issues) + assert progress.current_value == 2 + + +# --------------------------------------------------------------------------- +# check_issue_reduce_quest +# --------------------------------------------------------------------------- + +class TestCheckIssueReduceQuest: + def setup_method(self): + qs._quest_definitions["ir"] = _make_quest( + "ir", quest_type=QuestType.ISSUE_REDUCE, criteria={"target_reduction": 5} + ) + + def test_computes_reduction(self): + progress = check_issue_reduce_quest(qs._quest_definitions["ir"], "agent_a", 20, 15) + assert progress.current_value == 5 + assert progress.status == QuestStatus.COMPLETED + + def test_negative_reduction_treated_as_zero(self): + progress = check_issue_reduce_quest(qs._quest_definitions["ir"], "agent_a", 10, 15) + assert progress.current_value == 0 + + def test_no_change_yields_zero(self): + progress = check_issue_reduce_quest(qs._quest_definitions["ir"], "agent_a", 10, 10) + assert progress.current_value == 0 + + +# --------------------------------------------------------------------------- +# check_daily_run_quest +# --------------------------------------------------------------------------- + +class TestCheckDailyRunQuest: + def setup_method(self): + qs._quest_definitions["dr"] = _make_quest( + "dr", quest_type=QuestType.DAILY_RUN, criteria={"min_sessions": 2} + ) + + def test_tracks_sessions(self): + progress = check_daily_run_quest(qs._quest_definitions["dr"], "agent_a", 2) + assert progress.current_value == 2 + assert progress.status == QuestStatus.COMPLETED + + def test_incomplete_sessions(self): + progress = check_daily_run_quest(qs._quest_definitions["dr"], "agent_a", 1) + assert progress.current_value == 1 + assert progress.status != QuestStatus.COMPLETED + + +# --------------------------------------------------------------------------- +# evaluate_quest_progress +# --------------------------------------------------------------------------- + +class TestEvaluateQuestProgress: + def setup_method(self): + qs._quest_definitions["iq"] = _make_quest( + "iq", quest_type=QuestType.ISSUE_COUNT, criteria={"target_count": 1} + ) + qs._quest_definitions["dis"] = _make_quest("dis", enabled=False) + + def test_disabled_quest_returns_none(self): + result = evaluate_quest_progress("dis", "agent_a", {}) + assert result is None + + def test_missing_quest_returns_none(self): + result = evaluate_quest_progress("nonexistent", "agent_a", {}) + assert result is None + + def test_issue_count_quest_evaluated(self): + context = {"closed_issues": [{"labels": [{"name": "bug"}]}]} + result = evaluate_quest_progress("iq", "agent_a", context) + assert result is not None + assert result.current_value == 1 + + def test_issue_reduce_quest_evaluated(self): + qs._quest_definitions["ir"] = _make_quest( + "ir", quest_type=QuestType.ISSUE_REDUCE, criteria={"target_reduction": 3} + ) + context = {"previous_issue_count": 10, "current_issue_count": 7} + result = evaluate_quest_progress("ir", "agent_a", context) + assert result is not None + assert result.current_value == 3 + + def test_daily_run_quest_evaluated(self): + qs._quest_definitions["dr"] = _make_quest( + "dr", quest_type=QuestType.DAILY_RUN, criteria={"min_sessions": 1} + ) + context = {"sessions_completed": 2} + result = evaluate_quest_progress("dr", "agent_a", context) + assert result is not None + assert result.current_value == 2 + + def test_custom_quest_returns_existing_progress(self): + qs._quest_definitions["cust"] = _make_quest("cust", quest_type=QuestType.CUSTOM) + # No progress yet => None (custom quests don't auto-create progress here) + result = evaluate_quest_progress("cust", "agent_a", {}) + assert result is None + + def test_cooldown_prevents_evaluation(self): + q = _make_quest("rep_iq", quest_type=QuestType.ISSUE_COUNT, repeatable=True, cooldown_hours=24, criteria={"target_count": 1}) + qs._quest_definitions["rep_iq"] = q + progress = get_or_create_progress("rep_iq", "agent_a") + recent = datetime.now(UTC) - timedelta(hours=1) + progress.last_completed_at = recent.isoformat() + + context = {"closed_issues": [{"labels": [{"name": "bug"}]}]} + result = evaluate_quest_progress("rep_iq", "agent_a", context) + # Should return existing progress without updating + assert result is progress + + +# --------------------------------------------------------------------------- +# reset_quest_progress +# --------------------------------------------------------------------------- + +class TestResetQuestProgress: + def setup_method(self): + qs._quest_definitions["q1"] = _make_quest("q1") + qs._quest_definitions["q2"] = _make_quest("q2") + + def test_reset_all(self): + get_or_create_progress("q1", "agent_a") + get_or_create_progress("q2", "agent_a") + count = reset_quest_progress() + assert count == 2 + assert get_quest_progress("q1", "agent_a") is None + assert get_quest_progress("q2", "agent_a") is None + + def test_reset_specific_quest(self): + get_or_create_progress("q1", "agent_a") + get_or_create_progress("q2", "agent_a") + count = reset_quest_progress(quest_id="q1") + assert count == 1 + assert get_quest_progress("q1", "agent_a") is None + assert get_quest_progress("q2", "agent_a") is not None + + def test_reset_specific_agent(self): + get_or_create_progress("q1", "agent_a") + get_or_create_progress("q1", "agent_b") + count = reset_quest_progress(agent_id="agent_a") + assert count == 1 + assert get_quest_progress("q1", "agent_a") is None + assert get_quest_progress("q1", "agent_b") is not None + + def test_reset_specific_quest_and_agent(self): + get_or_create_progress("q1", "agent_a") + get_or_create_progress("q1", "agent_b") + count = reset_quest_progress(quest_id="q1", agent_id="agent_a") + assert count == 1 + + def test_reset_empty_returns_zero(self): + count = reset_quest_progress() + assert count == 0 + + +# --------------------------------------------------------------------------- +# get_quest_leaderboard +# --------------------------------------------------------------------------- + +class TestGetQuestLeaderboard: + def setup_method(self): + qs._quest_definitions["q1"] = _make_quest("q1", reward_tokens=10) + qs._quest_definitions["q2"] = _make_quest("q2", reward_tokens=20) + + def test_empty_progress_returns_empty(self): + assert get_quest_leaderboard() == [] + + def test_leaderboard_sorted_by_tokens(self): + p_a = get_or_create_progress("q1", "agent_a") + p_a.completion_count = 1 + p_b = get_or_create_progress("q2", "agent_b") + p_b.completion_count = 2 + + board = get_quest_leaderboard() + assert board[0]["agent_id"] == "agent_b" # 40 tokens + assert board[1]["agent_id"] == "agent_a" # 10 tokens + + def test_leaderboard_aggregates_multiple_quests(self): + p1 = get_or_create_progress("q1", "agent_a") + p1.completion_count = 2 # 20 tokens + p2 = get_or_create_progress("q2", "agent_a") + p2.completion_count = 1 # 20 tokens + + board = get_quest_leaderboard() + assert len(board) == 1 + assert board[0]["total_tokens"] == 40 + assert board[0]["total_completions"] == 3 + + def test_leaderboard_counts_unique_quests(self): + p1 = get_or_create_progress("q1", "agent_a") + p1.completion_count = 2 + p2 = get_or_create_progress("q2", "agent_a") + p2.completion_count = 1 + + board = get_quest_leaderboard() + assert board[0]["unique_quests_completed"] == 2 + + +# --------------------------------------------------------------------------- +# get_agent_quests_status +# --------------------------------------------------------------------------- + +class TestGetAgentQuestsStatus: + def setup_method(self): + qs._quest_definitions["q1"] = _make_quest("q1", reward_tokens=10) + + def test_returns_status_structure(self): + result = get_agent_quests_status("agent_a") + assert result["agent_id"] == "agent_a" + assert isinstance(result["quests"], list) + assert "total_tokens_earned" in result + assert "total_quests_completed" in result + assert "active_quests_count" in result + + def test_includes_quest_info(self): + result = get_agent_quests_status("agent_a") + quest_info = result["quests"][0] + assert quest_info["quest_id"] == "q1" + assert quest_info["reward_tokens"] == 10 + assert quest_info["status"] == QuestStatus.NOT_STARTED.value + + def test_accumulates_tokens_from_completions(self): + p = get_or_create_progress("q1", "agent_a") + p.completion_count = 3 + result = get_agent_quests_status("agent_a") + assert result["total_tokens_earned"] == 30 + assert result["total_quests_completed"] == 3 + + def test_cooldown_hours_remaining_calculated(self): + q = _make_quest("qcool", repeatable=True, cooldown_hours=24, reward_tokens=5) + qs._quest_definitions["qcool"] = q + p = get_or_create_progress("qcool", "agent_a") + recent = datetime.now(UTC) - timedelta(hours=2) + p.last_completed_at = recent.isoformat() + p.completion_count = 1 + + result = get_agent_quests_status("agent_a") + qcool_info = next(qi for qi in result["quests"] if qi["quest_id"] == "qcool") + assert qcool_info["on_cooldown"] is True + assert qcool_info["cooldown_hours_remaining"] > 0