Extract 1543-line monolith into 5 focused modules:
- memory/db.py (213 lines): DB connection, schema, migrations, data classes
- memory/crud.py (395 lines): CRUD operations, personal facts, reflections
- memory/semantic.py (300 lines): SemanticMemory + MemorySearcher
- memory/consolidation.py (301 lines): HotMemory + VaultMemory
- memory/tools.py (253 lines): Tool functions (search, read, store, forget)

memory_system.py (177 lines) remains as a thin orchestrator with full re-exports — all existing imports continue to work unchanged. No module exceeds 400 lines. All 647 unit tests pass.
839 lines
30 KiB
Python
839 lines
30 KiB
Python
"""Unit tests for timmy.quest_system."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import UTC, datetime, timedelta
|
|
from typing import Any
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
import timmy.quest_system as qs
|
|
from timmy.quest_system import (
|
|
QuestDefinition,
|
|
QuestProgress,
|
|
QuestStatus,
|
|
QuestType,
|
|
_get_progress_key,
|
|
_get_target_value,
|
|
_is_on_cooldown,
|
|
check_daily_run_quest,
|
|
check_issue_count_quest,
|
|
check_issue_reduce_quest,
|
|
claim_quest_reward,
|
|
evaluate_quest_progress,
|
|
get_active_quests,
|
|
get_agent_quests_status,
|
|
get_or_create_progress,
|
|
get_quest_definition,
|
|
get_quest_definitions,
|
|
get_quest_leaderboard,
|
|
get_quest_progress,
|
|
load_quest_config,
|
|
reset_quest_progress,
|
|
update_quest_progress,
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _make_quest(
    quest_id: str = "test_quest",
    quest_type: QuestType = QuestType.ISSUE_COUNT,
    reward_tokens: int = 10,
    enabled: bool = True,
    repeatable: bool = False,
    cooldown_hours: int = 0,
    criteria: dict[str, Any] | None = None,
) -> QuestDefinition:
    """Build a ``QuestDefinition`` populated with test-friendly defaults.

    Args:
        quest_id: Value for ``id``; also embedded in the generated name
            (``"Quest <quest_id>"``).
        quest_type: Quest type to assign.
        reward_tokens: Token reward to assign.
        enabled: Whether the quest is enabled.
        repeatable: Whether the quest is repeatable.
        cooldown_hours: Cooldown, in hours, to assign.
        criteria: Criteria mapping. Only ``None`` is replaced by the default
            ``{"target_count": 3}``; an explicit empty dict is passed through
            unchanged so tests can exercise the "criteria key missing" path.

    Returns:
        The constructed ``QuestDefinition``.
    """
    # Use an identity check rather than ``criteria or ...``: an empty dict is
    # falsy, and silently swapping it for the default would make it impossible
    # to build a quest with genuinely empty criteria.
    effective_criteria = {"target_count": 3} if criteria is None else criteria
    return QuestDefinition(
        id=quest_id,
        name=f"Quest {quest_id}",
        description="Test quest",
        reward_tokens=reward_tokens,
        quest_type=quest_type,
        enabled=enabled,
        repeatable=repeatable,
        cooldown_hours=cooldown_hours,
        criteria=effective_criteria,
        notification_message="Quest Complete! You earned {tokens} tokens.",
    )
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def clean_state():
    """Wipe quest progress, definitions and settings around every test."""

    def _wipe():
        # Same three steps are needed on the way in and on the way out.
        reset_quest_progress()
        qs._quest_definitions.clear()
        qs._quest_settings.clear()

    _wipe()
    yield
    _wipe()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# QuestDefinition
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestQuestDefinition:
    """Exercise ``QuestDefinition.from_dict`` with sparse, complete and invalid input."""

    def test_from_dict_minimal(self):
        """A dict holding only ``id`` gets the expected value for every other field."""
        quest = QuestDefinition.from_dict({"id": "q1"})

        assert quest.id == "q1"
        assert quest.name == "Unnamed Quest"
        assert quest.quest_type == QuestType.CUSTOM
        assert quest.reward_tokens == 0
        assert quest.cooldown_hours == 0
        # Identity checks: these must be real booleans, not merely truthy/falsy.
        assert quest.enabled is True
        assert quest.repeatable is False

    def test_from_dict_full(self):
        """Every supplied key is carried over onto the definition."""
        payload = {
            "id": "q2",
            "name": "Full Quest",
            "description": "A full quest",
            "reward_tokens": 50,
            "type": "issue_count",
            "enabled": False,
            "repeatable": True,
            "cooldown_hours": 24,
            "criteria": {"target_count": 5},
            "notification_message": "You earned {tokens}!",
        }
        quest = QuestDefinition.from_dict(payload)

        expected_by_attribute = {
            "id": "q2",
            "name": "Full Quest",
            "reward_tokens": 50,
            "quest_type": QuestType.ISSUE_COUNT,
            "cooldown_hours": 24,
            "criteria": {"target_count": 5},
            "notification_message": "You earned {tokens}!",
        }
        for attribute, wanted in expected_by_attribute.items():
            assert getattr(quest, attribute) == wanted
        assert quest.enabled is False
        assert quest.repeatable is True

    def test_from_dict_invalid_type_raises(self):
        """An unrecognised ``type`` string is rejected with ``ValueError``."""
        bogus = {"id": "q3", "type": "not_a_real_type"}
        with pytest.raises(ValueError):
            QuestDefinition.from_dict(bogus)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# QuestProgress
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestQuestProgress:
    """Checks on the dictionary produced by ``QuestProgress.to_dict``."""

    def test_to_dict_roundtrip(self):
        """Explicitly supplied fields appear in the serialised dict."""
        serialized = QuestProgress(
            quest_id="q1",
            agent_id="agent_a",
            status=QuestStatus.IN_PROGRESS,
            current_value=2,
            target_value=5,
            started_at="2026-01-01T00:00:00",
            metadata={"key": "val"},
        ).to_dict()

        expected_subset = {
            "quest_id": "q1",
            "agent_id": "agent_a",
            "status": "in_progress",  # status is compared against the plain string
            "current_value": 2,
            "target_value": 5,
            "metadata": {"key": "val"},
        }
        for key, wanted in expected_subset.items():
            assert serialized[key] == wanted

    def test_to_dict_defaults(self):
        """Fields left unset serialise as a zero count and empty timestamps."""
        bare = QuestProgress(
            quest_id="q1",
            agent_id="agent_a",
            status=QuestStatus.NOT_STARTED,
        )
        serialized = bare.to_dict()

        assert serialized["completion_count"] == 0
        for timestamp_key in ("started_at", "completed_at"):
            assert serialized[timestamp_key] == ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _get_progress_key
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_get_progress_key():
    """Quest ``q1`` and agent ``agent_a`` map to the key ``"agent_a:q1"``."""
    expected_key = "agent_a:q1"
    assert _get_progress_key("q1", "agent_a") == expected_key
|
|
|
|
|
|
def test_get_progress_key_different_agents():
    """The same quest yields different keys for two different agents."""
    first_key, second_key = (
        _get_progress_key("q1", agent) for agent in ("agent_a", "agent_b")
    )
    assert first_key != second_key
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# load_quest_config
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestLoadQuestConfig:
    """``load_quest_config`` against missing, valid and malformed config files."""

    @staticmethod
    def _load_from(path):
        """Run ``load_quest_config`` with ``QUEST_CONFIG_PATH`` patched to ``path``."""
        with patch.object(qs, "QUEST_CONFIG_PATH", path):
            return load_quest_config()

    def test_missing_file_returns_empty(self, tmp_path):
        """A path that does not exist produces empty definitions and settings."""
        definitions, settings = self._load_from(tmp_path / "nonexistent.yaml")
        assert definitions == {}
        assert settings == {}

    def test_valid_yaml_loads_quests(self, tmp_path):
        """A well-formed file yields the quest and the settings it declares."""
        yaml_text = """
quests:
  first_quest:
    name: First Quest
    description: Do stuff
    reward_tokens: 25
    type: issue_count
    enabled: true
    repeatable: false
    cooldown_hours: 0
    criteria:
      target_count: 3
    notification_message: "Done! {tokens} tokens"
settings:
  some_setting: true
"""
        target = tmp_path / "quests.yaml"
        target.write_text(yaml_text)

        definitions, settings = self._load_from(target)

        assert "first_quest" in definitions
        loaded = definitions["first_quest"]
        assert loaded.name == "First Quest"
        assert loaded.reward_tokens == 25
        assert settings == {"some_setting": True}

    def test_invalid_yaml_returns_empty(self, tmp_path):
        """Unparseable content produces empty definitions and settings."""
        target = tmp_path / "quests.yaml"
        target.write_text(":: not valid yaml ::")

        definitions, settings = self._load_from(target)

        assert definitions == {}
        assert settings == {}

    def test_non_dict_yaml_returns_empty(self, tmp_path):
        """A YAML document whose top level is a list is treated as empty."""
        target = tmp_path / "quests.yaml"
        target.write_text("- item1\n- item2\n")

        definitions, settings = self._load_from(target)

        assert definitions == {}
        assert settings == {}

    def test_bad_quest_entry_is_skipped(self, tmp_path):
        """An entry with an unknown type is dropped; the valid sibling survives."""
        yaml_text = """
quests:
  good_quest:
    name: Good
    type: issue_count
    reward_tokens: 10
    enabled: true
    repeatable: false
    cooldown_hours: 0
    criteria: {}
    notification_message: "{tokens}"
  bad_quest:
    type: invalid_type_that_does_not_exist
"""
        target = tmp_path / "quests.yaml"
        target.write_text(yaml_text)

        definitions, _unused_settings = self._load_from(target)

        assert "good_quest" in definitions
        assert "bad_quest" not in definitions
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_quest_definitions / get_quest_definition / get_active_quests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestQuestLookup:
    """Lookup helpers over a registry holding one enabled and one disabled quest."""

    def setup_method(self):
        """Register ``q1`` (enabled) and ``q2`` (disabled)."""
        for quest_id, is_enabled in (("q1", True), ("q2", False)):
            qs._quest_definitions[quest_id] = _make_quest(quest_id, enabled=is_enabled)

    def test_get_quest_definitions_returns_all(self):
        """Both quests are returned regardless of their enabled flag."""
        registry = get_quest_definitions()
        for quest_id in ("q1", "q2"):
            assert quest_id in registry

    def test_get_quest_definition_found(self):
        """Looking up a registered id returns that definition."""
        found = get_quest_definition("q1")
        assert found is not None
        assert found.id == "q1"

    def test_get_quest_definition_not_found(self):
        """Looking up an unregistered id returns ``None``."""
        assert get_quest_definition("missing") is None

    def test_get_active_quests_only_enabled(self):
        """Only the enabled quest is reported as active."""
        active_ids = [quest.id for quest in get_active_quests()]
        assert "q1" in active_ids
        assert "q2" not in active_ids
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _get_target_value
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetTargetValue:
    """``_get_target_value`` picks the criteria key matching each quest type."""

    @staticmethod
    def _target_for(quest_type, criteria):
        """Build a quest of ``quest_type`` with ``criteria`` and return its target value."""
        quest = _make_quest(quest_type=quest_type, criteria=criteria)
        return _get_target_value(quest)

    def test_issue_count(self):
        """ISSUE_COUNT reads ``target_count``."""
        assert self._target_for(QuestType.ISSUE_COUNT, {"target_count": 7}) == 7

    def test_issue_reduce(self):
        """ISSUE_REDUCE reads ``target_reduction``."""
        assert self._target_for(QuestType.ISSUE_REDUCE, {"target_reduction": 5}) == 5

    def test_daily_run(self):
        """DAILY_RUN reads ``min_sessions``."""
        assert self._target_for(QuestType.DAILY_RUN, {"min_sessions": 3}) == 3

    def test_docs_update(self):
        """DOCS_UPDATE reads ``min_files_changed``."""
        assert self._target_for(QuestType.DOCS_UPDATE, {"min_files_changed": 2}) == 2

    def test_test_improve(self):
        """TEST_IMPROVE reads ``min_new_tests``."""
        assert self._target_for(QuestType.TEST_IMPROVE, {"min_new_tests": 4}) == 4

    def test_custom_defaults_to_one(self):
        """A CUSTOM quest built with an empty criteria dict is expected to target 1."""
        assert self._target_for(QuestType.CUSTOM, {}) == 1

    def test_missing_criteria_key_defaults_to_one(self):
        """An ISSUE_COUNT quest built with an empty criteria dict is expected to target 1."""
        assert self._target_for(QuestType.ISSUE_COUNT, {}) == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_or_create_progress / get_quest_progress
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestProgressCreation:
    """``get_or_create_progress`` / ``get_quest_progress`` for a quest with target 5."""

    def setup_method(self):
        """Register quest ``q1`` with ``target_count`` 5."""
        quest = _make_quest("q1", criteria={"target_count": 5})
        qs._quest_definitions["q1"] = quest

    def test_creates_new_progress(self):
        """A first call creates a NOT_STARTED record at value 0 with the quest's target."""
        created = get_or_create_progress("q1", "agent_a")

        assert created.quest_id == "q1"
        assert created.agent_id == "agent_a"
        assert created.status == QuestStatus.NOT_STARTED
        assert created.current_value == 0
        assert created.target_value == 5

    def test_returns_existing_progress(self):
        """A second call hands back the very same object, mutations included."""
        original = get_or_create_progress("q1", "agent_a")
        original.current_value = 3

        fetched_again = get_or_create_progress("q1", "agent_a")

        assert fetched_again.current_value == 3
        assert original is fetched_again

    def test_raises_for_unknown_quest(self):
        """An unregistered quest id raises ``ValueError`` with a descriptive message."""
        with pytest.raises(ValueError, match="Quest unknown not found"):
            get_or_create_progress("unknown", "agent_a")

    def test_get_quest_progress_none_before_creation(self):
        """The read-only getter returns ``None`` when nothing has been created."""
        assert get_quest_progress("q1", "agent_a") is None

    def test_get_quest_progress_after_creation(self):
        """Once created, the record is visible through the read-only getter."""
        get_or_create_progress("q1", "agent_a")
        assert get_quest_progress("q1", "agent_a") is not None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# update_quest_progress
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestUpdateQuestProgress:
    """``update_quest_progress`` against quest ``q1`` whose target count is 3."""

    def setup_method(self):
        """Register quest ``q1`` with ``target_count`` 3."""
        quest = _make_quest("q1", criteria={"target_count": 3})
        qs._quest_definitions["q1"] = quest

    def test_updates_current_value(self):
        """A below-target value is stored and the status stays NOT_STARTED."""
        updated = update_quest_progress("q1", "agent_a", 2)
        assert updated.current_value == 2
        assert updated.status == QuestStatus.NOT_STARTED

    def test_marks_completed_when_target_reached(self):
        """Hitting the target exactly completes the quest and stamps ``completed_at``."""
        updated = update_quest_progress("q1", "agent_a", 3)
        assert updated.status == QuestStatus.COMPLETED
        assert updated.completed_at != ""

    def test_marks_completed_when_value_exceeds_target(self):
        """Overshooting the target also completes the quest."""
        updated = update_quest_progress("q1", "agent_a", 10)
        assert updated.status == QuestStatus.COMPLETED

    def test_does_not_re_complete_already_completed(self):
        """A later update leaves the original completion timestamp in place."""
        original_timestamp = update_quest_progress("q1", "agent_a", 3).completed_at

        after_second_update = update_quest_progress("q1", "agent_a", 5)

        # should not change completed_at again
        assert after_second_update.completed_at == original_timestamp

    def test_does_not_re_complete_claimed_quest(self):
        """A CLAIMED record keeps its status across a further update."""
        completed = update_quest_progress("q1", "agent_a", 3)
        completed.status = QuestStatus.CLAIMED

        after_second_update = update_quest_progress("q1", "agent_a", 5)

        assert after_second_update.status == QuestStatus.CLAIMED

    def test_updates_metadata(self):
        """Metadata passed with the update is stored on the record."""
        updated = update_quest_progress("q1", "agent_a", 1, metadata={"info": "value"})
        assert updated.metadata["info"] == "value"

    def test_merges_metadata(self):
        """Metadata from successive updates accumulates rather than being replaced."""
        update_quest_progress("q1", "agent_a", 1, metadata={"a": 1})
        updated = update_quest_progress("q1", "agent_a", 2, metadata={"b": 2})

        for key, wanted in (("a", 1), ("b", 2)):
            assert updated.metadata[key] == wanted
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _is_on_cooldown
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestIsOnCooldown:
    """``_is_on_cooldown`` across repeatability, cooldown length and timestamps."""

    @staticmethod
    def _progress(status, last_completed_at):
        """Return a ``q1``/``agent_a`` progress record with the given status and timestamp."""
        return QuestProgress(
            quest_id="q1",
            agent_id="agent_a",
            status=status,
            last_completed_at=last_completed_at,
        )

    def test_non_repeatable_never_on_cooldown(self):
        """A non-repeatable quest is never on cooldown, even just after completion."""
        quest = _make_quest(repeatable=False, cooldown_hours=24)
        record = self._progress(QuestStatus.CLAIMED, datetime.now(UTC).isoformat())
        assert _is_on_cooldown(record, quest) is False

    def test_no_last_completed_not_on_cooldown(self):
        """An empty ``last_completed_at`` means no cooldown."""
        quest = _make_quest(repeatable=True, cooldown_hours=24)
        record = self._progress(QuestStatus.NOT_STARTED, "")
        assert _is_on_cooldown(record, quest) is False

    def test_zero_cooldown_not_on_cooldown(self):
        """A repeatable quest with a zero-hour cooldown is never on cooldown."""
        quest = _make_quest(repeatable=True, cooldown_hours=0)
        record = self._progress(QuestStatus.CLAIMED, datetime.now(UTC).isoformat())
        assert _is_on_cooldown(record, quest) is False

    def test_recent_completion_is_on_cooldown(self):
        """Completion one hour ago is inside a 24-hour cooldown."""
        quest = _make_quest(repeatable=True, cooldown_hours=24)
        one_hour_ago = datetime.now(UTC) - timedelta(hours=1)
        record = self._progress(QuestStatus.NOT_STARTED, one_hour_ago.isoformat())
        assert _is_on_cooldown(record, quest) is True

    def test_expired_cooldown_not_on_cooldown(self):
        """Completion 25 hours ago is outside a 24-hour cooldown."""
        quest = _make_quest(repeatable=True, cooldown_hours=24)
        long_ago = datetime.now(UTC) - timedelta(hours=25)
        record = self._progress(QuestStatus.NOT_STARTED, long_ago.isoformat())
        assert _is_on_cooldown(record, quest) is False

    def test_invalid_last_completed_returns_false(self):
        """An unparseable timestamp is treated as not on cooldown."""
        quest = _make_quest(repeatable=True, cooldown_hours=24)
        record = self._progress(QuestStatus.NOT_STARTED, "not-a-date")
        assert _is_on_cooldown(record, quest) is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# claim_quest_reward
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestClaimQuestReward:
    """``claim_quest_reward`` for quest ``q1`` (25 tokens) and a repeatable variant."""

    def setup_method(self):
        """Register quest ``q1`` with a 25-token reward."""
        qs._quest_definitions["q1"] = _make_quest("q1", reward_tokens=25)

    @staticmethod
    def _completed_progress(quest_id):
        """Create ``agent_a``'s progress for ``quest_id`` and mark it COMPLETED now."""
        record = get_or_create_progress(quest_id, "agent_a")
        record.status = QuestStatus.COMPLETED
        record.completed_at = datetime.now(UTC).isoformat()
        return record

    @staticmethod
    def _claim_with_patched_ledger(quest_id, payment_hash):
        """Claim for ``agent_a`` with ``create_invoice_entry``/``mark_settled`` patched out."""
        invoice = MagicMock()
        invoice.payment_hash = payment_hash
        with (
            patch("timmy.quest_system.create_invoice_entry", return_value=invoice),
            patch("timmy.quest_system.mark_settled"),
        ):
            return claim_quest_reward(quest_id, "agent_a")

    def test_returns_none_if_no_progress(self):
        """Claiming without any progress record returns ``None``."""
        assert claim_quest_reward("q1", "agent_a") is None

    def test_returns_none_if_not_completed(self):
        """Claiming a freshly created (not completed) record returns ``None``."""
        get_or_create_progress("q1", "agent_a")
        assert claim_quest_reward("q1", "agent_a") is None

    def test_returns_none_if_quest_not_found(self):
        """Claiming an unregistered quest returns ``None``."""
        assert claim_quest_reward("nonexistent", "agent_a") is None

    def test_successful_claim(self):
        """A completed quest pays out and reports the claim details."""
        self._completed_progress("q1")

        outcome = self._claim_with_patched_ledger("q1", "quest_q1_agent_a_123")

        assert outcome is not None
        assert outcome["tokens_awarded"] == 25
        assert outcome["quest_id"] == "q1"
        assert outcome["agent_id"] == "agent_a"
        assert outcome["completion_count"] == 1

    def test_successful_claim_marks_claimed(self):
        """After a successful claim the (non-repeatable) record is CLAIMED."""
        record = self._completed_progress("q1")

        self._claim_with_patched_ledger("q1", "phash")

        assert record.status == QuestStatus.CLAIMED

    def test_repeatable_quest_resets_after_claim(self):
        """Claiming a repeatable quest resets status, value and completion timestamp."""
        qs._quest_definitions["rep"] = _make_quest(
            "rep", repeatable=True, cooldown_hours=0, reward_tokens=10
        )
        record = self._completed_progress("rep")
        record.current_value = 5

        outcome = self._claim_with_patched_ledger("rep", "phash")

        assert outcome is not None
        assert record.status == QuestStatus.NOT_STARTED
        assert record.current_value == 0
        assert record.completed_at == ""

    def test_on_cooldown_returns_none(self):
        """A completed repeatable quest still inside its cooldown cannot be claimed."""
        qs._quest_definitions["rep"] = _make_quest("rep", repeatable=True, cooldown_hours=24)
        record = get_or_create_progress("rep", "agent_a")
        record.status = QuestStatus.COMPLETED
        one_hour_ago = datetime.now(UTC) - timedelta(hours=1)
        record.last_completed_at = one_hour_ago.isoformat()

        assert claim_quest_reward("rep", "agent_a") is None

    def test_ledger_error_returns_none(self):
        """An exception from ``create_invoice_entry`` makes the claim return ``None``."""
        self._completed_progress("q1")

        failing_ledger = patch(
            "timmy.quest_system.create_invoice_entry", side_effect=Exception("ledger error")
        )
        with failing_ledger:
            outcome = claim_quest_reward("q1", "agent_a")

        assert outcome is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# check_issue_count_quest
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCheckIssueCountQuest:
    """``check_issue_count_quest`` with and without a label filter."""

    @staticmethod
    def _issue(*label_names):
        """Return an issue dict carrying one ``{"name": ...}`` label per given name."""
        return {"labels": [{"name": label_name} for label_name in label_names]}

    def setup_method(self):
        """Register quest ``iq``: two issues labelled ``bug`` are required."""
        bug_criteria = {"target_count": 2, "issue_labels": ["bug"]}
        qs._quest_definitions["iq"] = _make_quest(
            "iq", quest_type=QuestType.ISSUE_COUNT, criteria=bug_criteria
        )

    def test_counts_matching_issues(self):
        """Only issues carrying the ``bug`` label count; two of them complete the quest."""
        closed = [
            self._issue("bug"),
            self._issue("bug", "priority"),
            self._issue("feature"),  # doesn't match
        ]

        outcome = check_issue_count_quest(qs._quest_definitions["iq"], "agent_a", closed)

        assert outcome.current_value == 2
        assert outcome.status == QuestStatus.COMPLETED

    def test_empty_issues_returns_zero(self):
        """An empty issue list gives a current value of 0."""
        outcome = check_issue_count_quest(qs._quest_definitions["iq"], "agent_a", [])
        assert outcome.current_value == 0

    def test_no_labels_filter_counts_all_labeled(self):
        """With an empty ``issue_labels`` filter both labelled issues are counted."""
        unfiltered = _make_quest(
            "nolabel",
            quest_type=QuestType.ISSUE_COUNT,
            criteria={"target_count": 1, "issue_labels": []},
        )
        qs._quest_definitions["nolabel"] = unfiltered
        closed = [self._issue("bug"), self._issue("feature")]

        outcome = check_issue_count_quest(unfiltered, "agent_a", closed)

        assert outcome.current_value == 2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# check_issue_reduce_quest
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCheckIssueReduceQuest:
    """``check_issue_reduce_quest`` for quest ``ir`` with a target reduction of 5."""

    def setup_method(self):
        """Register quest ``ir`` with ``target_reduction`` 5."""
        qs._quest_definitions["ir"] = _make_quest(
            "ir", quest_type=QuestType.ISSUE_REDUCE, criteria={"target_reduction": 5}
        )

    @staticmethod
    def _check(first_count, second_count):
        """Run the check for ``agent_a``, passing the two counts through in order."""
        quest = qs._quest_definitions["ir"]
        return check_issue_reduce_quest(quest, "agent_a", first_count, second_count)

    def test_computes_reduction(self):
        """Counts of 20 then 15 give a value of 5, which completes the quest."""
        outcome = self._check(20, 15)
        assert outcome.current_value == 5
        assert outcome.status == QuestStatus.COMPLETED

    def test_negative_reduction_treated_as_zero(self):
        """Counts of 10 then 15 (an increase) give a value of 0."""
        assert self._check(10, 15).current_value == 0

    def test_no_change_yields_zero(self):
        """Identical counts give a value of 0."""
        assert self._check(10, 10).current_value == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# check_daily_run_quest
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCheckDailyRunQuest:
    """``check_daily_run_quest`` for quest ``dr`` requiring two sessions."""

    def setup_method(self):
        """Register quest ``dr`` with ``min_sessions`` 2."""
        daily = _make_quest("dr", quest_type=QuestType.DAILY_RUN, criteria={"min_sessions": 2})
        qs._quest_definitions["dr"] = daily

    def test_tracks_sessions(self):
        """Two sessions are recorded and complete the quest."""
        outcome = check_daily_run_quest(qs._quest_definitions["dr"], "agent_a", 2)
        assert outcome.current_value == 2
        assert outcome.status == QuestStatus.COMPLETED

    def test_incomplete_sessions(self):
        """One session is recorded but does not complete the quest."""
        outcome = check_daily_run_quest(qs._quest_definitions["dr"], "agent_a", 1)
        assert outcome.current_value == 1
        assert outcome.status != QuestStatus.COMPLETED
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# evaluate_quest_progress
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestEvaluateQuestProgress:
    """``evaluate_quest_progress`` dispatching on quest type, enabled flag and cooldown."""

    def setup_method(self):
        """Register an enabled ISSUE_COUNT quest ``iq`` and a disabled quest ``dis``."""
        qs._quest_definitions["iq"] = _make_quest(
            "iq", quest_type=QuestType.ISSUE_COUNT, criteria={"target_count": 1}
        )
        qs._quest_definitions["dis"] = _make_quest("dis", enabled=False)

    def test_disabled_quest_returns_none(self):
        """A disabled quest is not evaluated."""
        result = evaluate_quest_progress("dis", "agent_a", {})
        assert result is None

    def test_missing_quest_returns_none(self):
        """An unregistered quest id is not evaluated."""
        result = evaluate_quest_progress("nonexistent", "agent_a", {})
        assert result is None

    def test_issue_count_quest_evaluated(self):
        """ISSUE_COUNT quests are fed from ``context["closed_issues"]``."""
        context = {"closed_issues": [{"labels": [{"name": "bug"}]}]}
        result = evaluate_quest_progress("iq", "agent_a", context)
        assert result is not None
        assert result.current_value == 1

    def test_issue_reduce_quest_evaluated(self):
        """ISSUE_REDUCE quests are fed from the previous/current issue counts."""
        qs._quest_definitions["ir"] = _make_quest(
            "ir", quest_type=QuestType.ISSUE_REDUCE, criteria={"target_reduction": 3}
        )
        context = {"previous_issue_count": 10, "current_issue_count": 7}
        result = evaluate_quest_progress("ir", "agent_a", context)
        assert result is not None
        assert result.current_value == 3

    def test_daily_run_quest_evaluated(self):
        """DAILY_RUN quests are fed from ``context["sessions_completed"]``."""
        qs._quest_definitions["dr"] = _make_quest(
            "dr", quest_type=QuestType.DAILY_RUN, criteria={"min_sessions": 1}
        )
        context = {"sessions_completed": 2}
        result = evaluate_quest_progress("dr", "agent_a", context)
        assert result is not None
        assert result.current_value == 2

    def test_custom_quest_returns_existing_progress(self):
        """With no stored progress, evaluating a CUSTOM quest returns ``None``."""
        qs._quest_definitions["cust"] = _make_quest("cust", quest_type=QuestType.CUSTOM)
        # No progress yet => None (custom quests don't auto-create progress here)
        result = evaluate_quest_progress("cust", "agent_a", {})
        assert result is None

    def test_cooldown_prevents_evaluation(self):
        """While on cooldown the stored progress is returned untouched."""
        q = _make_quest(
            "rep_iq",
            quest_type=QuestType.ISSUE_COUNT,
            repeatable=True,
            cooldown_hours=24,
            criteria={"target_count": 1},
        )
        qs._quest_definitions["rep_iq"] = q
        progress = get_or_create_progress("rep_iq", "agent_a")
        recent = datetime.now(UTC) - timedelta(hours=1)
        progress.last_completed_at = recent.isoformat()

        context = {"closed_issues": [{"labels": [{"name": "bug"}]}]}
        result = evaluate_quest_progress("rep_iq", "agent_a", context)

        # Should return existing progress without updating
        assert result is progress
        # Identity alone cannot prove the evaluation was skipped, because the
        # stored record is reused by get_or_create_progress; the untouched
        # value and status are what show the matching issue was ignored.
        assert progress.current_value == 0
        assert progress.status == QuestStatus.NOT_STARTED
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# reset_quest_progress
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestResetQuestProgress:
    """``reset_quest_progress`` with no filter, a quest filter, an agent filter, or both."""

    def setup_method(self):
        """Register quests ``q1`` and ``q2``."""
        qs._quest_definitions["q1"] = _make_quest("q1")
        qs._quest_definitions["q2"] = _make_quest("q2")

    def test_reset_all(self):
        """Without filters every record is removed and counted."""
        get_or_create_progress("q1", "agent_a")
        get_or_create_progress("q2", "agent_a")
        count = reset_quest_progress()
        assert count == 2
        assert get_quest_progress("q1", "agent_a") is None
        assert get_quest_progress("q2", "agent_a") is None

    def test_reset_specific_quest(self):
        """Filtering by quest removes only that quest's record."""
        get_or_create_progress("q1", "agent_a")
        get_or_create_progress("q2", "agent_a")
        count = reset_quest_progress(quest_id="q1")
        assert count == 1
        assert get_quest_progress("q1", "agent_a") is None
        assert get_quest_progress("q2", "agent_a") is not None

    def test_reset_specific_agent(self):
        """Filtering by agent removes only that agent's record."""
        get_or_create_progress("q1", "agent_a")
        get_or_create_progress("q1", "agent_b")
        count = reset_quest_progress(agent_id="agent_a")
        assert count == 1
        assert get_quest_progress("q1", "agent_a") is None
        assert get_quest_progress("q1", "agent_b") is not None

    def test_reset_specific_quest_and_agent(self):
        """Filtering by quest and agent removes exactly the matching record."""
        get_or_create_progress("q1", "agent_a")
        get_or_create_progress("q1", "agent_b")
        count = reset_quest_progress(quest_id="q1", agent_id="agent_a")
        assert count == 1
        # The count alone does not say which record went away; check that the
        # targeted one is gone and the other agent's record is still there.
        assert get_quest_progress("q1", "agent_a") is None
        assert get_quest_progress("q1", "agent_b") is not None

    def test_reset_empty_returns_zero(self):
        """Resetting when nothing is stored reports zero removals."""
        count = reset_quest_progress()
        assert count == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_quest_leaderboard
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetQuestLeaderboard:
    """``get_quest_leaderboard`` with quests worth 10 (``q1``) and 20 (``q2``) tokens."""

    def setup_method(self):
        """Register ``q1`` (10 tokens) and ``q2`` (20 tokens)."""
        for quest_id, tokens in (("q1", 10), ("q2", 20)):
            qs._quest_definitions[quest_id] = _make_quest(quest_id, reward_tokens=tokens)

    @staticmethod
    def _record_completions(quest_id, agent_id, times):
        """Create the agent's progress for the quest and set its completion count."""
        get_or_create_progress(quest_id, agent_id).completion_count = times

    def test_empty_progress_returns_empty(self):
        """No progress records means an empty leaderboard."""
        assert get_quest_leaderboard() == []

    def test_leaderboard_sorted_by_tokens(self):
        """The agent with more tokens is listed first."""
        self._record_completions("q1", "agent_a", 1)
        self._record_completions("q2", "agent_b", 2)

        ranking = get_quest_leaderboard()

        leader, runner_up = ranking[0], ranking[1]
        assert leader["agent_id"] == "agent_b"  # 40 tokens
        assert runner_up["agent_id"] == "agent_a"  # 10 tokens

    def test_leaderboard_aggregates_multiple_quests(self):
        """One agent's quests are folded into a single row of totals."""
        self._record_completions("q1", "agent_a", 2)  # 20 tokens
        self._record_completions("q2", "agent_a", 1)  # 20 tokens

        ranking = get_quest_leaderboard()

        assert len(ranking) == 1
        only_row = ranking[0]
        assert only_row["total_tokens"] == 40
        assert only_row["total_completions"] == 3

    def test_leaderboard_counts_unique_quests(self):
        """Distinct completed quests are counted once each, whatever their repeat count."""
        self._record_completions("q1", "agent_a", 2)
        self._record_completions("q2", "agent_a", 1)

        ranking = get_quest_leaderboard()

        assert ranking[0]["unique_quests_completed"] == 2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_agent_quests_status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetAgentQuestsStatus:
    """``get_agent_quests_status`` for an agent with one 10-token quest registered."""

    def setup_method(self):
        """Register quest ``q1`` with a 10-token reward."""
        qs._quest_definitions["q1"] = _make_quest("q1", reward_tokens=10)

    def test_returns_status_structure(self):
        """The summary names the agent, lists quests and exposes the aggregate keys."""
        summary = get_agent_quests_status("agent_a")

        assert summary["agent_id"] == "agent_a"
        assert isinstance(summary["quests"], list)
        for aggregate_key in (
            "total_tokens_earned",
            "total_quests_completed",
            "active_quests_count",
        ):
            assert aggregate_key in summary

    def test_includes_quest_info(self):
        """The first quest entry describes ``q1`` as not started."""
        first_entry = get_agent_quests_status("agent_a")["quests"][0]

        assert first_entry["quest_id"] == "q1"
        assert first_entry["reward_tokens"] == 10
        assert first_entry["status"] == QuestStatus.NOT_STARTED.value

    def test_accumulates_tokens_from_completions(self):
        """Three completions of a 10-token quest total 30 tokens."""
        get_or_create_progress("q1", "agent_a").completion_count = 3

        summary = get_agent_quests_status("agent_a")

        assert summary["total_tokens_earned"] == 30
        assert summary["total_quests_completed"] == 3

    def test_cooldown_hours_remaining_calculated(self):
        """A quest completed two hours into a 24-hour cooldown reports time remaining."""
        cooling = _make_quest("qcool", repeatable=True, cooldown_hours=24, reward_tokens=5)
        qs._quest_definitions["qcool"] = cooling
        record = get_or_create_progress("qcool", "agent_a")
        two_hours_ago = datetime.now(UTC) - timedelta(hours=2)
        record.last_completed_at = two_hours_ago.isoformat()
        record.completion_count = 1

        summary = get_agent_quests_status("agent_a")

        cooling_entry = next(
            entry for entry in summary["quests"] if entry["quest_id"] == "qcool"
        )
        assert cooling_entry["on_cooldown"] is True
        assert cooling_entry["cooldown_hours_remaining"] > 0
|