- Add token_rules.yaml config defining events, rewards, penalties, and gating thresholds
- Create token_rules.py helper module for loading config and computing token deltas
- Update orchestrator.py to compute token rewards for Daily Run completion
- Add comprehensive tests for token_rules module

The token economy is now configurable without code changes:
- Events: triage actions, Daily Run completions, PR merges, test fixes
- Rewards/penalties per event type
- Gating thresholds for sensitive operations
- Daily limits per category
- Audit settings for transaction logging

Fixes #711
525 lines
18 KiB
Python
525 lines
18 KiB
Python
"""Tests for token_rules module."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
# Add timmy_automations to path for imports
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "timmy_automations"))
|
|
|
|
from utils import token_rules as tr
|
|
|
|
|
|
class TestTokenEvent:
    """Verify how the TokenEvent dataclass derives its net delta."""

    @staticmethod
    def _make_event(reward, penalty):
        """Build a throwaway TokenEvent that differs only in reward/penalty."""
        return tr.TokenEvent(
            name="test",
            description="Test event",
            reward=reward,
            penalty=penalty,
            category="test",
        )

    def test_delta_calculation_reward(self):
        """A reward-only event reports a positive delta."""
        assert self._make_event(reward=10, penalty=0).delta == 10

    def test_delta_calculation_penalty(self):
        """A penalty-only event reports a negative delta."""
        assert self._make_event(reward=0, penalty=-5).delta == -5

    def test_delta_calculation_mixed(self):
        """Reward and penalty are netted into a single delta."""
        assert self._make_event(reward=10, penalty=-3).delta == 7
|
|
|
|
|
|
class TestTokenRulesLoading:
    """Exercise how TokenRules picks up its configuration."""

    def test_loads_from_yaml_file(self, tmp_path):
        """Values written to a YAML file are reported back by the rules."""
        yaml = pytest.importorskip("yaml")

        events = {
            "test_event": {
                "description": "A test event",
                "reward": 15,
                "category": "test",
            }
        }
        payload = {
            "version": "1.0.0-test",
            "events": events,
            "gating_thresholds": {"test_op": 50},
            "daily_limits": {"test": {"max_earn": 100, "max_spend": 10}},
            "audit": {"log_all_transactions": False},
        }
        path = tmp_path / "token_rules.yaml"
        path.write_text(yaml.dump(payload))

        loaded = tr.TokenRules(config_path=path)

        assert loaded.get_config_version() == "1.0.0-test"
        assert loaded.get_delta("test_event") == 15
        assert loaded.get_gate_threshold("test_op") == 50

    def test_fallback_when_yaml_missing(self, tmp_path):
        """A config path that does not exist yields the fallback rules."""
        loaded = tr.TokenRules(config_path=tmp_path / "nonexistent.yaml")

        assert loaded.get_config_version() == "fallback"

        # The fallback set is expected to carry a few basic events.
        expected_deltas = {
            "pr_merged": 10,
            "test_fixed": 8,
            "automation_failure": -2,
        }
        for event_name, delta in expected_deltas.items():
            assert loaded.get_delta(event_name) == delta

    def test_fallback_when_yaml_not_installed(self, tmp_path):
        """With PyYAML unimportable, the fallback rules are used."""
        # A None entry in sys.modules makes any `import yaml` fail, which
        # simulates an environment without PyYAML installed.
        with patch.dict(sys.modules, {"yaml": None}):
            path = tmp_path / "token_rules.yaml"
            path.write_text("version: '1.0.0'")

            loaded = tr.TokenRules(config_path=path)

            assert loaded.get_config_version() == "fallback"
|
|
|
|
|
|
class TestTokenRulesGetDelta:
    """Cover TokenRules.get_delta for known and unknown events."""

    def test_get_delta_existing_event(self, tmp_path):
        """Configured events report their reward or penalty as the delta."""
        yaml = pytest.importorskip("yaml")

        events = {
            "pr_merged": {"description": "PR merged", "reward": 10, "category": "merge"},
            "automation_failure": {"description": "Failure", "penalty": -2, "category": "ops"},
        }
        path = tmp_path / "token_rules.yaml"
        path.write_text(yaml.dump({"version": "1.0.0", "events": events}))

        loaded = tr.TokenRules(config_path=path)

        assert loaded.get_delta("pr_merged") == 10
        assert loaded.get_delta("automation_failure") == -2

    def test_get_delta_unknown_event(self, tmp_path):
        """An event name that is not configured has a delta of 0."""
        loaded = tr.TokenRules(config_path=tmp_path / "nonexistent.yaml")

        assert loaded.get_delta("unknown_event") == 0
|
|
|
|
|
|
class TestTokenRulesGetEvent:
    """Cover TokenRules.get_event for known and unknown events."""

    def test_get_event_returns_full_config(self, tmp_path):
        """Every configured field of an event is exposed on the result."""
        yaml = pytest.importorskip("yaml")

        pr_merged = {
            "description": "PR merged successfully",
            "reward": 10,
            "category": "merge",
            "gate_threshold": 0,
        }
        path = tmp_path / "token_rules.yaml"
        path.write_text(yaml.dump({"version": "1.0.0", "events": {"pr_merged": pr_merged}}))

        found = tr.TokenRules(config_path=path).get_event("pr_merged")

        assert found is not None
        assert found.name == "pr_merged"
        assert found.description == "PR merged successfully"
        assert found.reward == 10
        assert found.category == "merge"
        assert found.gate_threshold == 0

    def test_get_event_unknown_returns_none(self, tmp_path):
        """Looking up an event that is not configured gives None."""
        loaded = tr.TokenRules(config_path=tmp_path / "nonexistent.yaml")

        assert loaded.get_event("unknown") is None
|
|
|
|
|
|
class TestTokenRulesListEvents:
    """Cover TokenRules.list_events with and without a category filter."""

    @staticmethod
    def _three_event_rules(tmp_path):
        """Write a config with three events (two in cat1, one in cat2) and load it.

        Skips the calling test when PyYAML is not available.
        """
        yaml = pytest.importorskip("yaml")

        events = {
            "event_a": {"description": "A", "reward": 5, "category": "cat1"},
            "event_b": {"description": "B", "reward": 10, "category": "cat2"},
            "event_c": {"description": "C", "reward": 15, "category": "cat1"},
        }
        path = tmp_path / "token_rules.yaml"
        path.write_text(yaml.dump({"version": "1.0.0", "events": events}))
        return tr.TokenRules(config_path=path)

    def test_list_all_events(self, tmp_path):
        """Without a filter, every configured event is listed."""
        listed = self._three_event_rules(tmp_path).list_events()

        assert len(listed) == 3
        names = {item.name for item in listed}
        assert {"event_a", "event_b", "event_c"} <= names

    def test_list_events_by_category(self, tmp_path):
        """With a category filter, only events of that category are listed."""
        listed = self._three_event_rules(tmp_path).list_events(category="cat1")

        assert len(listed) == 2
        assert all(item.category == "cat1" for item in listed)
|
|
|
|
|
|
class TestTokenRulesGating:
    """Cover the gating-threshold queries on TokenRules."""

    @staticmethod
    def _load(tmp_path, config):
        """Dump ``config`` to a YAML file and load it as TokenRules.

        Skips the calling test when PyYAML is not available.
        """
        yaml = pytest.importorskip("yaml")

        path = tmp_path / "token_rules.yaml"
        path.write_text(yaml.dump(config))
        return tr.TokenRules(config_path=path)

    def test_check_gate_with_threshold(self, tmp_path):
        """A balance at or above the threshold passes; below it fails."""
        loaded = self._load(
            tmp_path,
            {
                "version": "1.0.0",
                "events": {},
                "gating_thresholds": {"pr_merge": 50},
            },
        )

        # (current balance, expected gate result) around the threshold of 50.
        cases = [(100, True), (50, True), (49, False), (0, False)]
        for balance, expected in cases:
            assert loaded.check_gate("pr_merge", current_tokens=balance) is expected

    def test_check_gate_no_threshold(self, tmp_path):
        """An operation without a threshold is allowed at any balance."""
        loaded = self._load(
            tmp_path,
            {
                "version": "1.0.0",
                "events": {},
                "gating_thresholds": {},
            },
        )

        # Nothing is configured for this operation, so even a negative
        # balance must be let through.
        for balance in (0, -100):
            assert loaded.check_gate("unknown_op", current_tokens=balance) is True

    def test_get_gate_threshold(self, tmp_path):
        """Configured thresholds are returned; unknown operations give None."""
        loaded = self._load(
            tmp_path,
            {
                "version": "1.0.0",
                "gating_thresholds": {"pr_merge": 50, "sensitive_op": 100},
            },
        )

        assert loaded.get_gate_threshold("pr_merge") == 50
        assert loaded.get_gate_threshold("sensitive_op") == 100
        assert loaded.get_gate_threshold("unknown") is None
|
|
|
|
|
|
class TestTokenRulesDailyLimits:
    """Cover TokenRules.get_daily_limits."""

    def test_get_daily_limits(self, tmp_path):
        """Each configured category exposes its max_earn and max_spend."""
        yaml = pytest.importorskip("yaml")

        limits = {
            "triage": {"max_earn": 100, "max_spend": 0},
            "merge": {"max_earn": 50, "max_spend": 10},
        }
        path = tmp_path / "token_rules.yaml"
        path.write_text(yaml.dump({"version": "1.0.0", "daily_limits": limits}))

        loaded = tr.TokenRules(config_path=path)

        triage = loaded.get_daily_limits("triage")
        assert triage is not None
        assert triage.max_earn == 100
        assert triage.max_spend == 0

        merge = loaded.get_daily_limits("merge")
        assert merge is not None
        assert merge.max_earn == 50
        assert merge.max_spend == 10

    def test_get_daily_limits_unknown(self, tmp_path):
        """A category with no configured limits gives None."""
        yaml = pytest.importorskip("yaml")

        path = tmp_path / "token_rules.yaml"
        path.write_text(yaml.dump({"version": "1.0.0", "daily_limits": {}}))

        loaded = tr.TokenRules(config_path=path)

        assert loaded.get_daily_limits("unknown") is None
|
|
|
|
|
|
class TestTokenRulesComputeTransaction:
    """Cover TokenRules.compute_transaction."""

    @staticmethod
    def _load(tmp_path, config):
        """Dump ``config`` to a YAML file and load it as TokenRules.

        Skips the calling test when PyYAML is not available.
        """
        yaml = pytest.importorskip("yaml")

        path = tmp_path / "token_rules.yaml"
        path.write_text(yaml.dump(config))
        return tr.TokenRules(config_path=path)

    def test_compute_successful_transaction(self, tmp_path):
        """A known, ungated event is allowed and credits its reward."""
        loaded = self._load(
            tmp_path,
            {
                "version": "1.0.0",
                "events": {
                    "pr_merged": {"description": "PR merged", "reward": 10, "category": "merge"}
                },
            },
        )

        outcome = loaded.compute_transaction("pr_merged", current_tokens=100)

        assert outcome["event"] == "pr_merged"
        assert outcome["delta"] == 10
        assert outcome["category"] == "merge"
        assert outcome["allowed"] is True
        assert outcome["new_balance"] == 110
        assert outcome["limit_reached"] is False

    def test_compute_unknown_event(self, tmp_path):
        """An unknown event is rejected and leaves the balance untouched."""
        loaded = tr.TokenRules(config_path=tmp_path / "nonexistent.yaml")

        outcome = loaded.compute_transaction("unknown_event", current_tokens=50)

        assert outcome["event"] == "unknown_event"
        assert outcome["delta"] == 0
        assert outcome["allowed"] is False
        assert outcome["reason"] == "unknown_event"
        assert outcome["new_balance"] == 50

    def test_compute_with_gate_check(self, tmp_path):
        """An event with a gate threshold is only allowed above that balance."""
        loaded = self._load(
            tmp_path,
            {
                "version": "1.0.0",
                "events": {
                    "sensitive_op": {
                        "description": "Sensitive",
                        "reward": 50,
                        "category": "sensitive",
                        "gate_threshold": 100,
                    }
                },
            },
        )

        # Balance of 150 clears the gate threshold of 100.
        passed = loaded.compute_transaction("sensitive_op", current_tokens=150)
        assert passed["allowed"] is True

        # Balance of 50 is below the gate threshold of 100.
        blocked = loaded.compute_transaction("sensitive_op", current_tokens=50)
        assert blocked["allowed"] is False
        assert "gate_reason" in blocked

    def test_compute_with_daily_limits(self, tmp_path):
        """Earnings that would exceed the category's daily cap are rejected."""
        loaded = self._load(
            tmp_path,
            {
                "version": "1.0.0",
                "events": {
                    "triage_action": {
                        "description": "Triage",
                        "reward": 20,
                        "category": "triage",
                    }
                },
                "daily_limits": {"triage": {"max_earn": 50, "max_spend": 0}},
            },
        )

        # 20 already earned + 20 reward = 40, which stays under max_earn=50.
        within = loaded.compute_transaction(
            "triage_action", current_tokens=100, current_daily_earned={"triage": 20}
        )
        assert within["allowed"] is True
        assert within["limit_reached"] is False

        # 40 already earned + 20 reward = 60, which exceeds max_earn=50.
        over = loaded.compute_transaction(
            "triage_action", current_tokens=100, current_daily_earned={"triage": 40}
        )
        assert over["allowed"] is False
        assert over["limit_reached"] is True
        assert "limit_reason" in over
|
|
|
|
|
|
class TestTokenRulesCategories:
    """Cover TokenRules.get_categories."""

    def test_get_categories(self, tmp_path):
        """Categories shared by several events are reported only once."""
        yaml = pytest.importorskip("yaml")

        events = {
            "event_a": {"description": "A", "reward": 5, "category": "cat1"},
            "event_b": {"description": "B", "reward": 10, "category": "cat2"},
            "event_c": {"description": "C", "reward": 15, "category": "cat1"},
        }
        path = tmp_path / "token_rules.yaml"
        path.write_text(yaml.dump({"version": "1.0.0", "events": events}))

        found = tr.TokenRules(config_path=path).get_categories()

        # Sorted before comparing so the check does not depend on result order.
        assert sorted(found) == ["cat1", "cat2"]
|
|
|
|
|
|
class TestTokenRulesAudit:
    """Cover TokenRules.is_auditable."""

    @staticmethod
    def _rules_with_audit_flag(tmp_path, enabled):
        """Load rules whose ``audit.log_all_transactions`` is set to ``enabled``.

        Skips the calling test when PyYAML is not available.
        """
        yaml = pytest.importorskip("yaml")

        path = tmp_path / "token_rules.yaml"
        path.write_text(
            yaml.dump({"version": "1.0.0", "audit": {"log_all_transactions": enabled}})
        )
        return tr.TokenRules(config_path=path)

    def test_is_auditable_true(self, tmp_path):
        """Rules are auditable when transaction logging is switched on."""
        assert self._rules_with_audit_flag(tmp_path, True).is_auditable() is True

    def test_is_auditable_false(self, tmp_path):
        """Rules are not auditable when transaction logging is switched off."""
        assert self._rules_with_audit_flag(tmp_path, False).is_auditable() is False
|
|
|
|
|
|
class TestConvenienceFunctions:
    """Cover the module-level helpers, run against the fallback rules.

    Each test points ``TokenRules.CONFIG_PATH`` at a file that does not exist
    for the duration of the call.
    """

    def test_get_token_delta(self, tmp_path):
        """get_token_delta reports the delta of a fallback event."""
        missing = tmp_path / "nonexistent.yaml"

        with patch.object(tr.TokenRules, "CONFIG_PATH", missing):
            # 10 is the value expected from the fallback rules.
            assert tr.get_token_delta("pr_merged") == 10

    def test_check_operation_gate(self, tmp_path):
        """check_operation_gate lets pr_merge through at any tested balance."""
        missing = tmp_path / "nonexistent.yaml"

        with patch.object(tr.TokenRules, "CONFIG_PATH", missing):
            # Fallback has pr_merge gate at 0
            for balance in (0, 100):
                assert tr.check_operation_gate("pr_merge", current_tokens=balance) is True

    def test_compute_token_reward(self, tmp_path):
        """compute_token_reward returns the event, delta and new balance."""
        missing = tmp_path / "nonexistent.yaml"

        with patch.object(tr.TokenRules, "CONFIG_PATH", missing):
            outcome = tr.compute_token_reward("pr_merged", current_tokens=50)

            assert outcome["event"] == "pr_merged"
            assert outcome["delta"] == 10
            assert outcome["new_balance"] == 60

    def test_list_token_events(self, tmp_path):
        """list_token_events returns entries carrying the expected keys."""
        missing = tmp_path / "nonexistent.yaml"
        required_keys = ("name", "description", "delta", "category")

        with patch.object(tr.TokenRules, "CONFIG_PATH", missing):
            listed = tr.list_token_events()

            # Fallback has at least 3 events
            assert len(listed) >= 3

            # Check structure
            for entry in listed:
                for key in required_keys:
                    assert key in entry
|