forked from Rockachopa/Timmy-time-dashboard
Compare commits
1 Commits
main
...
kimi/issue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d3eb5c31bf |
524
tests/timmy_automations/test_token_rules.py
Normal file
524
tests/timmy_automations/test_token_rules.py
Normal file
@@ -0,0 +1,524 @@
|
|||||||
|
"""Tests for token_rules module."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# Add timmy_automations to path for imports
|
||||||
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "timmy_automations"))
|
||||||
|
|
||||||
|
from utils import token_rules as tr
|
||||||
|
|
||||||
|
|
||||||
|
class TestTokenEvent:
|
||||||
|
"""Test TokenEvent dataclass."""
|
||||||
|
|
||||||
|
def test_delta_calculation_reward(self):
|
||||||
|
"""Delta is positive for rewards."""
|
||||||
|
event = tr.TokenEvent(
|
||||||
|
name="test",
|
||||||
|
description="Test event",
|
||||||
|
reward=10,
|
||||||
|
penalty=0,
|
||||||
|
category="test",
|
||||||
|
)
|
||||||
|
assert event.delta == 10
|
||||||
|
|
||||||
|
def test_delta_calculation_penalty(self):
|
||||||
|
"""Delta is negative for penalties."""
|
||||||
|
event = tr.TokenEvent(
|
||||||
|
name="test",
|
||||||
|
description="Test event",
|
||||||
|
reward=0,
|
||||||
|
penalty=-5,
|
||||||
|
category="test",
|
||||||
|
)
|
||||||
|
assert event.delta == -5
|
||||||
|
|
||||||
|
def test_delta_calculation_mixed(self):
|
||||||
|
"""Delta is net of reward and penalty."""
|
||||||
|
event = tr.TokenEvent(
|
||||||
|
name="test",
|
||||||
|
description="Test event",
|
||||||
|
reward=10,
|
||||||
|
penalty=-3,
|
||||||
|
category="test",
|
||||||
|
)
|
||||||
|
assert event.delta == 7
|
||||||
|
|
||||||
|
|
||||||
|
class TestTokenRulesLoading:
|
||||||
|
"""Test TokenRules configuration loading."""
|
||||||
|
|
||||||
|
def test_loads_from_yaml_file(self, tmp_path):
|
||||||
|
"""Load configuration from YAML file."""
|
||||||
|
yaml = pytest.importorskip("yaml")
|
||||||
|
|
||||||
|
config_file = tmp_path / "token_rules.yaml"
|
||||||
|
config_data = {
|
||||||
|
"version": "1.0.0-test",
|
||||||
|
"events": {
|
||||||
|
"test_event": {
|
||||||
|
"description": "A test event",
|
||||||
|
"reward": 15,
|
||||||
|
"category": "test",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"gating_thresholds": {"test_op": 50},
|
||||||
|
"daily_limits": {"test": {"max_earn": 100, "max_spend": 10}},
|
||||||
|
"audit": {"log_all_transactions": False},
|
||||||
|
}
|
||||||
|
config_file.write_text(yaml.dump(config_data))
|
||||||
|
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
|
||||||
|
assert rules.get_config_version() == "1.0.0-test"
|
||||||
|
assert rules.get_delta("test_event") == 15
|
||||||
|
assert rules.get_gate_threshold("test_op") == 50
|
||||||
|
|
||||||
|
def test_fallback_when_yaml_missing(self, tmp_path):
|
||||||
|
"""Use fallback defaults when YAML file doesn't exist."""
|
||||||
|
config_file = tmp_path / "nonexistent.yaml"
|
||||||
|
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
|
||||||
|
assert rules.get_config_version() == "fallback"
|
||||||
|
# Fallback should have some basic events
|
||||||
|
assert rules.get_delta("pr_merged") == 10
|
||||||
|
assert rules.get_delta("test_fixed") == 8
|
||||||
|
assert rules.get_delta("automation_failure") == -2
|
||||||
|
|
||||||
|
def test_fallback_when_yaml_not_installed(self, tmp_path):
|
||||||
|
"""Use fallback when PyYAML is not installed."""
|
||||||
|
with patch.dict(sys.modules, {"yaml": None}):
|
||||||
|
config_file = tmp_path / "token_rules.yaml"
|
||||||
|
config_file.write_text("version: '1.0.0'")
|
||||||
|
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
|
||||||
|
assert rules.get_config_version() == "fallback"
|
||||||
|
|
||||||
|
|
||||||
|
class TestTokenRulesGetDelta:
|
||||||
|
"""Test get_delta method."""
|
||||||
|
|
||||||
|
def test_get_delta_existing_event(self, tmp_path):
|
||||||
|
"""Get delta for configured event."""
|
||||||
|
yaml = pytest.importorskip("yaml")
|
||||||
|
|
||||||
|
config_file = tmp_path / "token_rules.yaml"
|
||||||
|
config_data = {
|
||||||
|
"version": "1.0.0",
|
||||||
|
"events": {
|
||||||
|
"pr_merged": {"description": "PR merged", "reward": 10, "category": "merge"},
|
||||||
|
"automation_failure": {"description": "Failure", "penalty": -2, "category": "ops"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
config_file.write_text(yaml.dump(config_data))
|
||||||
|
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
|
||||||
|
assert rules.get_delta("pr_merged") == 10
|
||||||
|
assert rules.get_delta("automation_failure") == -2
|
||||||
|
|
||||||
|
def test_get_delta_unknown_event(self, tmp_path):
|
||||||
|
"""Return 0 for unknown events."""
|
||||||
|
config_file = tmp_path / "nonexistent.yaml"
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
|
||||||
|
assert rules.get_delta("unknown_event") == 0
|
||||||
|
|
||||||
|
|
||||||
|
class TestTokenRulesGetEvent:
|
||||||
|
"""Test get_event method."""
|
||||||
|
|
||||||
|
def test_get_event_returns_full_config(self, tmp_path):
|
||||||
|
"""Get full event configuration."""
|
||||||
|
yaml = pytest.importorskip("yaml")
|
||||||
|
|
||||||
|
config_file = tmp_path / "token_rules.yaml"
|
||||||
|
config_data = {
|
||||||
|
"version": "1.0.0",
|
||||||
|
"events": {
|
||||||
|
"pr_merged": {
|
||||||
|
"description": "PR merged successfully",
|
||||||
|
"reward": 10,
|
||||||
|
"category": "merge",
|
||||||
|
"gate_threshold": 0,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
config_file.write_text(yaml.dump(config_data))
|
||||||
|
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
event = rules.get_event("pr_merged")
|
||||||
|
|
||||||
|
assert event is not None
|
||||||
|
assert event.name == "pr_merged"
|
||||||
|
assert event.description == "PR merged successfully"
|
||||||
|
assert event.reward == 10
|
||||||
|
assert event.category == "merge"
|
||||||
|
assert event.gate_threshold == 0
|
||||||
|
|
||||||
|
def test_get_event_unknown_returns_none(self, tmp_path):
|
||||||
|
"""Return None for unknown event."""
|
||||||
|
config_file = tmp_path / "nonexistent.yaml"
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
|
||||||
|
assert rules.get_event("unknown") is None
|
||||||
|
|
||||||
|
|
||||||
|
class TestTokenRulesListEvents:
|
||||||
|
"""Test list_events method."""
|
||||||
|
|
||||||
|
def test_list_all_events(self, tmp_path):
|
||||||
|
"""List all configured events."""
|
||||||
|
yaml = pytest.importorskip("yaml")
|
||||||
|
|
||||||
|
config_file = tmp_path / "token_rules.yaml"
|
||||||
|
config_data = {
|
||||||
|
"version": "1.0.0",
|
||||||
|
"events": {
|
||||||
|
"event_a": {"description": "A", "reward": 5, "category": "cat1"},
|
||||||
|
"event_b": {"description": "B", "reward": 10, "category": "cat2"},
|
||||||
|
"event_c": {"description": "C", "reward": 15, "category": "cat1"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
config_file.write_text(yaml.dump(config_data))
|
||||||
|
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
events = rules.list_events()
|
||||||
|
|
||||||
|
assert len(events) == 3
|
||||||
|
event_names = {e.name for e in events}
|
||||||
|
assert "event_a" in event_names
|
||||||
|
assert "event_b" in event_names
|
||||||
|
assert "event_c" in event_names
|
||||||
|
|
||||||
|
def test_list_events_by_category(self, tmp_path):
|
||||||
|
"""Filter events by category."""
|
||||||
|
yaml = pytest.importorskip("yaml")
|
||||||
|
|
||||||
|
config_file = tmp_path / "token_rules.yaml"
|
||||||
|
config_data = {
|
||||||
|
"version": "1.0.0",
|
||||||
|
"events": {
|
||||||
|
"event_a": {"description": "A", "reward": 5, "category": "cat1"},
|
||||||
|
"event_b": {"description": "B", "reward": 10, "category": "cat2"},
|
||||||
|
"event_c": {"description": "C", "reward": 15, "category": "cat1"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
config_file.write_text(yaml.dump(config_data))
|
||||||
|
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
events = rules.list_events(category="cat1")
|
||||||
|
|
||||||
|
assert len(events) == 2
|
||||||
|
for event in events:
|
||||||
|
assert event.category == "cat1"
|
||||||
|
|
||||||
|
|
||||||
|
class TestTokenRulesGating:
|
||||||
|
"""Test gating threshold methods."""
|
||||||
|
|
||||||
|
def test_check_gate_with_threshold(self, tmp_path):
|
||||||
|
"""Check gate when threshold is defined."""
|
||||||
|
yaml = pytest.importorskip("yaml")
|
||||||
|
|
||||||
|
config_file = tmp_path / "token_rules.yaml"
|
||||||
|
config_data = {
|
||||||
|
"version": "1.0.0",
|
||||||
|
"events": {},
|
||||||
|
"gating_thresholds": {"pr_merge": 50},
|
||||||
|
}
|
||||||
|
config_file.write_text(yaml.dump(config_data))
|
||||||
|
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
|
||||||
|
assert rules.check_gate("pr_merge", current_tokens=100) is True
|
||||||
|
assert rules.check_gate("pr_merge", current_tokens=50) is True
|
||||||
|
assert rules.check_gate("pr_merge", current_tokens=49) is False
|
||||||
|
assert rules.check_gate("pr_merge", current_tokens=0) is False
|
||||||
|
|
||||||
|
def test_check_gate_no_threshold(self, tmp_path):
|
||||||
|
"""Check gate when no threshold is defined (always allowed)."""
|
||||||
|
yaml = pytest.importorskip("yaml")
|
||||||
|
|
||||||
|
config_file = tmp_path / "token_rules.yaml"
|
||||||
|
config_data = {
|
||||||
|
"version": "1.0.0",
|
||||||
|
"events": {},
|
||||||
|
"gating_thresholds": {},
|
||||||
|
}
|
||||||
|
config_file.write_text(yaml.dump(config_data))
|
||||||
|
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
|
||||||
|
# No threshold defined, should always be allowed
|
||||||
|
assert rules.check_gate("unknown_op", current_tokens=0) is True
|
||||||
|
assert rules.check_gate("unknown_op", current_tokens=-100) is True
|
||||||
|
|
||||||
|
def test_get_gate_threshold(self, tmp_path):
|
||||||
|
"""Get threshold value."""
|
||||||
|
yaml = pytest.importorskip("yaml")
|
||||||
|
|
||||||
|
config_file = tmp_path / "token_rules.yaml"
|
||||||
|
config_data = {
|
||||||
|
"version": "1.0.0",
|
||||||
|
"gating_thresholds": {"pr_merge": 50, "sensitive_op": 100},
|
||||||
|
}
|
||||||
|
config_file.write_text(yaml.dump(config_data))
|
||||||
|
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
|
||||||
|
assert rules.get_gate_threshold("pr_merge") == 50
|
||||||
|
assert rules.get_gate_threshold("sensitive_op") == 100
|
||||||
|
assert rules.get_gate_threshold("unknown") is None
|
||||||
|
|
||||||
|
|
||||||
|
class TestTokenRulesDailyLimits:
|
||||||
|
"""Test daily limits methods."""
|
||||||
|
|
||||||
|
def test_get_daily_limits(self, tmp_path):
|
||||||
|
"""Get daily limits for a category."""
|
||||||
|
yaml = pytest.importorskip("yaml")
|
||||||
|
|
||||||
|
config_file = tmp_path / "token_rules.yaml"
|
||||||
|
config_data = {
|
||||||
|
"version": "1.0.0",
|
||||||
|
"daily_limits": {
|
||||||
|
"triage": {"max_earn": 100, "max_spend": 0},
|
||||||
|
"merge": {"max_earn": 50, "max_spend": 10},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
config_file.write_text(yaml.dump(config_data))
|
||||||
|
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
|
||||||
|
triage_limits = rules.get_daily_limits("triage")
|
||||||
|
assert triage_limits is not None
|
||||||
|
assert triage_limits.max_earn == 100
|
||||||
|
assert triage_limits.max_spend == 0
|
||||||
|
|
||||||
|
merge_limits = rules.get_daily_limits("merge")
|
||||||
|
assert merge_limits is not None
|
||||||
|
assert merge_limits.max_earn == 50
|
||||||
|
assert merge_limits.max_spend == 10
|
||||||
|
|
||||||
|
def test_get_daily_limits_unknown(self, tmp_path):
|
||||||
|
"""Return None for unknown category."""
|
||||||
|
yaml = pytest.importorskip("yaml")
|
||||||
|
|
||||||
|
config_file = tmp_path / "token_rules.yaml"
|
||||||
|
config_data = {"version": "1.0.0", "daily_limits": {}}
|
||||||
|
config_file.write_text(yaml.dump(config_data))
|
||||||
|
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
|
||||||
|
assert rules.get_daily_limits("unknown") is None
|
||||||
|
|
||||||
|
|
||||||
|
class TestTokenRulesComputeTransaction:
|
||||||
|
"""Test compute_transaction method."""
|
||||||
|
|
||||||
|
def test_compute_successful_transaction(self, tmp_path):
|
||||||
|
"""Compute transaction for valid event."""
|
||||||
|
yaml = pytest.importorskip("yaml")
|
||||||
|
|
||||||
|
config_file = tmp_path / "token_rules.yaml"
|
||||||
|
config_data = {
|
||||||
|
"version": "1.0.0",
|
||||||
|
"events": {
|
||||||
|
"pr_merged": {"description": "PR merged", "reward": 10, "category": "merge"}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
config_file.write_text(yaml.dump(config_data))
|
||||||
|
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
result = rules.compute_transaction("pr_merged", current_tokens=100)
|
||||||
|
|
||||||
|
assert result["event"] == "pr_merged"
|
||||||
|
assert result["delta"] == 10
|
||||||
|
assert result["category"] == "merge"
|
||||||
|
assert result["allowed"] is True
|
||||||
|
assert result["new_balance"] == 110
|
||||||
|
assert result["limit_reached"] is False
|
||||||
|
|
||||||
|
def test_compute_unknown_event(self, tmp_path):
|
||||||
|
"""Compute transaction for unknown event."""
|
||||||
|
config_file = tmp_path / "nonexistent.yaml"
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
result = rules.compute_transaction("unknown_event", current_tokens=50)
|
||||||
|
|
||||||
|
assert result["event"] == "unknown_event"
|
||||||
|
assert result["delta"] == 0
|
||||||
|
assert result["allowed"] is False
|
||||||
|
assert result["reason"] == "unknown_event"
|
||||||
|
assert result["new_balance"] == 50
|
||||||
|
|
||||||
|
def test_compute_with_gate_check(self, tmp_path):
|
||||||
|
"""Compute transaction respects gating."""
|
||||||
|
yaml = pytest.importorskip("yaml")
|
||||||
|
|
||||||
|
config_file = tmp_path / "token_rules.yaml"
|
||||||
|
config_data = {
|
||||||
|
"version": "1.0.0",
|
||||||
|
"events": {
|
||||||
|
"sensitive_op": {
|
||||||
|
"description": "Sensitive",
|
||||||
|
"reward": 50,
|
||||||
|
"category": "sensitive",
|
||||||
|
"gate_threshold": 100,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
config_file.write_text(yaml.dump(config_data))
|
||||||
|
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
|
||||||
|
# With enough tokens
|
||||||
|
result = rules.compute_transaction("sensitive_op", current_tokens=150)
|
||||||
|
assert result["allowed"] is True
|
||||||
|
|
||||||
|
# Without enough tokens
|
||||||
|
result = rules.compute_transaction("sensitive_op", current_tokens=50)
|
||||||
|
assert result["allowed"] is False
|
||||||
|
assert "gate_reason" in result
|
||||||
|
|
||||||
|
def test_compute_with_daily_limits(self, tmp_path):
|
||||||
|
"""Compute transaction respects daily limits."""
|
||||||
|
yaml = pytest.importorskip("yaml")
|
||||||
|
|
||||||
|
config_file = tmp_path / "token_rules.yaml"
|
||||||
|
config_data = {
|
||||||
|
"version": "1.0.0",
|
||||||
|
"events": {
|
||||||
|
"triage_action": {
|
||||||
|
"description": "Triage",
|
||||||
|
"reward": 20,
|
||||||
|
"category": "triage",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"daily_limits": {"triage": {"max_earn": 50, "max_spend": 0}},
|
||||||
|
}
|
||||||
|
config_file.write_text(yaml.dump(config_data))
|
||||||
|
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
|
||||||
|
# Within limit
|
||||||
|
daily_earned = {"triage": 20}
|
||||||
|
result = rules.compute_transaction(
|
||||||
|
"triage_action", current_tokens=100, current_daily_earned=daily_earned
|
||||||
|
)
|
||||||
|
assert result["allowed"] is True
|
||||||
|
assert result["limit_reached"] is False
|
||||||
|
|
||||||
|
# Would exceed limit (20 + 20 > 50 is false, so this should be fine)
|
||||||
|
# Let's test with higher current earned
|
||||||
|
daily_earned = {"triage": 40}
|
||||||
|
result = rules.compute_transaction(
|
||||||
|
"triage_action", current_tokens=100, current_daily_earned=daily_earned
|
||||||
|
)
|
||||||
|
assert result["allowed"] is False
|
||||||
|
assert result["limit_reached"] is True
|
||||||
|
assert "limit_reason" in result
|
||||||
|
|
||||||
|
|
||||||
|
class TestTokenRulesCategories:
|
||||||
|
"""Test category methods."""
|
||||||
|
|
||||||
|
def test_get_categories(self, tmp_path):
|
||||||
|
"""Get all unique categories."""
|
||||||
|
yaml = pytest.importorskip("yaml")
|
||||||
|
|
||||||
|
config_file = tmp_path / "token_rules.yaml"
|
||||||
|
config_data = {
|
||||||
|
"version": "1.0.0",
|
||||||
|
"events": {
|
||||||
|
"event_a": {"description": "A", "reward": 5, "category": "cat1"},
|
||||||
|
"event_b": {"description": "B", "reward": 10, "category": "cat2"},
|
||||||
|
"event_c": {"description": "C", "reward": 15, "category": "cat1"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
config_file.write_text(yaml.dump(config_data))
|
||||||
|
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
categories = rules.get_categories()
|
||||||
|
|
||||||
|
assert sorted(categories) == ["cat1", "cat2"]
|
||||||
|
|
||||||
|
|
||||||
|
class TestTokenRulesAudit:
|
||||||
|
"""Test audit methods."""
|
||||||
|
|
||||||
|
def test_is_auditable_true(self, tmp_path):
|
||||||
|
"""Check if auditable when enabled."""
|
||||||
|
yaml = pytest.importorskip("yaml")
|
||||||
|
|
||||||
|
config_file = tmp_path / "token_rules.yaml"
|
||||||
|
config_data = {"version": "1.0.0", "audit": {"log_all_transactions": True}}
|
||||||
|
config_file.write_text(yaml.dump(config_data))
|
||||||
|
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
assert rules.is_auditable() is True
|
||||||
|
|
||||||
|
def test_is_auditable_false(self, tmp_path):
|
||||||
|
"""Check if auditable when disabled."""
|
||||||
|
yaml = pytest.importorskip("yaml")
|
||||||
|
|
||||||
|
config_file = tmp_path / "token_rules.yaml"
|
||||||
|
config_data = {"version": "1.0.0", "audit": {"log_all_transactions": False}}
|
||||||
|
config_file.write_text(yaml.dump(config_data))
|
||||||
|
|
||||||
|
rules = tr.TokenRules(config_path=config_file)
|
||||||
|
assert rules.is_auditable() is False
|
||||||
|
|
||||||
|
|
||||||
|
class TestConvenienceFunctions:
|
||||||
|
"""Test module-level convenience functions."""
|
||||||
|
|
||||||
|
def test_get_token_delta(self, tmp_path):
|
||||||
|
"""Convenience function returns delta."""
|
||||||
|
config_file = tmp_path / "nonexistent.yaml"
|
||||||
|
|
||||||
|
with patch.object(tr.TokenRules, "CONFIG_PATH", config_file):
|
||||||
|
delta = tr.get_token_delta("pr_merged")
|
||||||
|
assert delta == 10 # From fallback
|
||||||
|
|
||||||
|
def test_check_operation_gate(self, tmp_path):
|
||||||
|
"""Convenience function checks gate."""
|
||||||
|
config_file = tmp_path / "nonexistent.yaml"
|
||||||
|
|
||||||
|
with patch.object(tr.TokenRules, "CONFIG_PATH", config_file):
|
||||||
|
# Fallback has pr_merge gate at 0
|
||||||
|
assert tr.check_operation_gate("pr_merge", current_tokens=0) is True
|
||||||
|
assert tr.check_operation_gate("pr_merge", current_tokens=100) is True
|
||||||
|
|
||||||
|
def test_compute_token_reward(self, tmp_path):
|
||||||
|
"""Convenience function computes reward."""
|
||||||
|
config_file = tmp_path / "nonexistent.yaml"
|
||||||
|
|
||||||
|
with patch.object(tr.TokenRules, "CONFIG_PATH", config_file):
|
||||||
|
result = tr.compute_token_reward("pr_merged", current_tokens=50)
|
||||||
|
assert result["event"] == "pr_merged"
|
||||||
|
assert result["delta"] == 10
|
||||||
|
assert result["new_balance"] == 60
|
||||||
|
|
||||||
|
def test_list_token_events(self, tmp_path):
|
||||||
|
"""Convenience function lists events."""
|
||||||
|
config_file = tmp_path / "nonexistent.yaml"
|
||||||
|
|
||||||
|
with patch.object(tr.TokenRules, "CONFIG_PATH", config_file):
|
||||||
|
events = tr.list_token_events()
|
||||||
|
assert len(events) >= 3 # Fallback has at least 3 events
|
||||||
|
|
||||||
|
# Check structure
|
||||||
|
for event in events:
|
||||||
|
assert "name" in event
|
||||||
|
assert "description" in event
|
||||||
|
assert "delta" in event
|
||||||
|
assert "category" in event
|
||||||
138
timmy_automations/config/token_rules.yaml
Normal file
138
timmy_automations/config/token_rules.yaml
Normal file
@@ -0,0 +1,138 @@
|
|||||||
|
# Token Rules — Agent reward/penalty configuration for automations
|
||||||
|
#
|
||||||
|
# This file defines the token economy for agent actions.
|
||||||
|
# Modify values here to adjust incentives without code changes.
|
||||||
|
#
|
||||||
|
# Used by: timmy_automations.utils.token_rules
|
||||||
|
|
||||||
|
version: "1.0.0"
|
||||||
|
description: "Token economy rules for agent automations"
|
||||||
|
|
||||||
|
# ── Events ─────────────────────────────────────────────────────────────────
|
||||||
|
# Each event type defines rewards/penalties and optional gating thresholds
|
||||||
|
|
||||||
|
events:
|
||||||
|
# Triage actions
|
||||||
|
triage_success:
|
||||||
|
description: "Successfully triaged an issue (scored and categorized)"
|
||||||
|
reward: 5
|
||||||
|
category: "triage"
|
||||||
|
|
||||||
|
deep_triage_refinement:
|
||||||
|
description: "LLM-driven issue refinement with acceptance criteria added"
|
||||||
|
reward: 20
|
||||||
|
category: "triage"
|
||||||
|
|
||||||
|
quarantine_candidate_found:
|
||||||
|
description: "Identified a repeat failure issue for quarantine"
|
||||||
|
reward: 10
|
||||||
|
category: "triage"
|
||||||
|
|
||||||
|
# Daily Run completions
|
||||||
|
daily_run_completed:
|
||||||
|
description: "Completed a daily run cycle successfully"
|
||||||
|
reward: 5
|
||||||
|
category: "daily_run"
|
||||||
|
|
||||||
|
golden_path_generated:
|
||||||
|
description: "Generated a coherent mini-session plan"
|
||||||
|
reward: 3
|
||||||
|
category: "daily_run"
|
||||||
|
|
||||||
|
weekly_narrative_created:
|
||||||
|
description: "Generated weekly summary of work themes"
|
||||||
|
reward: 15
|
||||||
|
category: "daily_run"
|
||||||
|
|
||||||
|
# PR merges
|
||||||
|
pr_merged:
|
||||||
|
description: "Successfully merged a pull request"
|
||||||
|
reward: 10
|
||||||
|
category: "merge"
|
||||||
|
# Gating: requires minimum tokens to perform
|
||||||
|
gate_threshold: 0
|
||||||
|
|
||||||
|
pr_merged_with_tests:
|
||||||
|
description: "Merged PR with all tests passing"
|
||||||
|
reward: 15
|
||||||
|
category: "merge"
|
||||||
|
gate_threshold: 0
|
||||||
|
|
||||||
|
# Test fixes
|
||||||
|
test_fixed:
|
||||||
|
description: "Fixed a failing test"
|
||||||
|
reward: 8
|
||||||
|
category: "test"
|
||||||
|
|
||||||
|
test_added:
|
||||||
|
description: "Added new test coverage"
|
||||||
|
reward: 5
|
||||||
|
category: "test"
|
||||||
|
|
||||||
|
critical_bug_fixed:
|
||||||
|
description: "Fixed a critical bug on main"
|
||||||
|
reward: 25
|
||||||
|
category: "test"
|
||||||
|
|
||||||
|
# General operations
|
||||||
|
automation_run:
|
||||||
|
description: "Ran any automation (resource usage)"
|
||||||
|
penalty: -1
|
||||||
|
category: "operation"
|
||||||
|
|
||||||
|
automation_failure:
|
||||||
|
description: "Automation failed or produced error"
|
||||||
|
penalty: -2
|
||||||
|
category: "operation"
|
||||||
|
|
||||||
|
cycle_retro_logged:
|
||||||
|
description: "Logged structured retrospective data"
|
||||||
|
reward: 5
|
||||||
|
category: "operation"
|
||||||
|
|
||||||
|
pre_commit_passed:
|
||||||
|
description: "Pre-commit checks passed"
|
||||||
|
reward: 2
|
||||||
|
category: "operation"
|
||||||
|
|
||||||
|
pre_commit_failed:
|
||||||
|
description: "Pre-commit checks failed"
|
||||||
|
penalty: -1
|
||||||
|
category: "operation"
|
||||||
|
|
||||||
|
# ── Gating Thresholds ──────────────────────────────────────────────────────
|
||||||
|
# Minimum token balances required for sensitive operations
|
||||||
|
|
||||||
|
gating_thresholds:
|
||||||
|
pr_merge: 0
|
||||||
|
sensitive_config_change: 50
|
||||||
|
agent_workspace_create: 10
|
||||||
|
deep_triage_run: 0
|
||||||
|
|
||||||
|
# ── Daily Limits ───────────────────────────────────────────────────────────
|
||||||
|
# Maximum tokens that can be earned/spent per category per day
|
||||||
|
|
||||||
|
daily_limits:
|
||||||
|
triage:
|
||||||
|
max_earn: 100
|
||||||
|
max_spend: 0
|
||||||
|
daily_run:
|
||||||
|
max_earn: 50
|
||||||
|
max_spend: 0
|
||||||
|
merge:
|
||||||
|
max_earn: 100
|
||||||
|
max_spend: 0
|
||||||
|
test:
|
||||||
|
max_earn: 100
|
||||||
|
max_spend: 0
|
||||||
|
operation:
|
||||||
|
max_earn: 50
|
||||||
|
max_spend: 50
|
||||||
|
|
||||||
|
# ── Audit Settings ─────────────────────────────────────────────────────────
|
||||||
|
# Settings for token audit and inspection
|
||||||
|
|
||||||
|
audit:
|
||||||
|
log_all_transactions: true
|
||||||
|
log_retention_days: 30
|
||||||
|
inspectable_by: ["orchestrator", "auditor", "timmy"]
|
||||||
@@ -22,6 +22,14 @@ from typing import Any
|
|||||||
from urllib.request import Request, urlopen
|
from urllib.request import Request, urlopen
|
||||||
from urllib.error import HTTPError, URLError
|
from urllib.error import HTTPError, URLError
|
||||||
|
|
||||||
|
# ── Token Economy Integration ──────────────────────────────────────────────
|
||||||
|
# Import token rules helpers for tracking Daily Run rewards
|
||||||
|
|
||||||
|
sys.path.insert(
|
||||||
|
0, str(Path(__file__).resolve().parent.parent)
|
||||||
|
)
|
||||||
|
from utils.token_rules import TokenRules, compute_token_reward
|
||||||
|
|
||||||
# ── Configuration ─────────────────────────────────────────────────────────
|
# ── Configuration ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
|
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
|
||||||
@@ -490,6 +498,43 @@ def parse_args() -> argparse.Namespace:
|
|||||||
return p.parse_args()
|
return p.parse_args()
|
||||||
|
|
||||||
|
|
||||||
|
def compute_daily_run_tokens(success: bool = True) -> dict[str, Any]:
|
||||||
|
"""Compute token rewards for Daily Run completion.
|
||||||
|
|
||||||
|
Uses the centralized token_rules configuration to calculate
|
||||||
|
rewards/penalties for automation actions.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
success: Whether the Daily Run completed successfully
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Token transaction details
|
||||||
|
"""
|
||||||
|
rules = TokenRules()
|
||||||
|
|
||||||
|
if success:
|
||||||
|
# Daily run completed successfully
|
||||||
|
transaction = compute_token_reward("daily_run_completed", current_tokens=0)
|
||||||
|
|
||||||
|
# Also compute golden path generation if agenda was created
|
||||||
|
agenda_transaction = compute_token_reward("golden_path_generated", current_tokens=0)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"daily_run": transaction,
|
||||||
|
"golden_path": agenda_transaction,
|
||||||
|
"total_delta": transaction.get("delta", 0) + agenda_transaction.get("delta", 0),
|
||||||
|
"config_version": rules.get_config_version(),
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
# Automation failed
|
||||||
|
transaction = compute_token_reward("automation_failure", current_tokens=0)
|
||||||
|
return {
|
||||||
|
"automation_failure": transaction,
|
||||||
|
"total_delta": transaction.get("delta", 0),
|
||||||
|
"config_version": rules.get_config_version(),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def main() -> int:
|
def main() -> int:
|
||||||
args = parse_args()
|
args = parse_args()
|
||||||
config = load_config()
|
config = load_config()
|
||||||
@@ -503,10 +548,13 @@ def main() -> int:
|
|||||||
# Check Gitea availability
|
# Check Gitea availability
|
||||||
if not client.is_available():
|
if not client.is_available():
|
||||||
error_msg = "[orchestrator] Error: Gitea API is not available"
|
error_msg = "[orchestrator] Error: Gitea API is not available"
|
||||||
|
# Compute failure tokens even when unavailable
|
||||||
|
tokens = compute_daily_run_tokens(success=False)
|
||||||
if args.json:
|
if args.json:
|
||||||
print(json.dumps({"error": error_msg}))
|
print(json.dumps({"error": error_msg, "tokens": tokens}))
|
||||||
else:
|
else:
|
||||||
print(error_msg, file=sys.stderr)
|
print(error_msg, file=sys.stderr)
|
||||||
|
print(f"[tokens] Failure penalty: {tokens['total_delta']}", file=sys.stderr)
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
# Fetch candidates and generate agenda
|
# Fetch candidates and generate agenda
|
||||||
@@ -521,9 +569,12 @@ def main() -> int:
|
|||||||
cycles = load_cycle_data()
|
cycles = load_cycle_data()
|
||||||
day_summary = generate_day_summary(activity, cycles)
|
day_summary = generate_day_summary(activity, cycles)
|
||||||
|
|
||||||
|
# Compute token rewards for successful completion
|
||||||
|
tokens = compute_daily_run_tokens(success=True)
|
||||||
|
|
||||||
# Output
|
# Output
|
||||||
if args.json:
|
if args.json:
|
||||||
output = {"agenda": agenda}
|
output = {"agenda": agenda, "tokens": tokens}
|
||||||
if day_summary:
|
if day_summary:
|
||||||
output["day_summary"] = day_summary
|
output["day_summary"] = day_summary
|
||||||
print(json.dumps(output, indent=2))
|
print(json.dumps(output, indent=2))
|
||||||
@@ -531,6 +582,15 @@ def main() -> int:
|
|||||||
print_agenda(agenda)
|
print_agenda(agenda)
|
||||||
if day_summary and activity:
|
if day_summary and activity:
|
||||||
print_day_summary(day_summary, activity)
|
print_day_summary(day_summary, activity)
|
||||||
|
# Show token rewards
|
||||||
|
print("─" * 60)
|
||||||
|
print("🪙 Token Rewards")
|
||||||
|
print("─" * 60)
|
||||||
|
print(f"Daily Run completed: +{tokens['daily_run']['delta']} tokens")
|
||||||
|
if candidates:
|
||||||
|
print(f"Golden path generated: +{tokens['golden_path']['delta']} tokens")
|
||||||
|
print(f"Total: +{tokens['total_delta']} tokens")
|
||||||
|
print(f"Config version: {tokens['config_version']}")
|
||||||
|
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|||||||
6
timmy_automations/utils/__init__.py
Normal file
6
timmy_automations/utils/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
"""Timmy Automations utilities.
|
||||||
|
|
||||||
|
Shared helper modules for automations.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
389
timmy_automations/utils/token_rules.py
Normal file
389
timmy_automations/utils/token_rules.py
Normal file
@@ -0,0 +1,389 @@
|
|||||||
|
"""Token rules helper — Compute token deltas for agent actions.
|
||||||
|
|
||||||
|
This module loads token economy configuration from YAML and provides
|
||||||
|
functions for automations to compute token rewards/penalties.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
from timmy_automations.utils.token_rules import TokenRules
|
||||||
|
|
||||||
|
rules = TokenRules()
|
||||||
|
delta = rules.get_delta("pr_merged")
|
||||||
|
print(f"PR merge reward: {delta}") # 10
|
||||||
|
|
||||||
|
# Check if agent can perform sensitive operation
|
||||||
|
can_merge = rules.check_gate("pr_merge", current_tokens=25)
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TokenEvent:
|
||||||
|
"""Represents a single token event configuration."""
|
||||||
|
|
||||||
|
name: str
|
||||||
|
description: str
|
||||||
|
reward: int
|
||||||
|
penalty: int
|
||||||
|
category: str
|
||||||
|
gate_threshold: int | None = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def delta(self) -> int:
|
||||||
|
"""Net token delta (reward + penalty)."""
|
||||||
|
return self.reward + self.penalty
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TokenCategoryLimits:
|
||||||
|
"""Daily limits for a token category."""
|
||||||
|
|
||||||
|
max_earn: int
|
||||||
|
max_spend: int
|
||||||
|
|
||||||
|
|
||||||
|
class TokenRules:
|
||||||
|
"""Token economy rules loader and calculator.
|
||||||
|
|
||||||
|
Loads configuration from timmy_automations/config/token_rules.yaml
|
||||||
|
and provides methods to compute token deltas and check gating.
|
||||||
|
"""
|
||||||
|
|
||||||
|
CONFIG_PATH = Path(__file__).parent.parent / "config" / "token_rules.yaml"
|
||||||
|
|
||||||
|
def __init__(self, config_path: Path | None = None) -> None:
|
||||||
|
"""Initialize token rules from configuration file.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config_path: Optional override for config file location.
|
||||||
|
"""
|
||||||
|
self._config_path = config_path or self.CONFIG_PATH
|
||||||
|
self._events: dict[str, TokenEvent] = {}
|
||||||
|
self._gating: dict[str, int] = {}
|
||||||
|
self._daily_limits: dict[str, TokenCategoryLimits] = {}
|
||||||
|
self._audit: dict[str, Any] = {}
|
||||||
|
self._version: str = "unknown"
|
||||||
|
self._load_config()
|
||||||
|
|
||||||
|
def _load_config(self) -> None:
|
||||||
|
"""Load configuration from YAML file."""
|
||||||
|
# Graceful degradation if yaml not available or file missing
|
||||||
|
try:
|
||||||
|
import yaml
|
||||||
|
except ImportError:
|
||||||
|
# YAML not installed, use fallback defaults
|
||||||
|
self._load_fallback_defaults()
|
||||||
|
return
|
||||||
|
|
||||||
|
if not self._config_path.exists():
|
||||||
|
self._load_fallback_defaults()
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
config = yaml.safe_load(self._config_path.read_text())
|
||||||
|
if not config:
|
||||||
|
self._load_fallback_defaults()
|
||||||
|
return
|
||||||
|
|
||||||
|
self._version = config.get("version", "unknown")
|
||||||
|
self._parse_events(config.get("events", {}))
|
||||||
|
self._parse_gating(config.get("gating_thresholds", {}))
|
||||||
|
self._parse_daily_limits(config.get("daily_limits", {}))
|
||||||
|
self._audit = config.get("audit", {})
|
||||||
|
|
||||||
|
except Exception:
|
||||||
|
# Any error loading config, use fallbacks
|
||||||
|
self._load_fallback_defaults()
|
||||||
|
|
||||||
|
def _load_fallback_defaults(self) -> None:
|
||||||
|
"""Load minimal fallback defaults if config unavailable."""
|
||||||
|
self._version = "fallback"
|
||||||
|
self._events = {
|
||||||
|
"pr_merged": TokenEvent(
|
||||||
|
name="pr_merged",
|
||||||
|
description="Successfully merged a pull request",
|
||||||
|
reward=10,
|
||||||
|
penalty=0,
|
||||||
|
category="merge",
|
||||||
|
gate_threshold=0,
|
||||||
|
),
|
||||||
|
"test_fixed": TokenEvent(
|
||||||
|
name="test_fixed",
|
||||||
|
description="Fixed a failing test",
|
||||||
|
reward=8,
|
||||||
|
penalty=0,
|
||||||
|
category="test",
|
||||||
|
),
|
||||||
|
"automation_failure": TokenEvent(
|
||||||
|
name="automation_failure",
|
||||||
|
description="Automation failed",
|
||||||
|
reward=0,
|
||||||
|
penalty=-2,
|
||||||
|
category="operation",
|
||||||
|
),
|
||||||
|
}
|
||||||
|
self._gating = {"pr_merge": 0}
|
||||||
|
self._daily_limits = {}
|
||||||
|
self._audit = {"log_all_transactions": True}
|
||||||
|
|
||||||
|
def _parse_events(self, events_config: dict) -> None:
|
||||||
|
"""Parse event configurations from YAML."""
|
||||||
|
for name, config in events_config.items():
|
||||||
|
if not isinstance(config, dict):
|
||||||
|
continue
|
||||||
|
|
||||||
|
self._events[name] = TokenEvent(
|
||||||
|
name=name,
|
||||||
|
description=config.get("description", ""),
|
||||||
|
reward=config.get("reward", 0),
|
||||||
|
penalty=config.get("penalty", 0),
|
||||||
|
category=config.get("category", "unknown"),
|
||||||
|
gate_threshold=config.get("gate_threshold"),
|
||||||
|
)
|
||||||
|
|
||||||
|
def _parse_gating(self, gating_config: dict) -> None:
|
||||||
|
"""Parse gating thresholds from YAML."""
|
||||||
|
for name, threshold in gating_config.items():
|
||||||
|
if isinstance(threshold, int):
|
||||||
|
self._gating[name] = threshold
|
||||||
|
|
||||||
|
def _parse_daily_limits(self, limits_config: dict) -> None:
|
||||||
|
"""Parse daily limits from YAML."""
|
||||||
|
for category, limits in limits_config.items():
|
||||||
|
if isinstance(limits, dict):
|
||||||
|
self._daily_limits[category] = TokenCategoryLimits(
|
||||||
|
max_earn=limits.get("max_earn", 0),
|
||||||
|
max_spend=limits.get("max_spend", 0),
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_delta(self, event_name: str) -> int:
|
||||||
|
"""Get token delta for an event.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event_name: Name of the event (e.g., "pr_merged", "test_fixed")
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Net token delta (positive for reward, negative for penalty)
|
||||||
|
"""
|
||||||
|
event = self._events.get(event_name)
|
||||||
|
if event:
|
||||||
|
return event.delta
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def get_event(self, event_name: str) -> TokenEvent | None:
|
||||||
|
"""Get full event configuration.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event_name: Name of the event
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
TokenEvent object or None if not found
|
||||||
|
"""
|
||||||
|
return self._events.get(event_name)
|
||||||
|
|
||||||
|
def list_events(self, category: str | None = None) -> list[TokenEvent]:
|
||||||
|
"""List all configured events.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
category: Optional category filter
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of TokenEvent objects
|
||||||
|
"""
|
||||||
|
events = list(self._events.values())
|
||||||
|
if category:
|
||||||
|
events = [e for e in events if e.category == category]
|
||||||
|
return events
|
||||||
|
|
||||||
|
def check_gate(self, operation: str, current_tokens: int) -> bool:
|
||||||
|
"""Check if agent meets token threshold for an operation.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
operation: Operation name (e.g., "pr_merge")
|
||||||
|
current_tokens: Agent's current token balance
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if agent can perform the operation
|
||||||
|
"""
|
||||||
|
threshold = self._gating.get(operation)
|
||||||
|
if threshold is None:
|
||||||
|
return True # No gate defined, allow
|
||||||
|
return current_tokens >= threshold
|
||||||
|
|
||||||
|
def get_gate_threshold(self, operation: str) -> int | None:
|
||||||
|
"""Get the gating threshold for an operation.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
operation: Operation name
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Threshold value or None if no gate defined
|
||||||
|
"""
|
||||||
|
return self._gating.get(operation)
|
||||||
|
|
||||||
|
def get_daily_limits(self, category: str) -> TokenCategoryLimits | None:
|
||||||
|
"""Get daily limits for a category.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
category: Category name
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
TokenCategoryLimits or None if not defined
|
||||||
|
"""
|
||||||
|
return self._daily_limits.get(category)
|
||||||
|
|
||||||
|
def compute_transaction(
|
||||||
|
self,
|
||||||
|
event_name: str,
|
||||||
|
current_tokens: int = 0,
|
||||||
|
current_daily_earned: dict[str, int] | None = None,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Compute a complete token transaction.
|
||||||
|
|
||||||
|
This is the main entry point for agents to use. It returns
|
||||||
|
a complete transaction record with delta, gating check, and limits.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event_name: Name of the event
|
||||||
|
current_tokens: Agent's current token balance
|
||||||
|
current_daily_earned: Dict of category -> tokens earned today
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Transaction dict with:
|
||||||
|
- event: Event name
|
||||||
|
- delta: Token delta
|
||||||
|
- allowed: Whether operation is allowed (gating)
|
||||||
|
- new_balance: Projected new balance
|
||||||
|
- limit_reached: Whether daily limit would be exceeded
|
||||||
|
"""
|
||||||
|
event = self._events.get(event_name)
|
||||||
|
if not event:
|
||||||
|
return {
|
||||||
|
"event": event_name,
|
||||||
|
"delta": 0,
|
||||||
|
"allowed": False,
|
||||||
|
"reason": "unknown_event",
|
||||||
|
"new_balance": current_tokens,
|
||||||
|
"limit_reached": False,
|
||||||
|
}
|
||||||
|
|
||||||
|
delta = event.delta
|
||||||
|
new_balance = current_tokens + delta
|
||||||
|
|
||||||
|
# Check gating (for penalties, we don't check gates)
|
||||||
|
allowed = True
|
||||||
|
gate_reason = None
|
||||||
|
if delta > 0 and event.gate_threshold is not None: # Only check gates for positive operations with thresholds
|
||||||
|
allowed = current_tokens >= event.gate_threshold
|
||||||
|
if not allowed:
|
||||||
|
gate_reason = f"requires {event.gate_threshold} tokens"
|
||||||
|
|
||||||
|
# Check daily limits
|
||||||
|
limit_reached = False
|
||||||
|
limit_reason = None
|
||||||
|
if current_daily_earned and event.category in current_daily_earned:
|
||||||
|
limits = self._daily_limits.get(event.category)
|
||||||
|
if limits:
|
||||||
|
current_earned = current_daily_earned.get(event.category, 0)
|
||||||
|
if delta > 0 and current_earned + delta > limits.max_earn:
|
||||||
|
limit_reached = True
|
||||||
|
limit_reason = f"daily earn limit ({limits.max_earn}) reached"
|
||||||
|
|
||||||
|
result = {
|
||||||
|
"event": event_name,
|
||||||
|
"delta": delta,
|
||||||
|
"category": event.category,
|
||||||
|
"allowed": allowed and not limit_reached,
|
||||||
|
"new_balance": new_balance,
|
||||||
|
"limit_reached": limit_reached,
|
||||||
|
}
|
||||||
|
|
||||||
|
if gate_reason:
|
||||||
|
result["gate_reason"] = gate_reason
|
||||||
|
if limit_reason:
|
||||||
|
result["limit_reason"] = limit_reason
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def get_config_version(self) -> str:
|
||||||
|
"""Get the loaded configuration version."""
|
||||||
|
return self._version
|
||||||
|
|
||||||
|
def get_categories(self) -> list[str]:
|
||||||
|
"""Get list of all configured categories."""
|
||||||
|
categories = {e.category for e in self._events.values()}
|
||||||
|
return sorted(categories)
|
||||||
|
|
||||||
|
def is_auditable(self) -> bool:
|
||||||
|
"""Check if transactions should be logged for audit."""
|
||||||
|
return self._audit.get("log_all_transactions", True)
|
||||||
|
|
||||||
|
|
||||||
|
# Convenience functions for simple use cases
|
||||||
|
|
||||||
|
def get_token_delta(event_name: str) -> int:
|
||||||
|
"""Get token delta for an event (convenience function).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event_name: Name of the event
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Token delta (positive for reward, negative for penalty)
|
||||||
|
"""
|
||||||
|
return TokenRules().get_delta(event_name)
|
||||||
|
|
||||||
|
|
||||||
|
def check_operation_gate(operation: str, current_tokens: int) -> bool:
|
||||||
|
"""Check if agent can perform operation (convenience function).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
operation: Operation name
|
||||||
|
current_tokens: Agent's current token balance
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if operation is allowed
|
||||||
|
"""
|
||||||
|
return TokenRules().check_gate(operation, current_tokens)
|
||||||
|
|
||||||
|
|
||||||
|
def compute_token_reward(
|
||||||
|
event_name: str,
|
||||||
|
current_tokens: int = 0,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Compute token reward for an event (convenience function).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event_name: Name of the event
|
||||||
|
current_tokens: Agent's current token balance
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Transaction dict with delta, allowed status, new balance
|
||||||
|
"""
|
||||||
|
return TokenRules().compute_transaction(event_name, current_tokens)
|
||||||
|
|
||||||
|
|
||||||
|
def list_token_events(category: str | None = None) -> list[dict[str, Any]]:
|
||||||
|
"""List all token events (convenience function).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
category: Optional category filter
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of event dicts with name, description, delta, category
|
||||||
|
"""
|
||||||
|
rules = TokenRules()
|
||||||
|
events = rules.list_events(category)
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
"name": e.name,
|
||||||
|
"description": e.description,
|
||||||
|
"delta": e.delta,
|
||||||
|
"category": e.category,
|
||||||
|
"gate_threshold": e.gate_threshold,
|
||||||
|
}
|
||||||
|
for e in events
|
||||||
|
]
|
||||||
Reference in New Issue
Block a user