From d3eb5c31bf7442bd6dbdf091ad41b96d6168e13d Mon Sep 17 00:00:00 2001 From: kimi Date: Sat, 21 Mar 2026 17:42:43 -0400 Subject: [PATCH] feat: Centralize agent token rules and hooks for automations (#711) - Add token_rules.yaml config defining events, rewards, penalties, and gating thresholds - Create token_rules.py helper module for loading config and computing token deltas - Update orchestrator.py to compute token rewards for Daily Run completion - Add comprehensive tests for token_rules module The token economy is now configurable without code changes: - Events: triage actions, Daily Run completions, PR merges, test fixes - Rewards/penalties per event type - Gating thresholds for sensitive operations - Daily limits per category - Audit settings for transaction logging Fixes #711 --- tests/timmy_automations/test_token_rules.py | 524 ++++++++++++++++++++ timmy_automations/config/token_rules.yaml | 138 ++++++ timmy_automations/daily_run/orchestrator.py | 64 ++- timmy_automations/utils/__init__.py | 6 + timmy_automations/utils/token_rules.py | 389 +++++++++++++++ 5 files changed, 1119 insertions(+), 2 deletions(-) create mode 100644 tests/timmy_automations/test_token_rules.py create mode 100644 timmy_automations/config/token_rules.yaml create mode 100644 timmy_automations/utils/__init__.py create mode 100644 timmy_automations/utils/token_rules.py diff --git a/tests/timmy_automations/test_token_rules.py b/tests/timmy_automations/test_token_rules.py new file mode 100644 index 00000000..2a5cf05b --- /dev/null +++ b/tests/timmy_automations/test_token_rules.py @@ -0,0 +1,524 @@ +"""Tests for token_rules module.""" + +from __future__ import annotations + +import sys +from pathlib import Path +from unittest.mock import patch + +import pytest + +# Add timmy_automations to path for imports +sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "timmy_automations")) + +from utils import token_rules as tr + + +class TestTokenEvent: + """Test TokenEvent dataclass.""" + + def test_delta_calculation_reward(self): + """Delta is positive for rewards.""" + event = tr.TokenEvent( + name="test", + description="Test event", + reward=10, + penalty=0, + category="test", + ) + assert event.delta == 10 + + def test_delta_calculation_penalty(self): + """Delta is negative for penalties.""" + event = tr.TokenEvent( + name="test", + description="Test event", + reward=0, + penalty=-5, + category="test", + ) + assert event.delta == -5 + + def test_delta_calculation_mixed(self): + """Delta is net of reward and penalty.""" + event = tr.TokenEvent( + name="test", + description="Test event", + reward=10, + penalty=-3, + category="test", + ) + assert event.delta == 7 + + +class TestTokenRulesLoading: + """Test TokenRules configuration loading.""" + + def test_loads_from_yaml_file(self, tmp_path): + """Load configuration from YAML file.""" + yaml = pytest.importorskip("yaml") + + config_file = tmp_path / "token_rules.yaml" + config_data = { + "version": "1.0.0-test", + "events": { + "test_event": { + "description": "A test event", + "reward": 15, + "category": "test", + } + }, + "gating_thresholds": {"test_op": 50}, + "daily_limits": {"test": {"max_earn": 100, "max_spend": 10}}, + "audit": {"log_all_transactions": False}, + } + config_file.write_text(yaml.dump(config_data)) + + rules = tr.TokenRules(config_path=config_file) + + assert rules.get_config_version() == "1.0.0-test" + assert rules.get_delta("test_event") == 15 + assert rules.get_gate_threshold("test_op") == 50 + + def test_fallback_when_yaml_missing(self, tmp_path): + """Use fallback defaults when YAML file doesn't exist.""" + config_file = tmp_path / "nonexistent.yaml" + + rules = tr.TokenRules(config_path=config_file) + + assert rules.get_config_version() == "fallback" + # Fallback should have some basic events + assert rules.get_delta("pr_merged") == 10 + assert rules.get_delta("test_fixed") == 8 + assert rules.get_delta("automation_failure") == -2 + + def test_fallback_when_yaml_not_installed(self, tmp_path): + """Use fallback when PyYAML is not installed.""" + with patch.dict(sys.modules, {"yaml": None}): + config_file = tmp_path / "token_rules.yaml" + config_file.write_text("version: '1.0.0'") + + rules = tr.TokenRules(config_path=config_file) + + assert rules.get_config_version() == "fallback" + + +class TestTokenRulesGetDelta: + """Test get_delta method.""" + + def test_get_delta_existing_event(self, tmp_path): + """Get delta for configured event.""" + yaml = pytest.importorskip("yaml") + + config_file = tmp_path / "token_rules.yaml" + config_data = { + "version": "1.0.0", + "events": { + "pr_merged": {"description": "PR merged", "reward": 10, "category": "merge"}, + "automation_failure": {"description": "Failure", "penalty": -2, "category": "ops"}, + }, + } + config_file.write_text(yaml.dump(config_data)) + + rules = tr.TokenRules(config_path=config_file) + + assert rules.get_delta("pr_merged") == 10 + assert rules.get_delta("automation_failure") == -2 + + def test_get_delta_unknown_event(self, tmp_path): + """Return 0 for unknown events.""" + config_file = tmp_path / "nonexistent.yaml" + rules = tr.TokenRules(config_path=config_file) + + assert rules.get_delta("unknown_event") == 0 + + +class TestTokenRulesGetEvent: + """Test get_event method.""" + + def test_get_event_returns_full_config(self, tmp_path): + """Get full event configuration.""" + yaml = pytest.importorskip("yaml") + + config_file = tmp_path / "token_rules.yaml" + config_data = { + "version": "1.0.0", + "events": { + "pr_merged": { + "description": "PR merged successfully", + "reward": 10, + "category": "merge", + "gate_threshold": 0, + } + }, + } + config_file.write_text(yaml.dump(config_data)) + + rules = tr.TokenRules(config_path=config_file) + event = rules.get_event("pr_merged") + + assert event is not None + assert event.name == "pr_merged" + assert event.description == "PR merged successfully" + assert event.reward == 10 + assert event.category == "merge" + assert event.gate_threshold == 0 + + def test_get_event_unknown_returns_none(self, tmp_path): + """Return None for unknown event.""" + config_file = tmp_path / "nonexistent.yaml" + rules = tr.TokenRules(config_path=config_file) + + assert rules.get_event("unknown") is None + + +class TestTokenRulesListEvents: + """Test list_events method.""" + + def test_list_all_events(self, tmp_path): + """List all configured events.""" + yaml = pytest.importorskip("yaml") + + config_file = tmp_path / "token_rules.yaml" + config_data = { + "version": "1.0.0", + "events": { + "event_a": {"description": "A", "reward": 5, "category": "cat1"}, + "event_b": {"description": "B", "reward": 10, "category": "cat2"}, + "event_c": {"description": "C", "reward": 15, "category": "cat1"}, + }, + } + config_file.write_text(yaml.dump(config_data)) + + rules = tr.TokenRules(config_path=config_file) + events = rules.list_events() + + assert len(events) == 3 + event_names = {e.name for e in events} + assert "event_a" in event_names + assert "event_b" in event_names + assert "event_c" in event_names + + def test_list_events_by_category(self, tmp_path): + """Filter events by category.""" + yaml = pytest.importorskip("yaml") + + config_file = tmp_path / "token_rules.yaml" + config_data = { + "version": "1.0.0", + "events": { + "event_a": {"description": "A", "reward": 5, "category": "cat1"}, + "event_b": {"description": "B", "reward": 10, "category": "cat2"}, + "event_c": {"description": "C", "reward": 15, "category": "cat1"}, + }, + } + config_file.write_text(yaml.dump(config_data)) + + rules = tr.TokenRules(config_path=config_file) + events = rules.list_events(category="cat1") + + assert len(events) == 2 + for event in events: + assert event.category == "cat1" + + +class TestTokenRulesGating: + """Test gating threshold methods.""" + + def test_check_gate_with_threshold(self, tmp_path): + """Check gate when threshold is defined.""" + yaml = pytest.importorskip("yaml") + + config_file = tmp_path / "token_rules.yaml" + config_data = { + "version": "1.0.0", + "events": {}, + "gating_thresholds": {"pr_merge": 50}, + } + config_file.write_text(yaml.dump(config_data)) + + rules = tr.TokenRules(config_path=config_file) + + assert rules.check_gate("pr_merge", current_tokens=100) is True + assert rules.check_gate("pr_merge", current_tokens=50) is True + assert rules.check_gate("pr_merge", current_tokens=49) is False + assert rules.check_gate("pr_merge", current_tokens=0) is False + + def test_check_gate_no_threshold(self, tmp_path): + """Check gate when no threshold is defined (always allowed).""" + yaml = pytest.importorskip("yaml") + + config_file = tmp_path / "token_rules.yaml" + config_data = { + "version": "1.0.0", + "events": {}, + "gating_thresholds": {}, + } + config_file.write_text(yaml.dump(config_data)) + + rules = tr.TokenRules(config_path=config_file) + + # No threshold defined, should always be allowed + assert rules.check_gate("unknown_op", current_tokens=0) is True + assert rules.check_gate("unknown_op", current_tokens=-100) is True + + def test_get_gate_threshold(self, tmp_path): + """Get threshold value.""" + yaml = pytest.importorskip("yaml") + + config_file = tmp_path / "token_rules.yaml" + config_data = { + "version": "1.0.0", + "gating_thresholds": {"pr_merge": 50, "sensitive_op": 100}, + } + config_file.write_text(yaml.dump(config_data)) + + rules = tr.TokenRules(config_path=config_file) + + assert rules.get_gate_threshold("pr_merge") == 50 + assert rules.get_gate_threshold("sensitive_op") == 100 + assert rules.get_gate_threshold("unknown") is None + + +class TestTokenRulesDailyLimits: + """Test daily limits methods.""" + + def test_get_daily_limits(self, tmp_path): + """Get daily limits for a category.""" + yaml = pytest.importorskip("yaml") + + config_file = tmp_path / "token_rules.yaml" + config_data = { + "version": "1.0.0", + "daily_limits": { + "triage": {"max_earn": 100, "max_spend": 0}, + "merge": {"max_earn": 50, "max_spend": 10}, + }, + } + config_file.write_text(yaml.dump(config_data)) + + rules = tr.TokenRules(config_path=config_file) + + triage_limits = rules.get_daily_limits("triage") + assert triage_limits is not None + assert triage_limits.max_earn == 100 + assert triage_limits.max_spend == 0 + + merge_limits = rules.get_daily_limits("merge") + assert merge_limits is not None + assert merge_limits.max_earn == 50 + assert merge_limits.max_spend == 10 + + def test_get_daily_limits_unknown(self, tmp_path): + """Return None for unknown category.""" + yaml = pytest.importorskip("yaml") + + config_file = tmp_path / "token_rules.yaml" + config_data = {"version": "1.0.0", "daily_limits": {}} + config_file.write_text(yaml.dump(config_data)) + + rules = tr.TokenRules(config_path=config_file) + + assert rules.get_daily_limits("unknown") is None + + +class TestTokenRulesComputeTransaction: + """Test compute_transaction method.""" + + def test_compute_successful_transaction(self, tmp_path): + """Compute transaction for valid event.""" + yaml = pytest.importorskip("yaml") + + config_file = tmp_path / "token_rules.yaml" + config_data = { + "version": "1.0.0", + "events": { + "pr_merged": {"description": "PR merged", "reward": 10, "category": "merge"} + }, + } + config_file.write_text(yaml.dump(config_data)) + + rules = tr.TokenRules(config_path=config_file) + result = rules.compute_transaction("pr_merged", current_tokens=100) + + assert result["event"] == "pr_merged" + assert result["delta"] == 10 + assert result["category"] == "merge" + assert result["allowed"] is True + assert result["new_balance"] == 110 + assert result["limit_reached"] is False + + def test_compute_unknown_event(self, tmp_path): + """Compute transaction for unknown event.""" + config_file = tmp_path / "nonexistent.yaml" + rules = tr.TokenRules(config_path=config_file) + result = rules.compute_transaction("unknown_event", current_tokens=50) + + assert result["event"] == "unknown_event" + assert result["delta"] == 0 + assert result["allowed"] is False + assert result["reason"] == "unknown_event" + assert result["new_balance"] == 50 + + def test_compute_with_gate_check(self, tmp_path): + """Compute transaction respects gating.""" + yaml = pytest.importorskip("yaml") + + config_file = tmp_path / "token_rules.yaml" + config_data = { + "version": "1.0.0", + "events": { + "sensitive_op": { + "description": "Sensitive", + "reward": 50, + "category": "sensitive", + "gate_threshold": 100, + } + }, + } + config_file.write_text(yaml.dump(config_data)) + + rules = tr.TokenRules(config_path=config_file) + + # With enough tokens + result = rules.compute_transaction("sensitive_op", current_tokens=150) + assert result["allowed"] is True + + # Without enough tokens + result = rules.compute_transaction("sensitive_op", current_tokens=50) + assert result["allowed"] is False + assert "gate_reason" in result + + def test_compute_with_daily_limits(self, tmp_path): + """Compute transaction respects daily limits.""" + yaml = pytest.importorskip("yaml") + + config_file = tmp_path / "token_rules.yaml" + config_data = { + "version": "1.0.0", + "events": { + "triage_action": { + "description": "Triage", + "reward": 20, + "category": "triage", + } + }, + "daily_limits": {"triage": {"max_earn": 50, "max_spend": 0}}, + } + config_file.write_text(yaml.dump(config_data)) + + rules = tr.TokenRules(config_path=config_file) + + # Within limit + daily_earned = {"triage": 20} + result = rules.compute_transaction( + "triage_action", current_tokens=100, current_daily_earned=daily_earned + ) + assert result["allowed"] is True + assert result["limit_reached"] is False + + # Would exceed limit (20 + 20 > 50 is false, so this should be fine) + # Let's test with higher current earned + daily_earned = {"triage": 40} + result = rules.compute_transaction( + "triage_action", current_tokens=100, current_daily_earned=daily_earned + ) + assert result["allowed"] is False + assert result["limit_reached"] is True + assert "limit_reason" in result + + +class TestTokenRulesCategories: + """Test category methods.""" + + def test_get_categories(self, tmp_path): + """Get all unique categories.""" + yaml = pytest.importorskip("yaml") + + config_file = tmp_path / "token_rules.yaml" + config_data = { + "version": "1.0.0", + "events": { + "event_a": {"description": "A", "reward": 5, "category": "cat1"}, + "event_b": {"description": "B", "reward": 10, "category": "cat2"}, + "event_c": {"description": "C", "reward": 15, "category": "cat1"}, + }, + } + config_file.write_text(yaml.dump(config_data)) + + rules = tr.TokenRules(config_path=config_file) + categories = rules.get_categories() + + assert sorted(categories) == ["cat1", "cat2"] + + +class TestTokenRulesAudit: + """Test audit methods.""" + + def test_is_auditable_true(self, tmp_path): + """Check if auditable when enabled.""" + yaml = pytest.importorskip("yaml") + + config_file = tmp_path / "token_rules.yaml" + config_data = {"version": "1.0.0", "audit": {"log_all_transactions": True}} + config_file.write_text(yaml.dump(config_data)) + + rules = tr.TokenRules(config_path=config_file) + assert rules.is_auditable() is True + + def test_is_auditable_false(self, tmp_path): + """Check if auditable when disabled.""" + yaml = pytest.importorskip("yaml") + + config_file = tmp_path / "token_rules.yaml" + config_data = {"version": "1.0.0", "audit": {"log_all_transactions": False}} + config_file.write_text(yaml.dump(config_data)) + + rules = tr.TokenRules(config_path=config_file) + assert rules.is_auditable() is False + + +class TestConvenienceFunctions: + """Test module-level convenience functions.""" + + def test_get_token_delta(self, tmp_path): + """Convenience function returns delta.""" + config_file = tmp_path / "nonexistent.yaml" + + with patch.object(tr.TokenRules, "CONFIG_PATH", config_file): + delta = tr.get_token_delta("pr_merged") + assert delta == 10 # From fallback + + def test_check_operation_gate(self, tmp_path): + """Convenience function checks gate.""" + config_file = tmp_path / "nonexistent.yaml" + + with patch.object(tr.TokenRules, "CONFIG_PATH", config_file): + # Fallback has pr_merge gate at 0 + assert tr.check_operation_gate("pr_merge", current_tokens=0) is True + assert tr.check_operation_gate("pr_merge", current_tokens=100) is True + + def test_compute_token_reward(self, tmp_path): + """Convenience function computes reward.""" + config_file = tmp_path / "nonexistent.yaml" + + with patch.object(tr.TokenRules, "CONFIG_PATH", config_file): + result = tr.compute_token_reward("pr_merged", current_tokens=50) + assert result["event"] == "pr_merged" + assert result["delta"] == 10 + assert result["new_balance"] == 60 + + def test_list_token_events(self, tmp_path): + """Convenience function lists events.""" + config_file = tmp_path / "nonexistent.yaml" + + with patch.object(tr.TokenRules, "CONFIG_PATH", config_file): + events = tr.list_token_events() + assert len(events) >= 3 # Fallback has at least 3 events + + # Check structure + for event in events: + assert "name" in event + assert "description" in event + assert "delta" in event + assert "category" in event diff --git a/timmy_automations/config/token_rules.yaml b/timmy_automations/config/token_rules.yaml new file mode 100644 index 00000000..08d0db82 --- /dev/null +++ b/timmy_automations/config/token_rules.yaml @@ -0,0 +1,138 @@ +# Token Rules — Agent reward/penalty configuration for automations +# +# This file defines the token economy for agent actions. +# Modify values here to adjust incentives without code changes. +# +# Used by: timmy_automations.utils.token_rules + +version: "1.0.0" +description: "Token economy rules for agent automations" + +# ── Events ───────────────────────────────────────────────────────────────── +# Each event type defines rewards/penalties and optional gating thresholds + +events: + # Triage actions + triage_success: + description: "Successfully triaged an issue (scored and categorized)" + reward: 5 + category: "triage" + + deep_triage_refinement: + description: "LLM-driven issue refinement with acceptance criteria added" + reward: 20 + category: "triage" + + quarantine_candidate_found: + description: "Identified a repeat failure issue for quarantine" + reward: 10 + category: "triage" + + # Daily Run completions + daily_run_completed: + description: "Completed a daily run cycle successfully" + reward: 5 + category: "daily_run" + + golden_path_generated: + description: "Generated a coherent mini-session plan" + reward: 3 + category: "daily_run" + + weekly_narrative_created: + description: "Generated weekly summary of work themes" + reward: 15 + category: "daily_run" + + # PR merges + pr_merged: + description: "Successfully merged a pull request" + reward: 10 + category: "merge" + # Gating: requires minimum tokens to perform + gate_threshold: 0 + + pr_merged_with_tests: + description: "Merged PR with all tests passing" + reward: 15 + category: "merge" + gate_threshold: 0 + + # Test fixes + test_fixed: + description: "Fixed a failing test" + reward: 8 + category: "test" + + test_added: + description: "Added new test coverage" + reward: 5 + category: "test" + + critical_bug_fixed: + description: "Fixed a critical bug on main" + reward: 25 + category: "test" + + # General operations + automation_run: + description: "Ran any automation (resource usage)" + penalty: -1 + category: "operation" + + automation_failure: + description: "Automation failed or produced error" + penalty: -2 + category: "operation" + + cycle_retro_logged: + description: "Logged structured retrospective data" + reward: 5 + category: "operation" + + pre_commit_passed: + description: "Pre-commit checks passed" + reward: 2 + category: "operation" + + pre_commit_failed: + description: "Pre-commit checks failed" + penalty: -1 + category: "operation" + +# ── Gating Thresholds ────────────────────────────────────────────────────── +# Minimum token balances required for sensitive operations + +gating_thresholds: + pr_merge: 0 + sensitive_config_change: 50 + agent_workspace_create: 10 + deep_triage_run: 0 + +# ── Daily Limits ─────────────────────────────────────────────────────────── +# Maximum tokens that can be earned/spent per category per day + +daily_limits: + triage: + max_earn: 100 + max_spend: 0 + daily_run: + max_earn: 50 + max_spend: 0 + merge: + max_earn: 100 + max_spend: 0 + test: + max_earn: 100 + max_spend: 0 + operation: + max_earn: 50 + max_spend: 50 + +# ── Audit Settings ───────────────────────────────────────────────────────── +# Settings for token audit and inspection + +audit: + log_all_transactions: true + log_retention_days: 30 + inspectable_by: ["orchestrator", "auditor", "timmy"] diff --git a/timmy_automations/daily_run/orchestrator.py b/timmy_automations/daily_run/orchestrator.py index de117b4a..1001704a 100755 --- a/timmy_automations/daily_run/orchestrator.py +++ b/timmy_automations/daily_run/orchestrator.py @@ -22,6 +22,14 @@ from typing import Any from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError +# ── Token Economy Integration ────────────────────────────────────────────── +# Import token rules helpers for tracking Daily Run rewards + +sys.path.insert( + 0, str(Path(__file__).resolve().parent.parent) +) +from utils.token_rules import TokenRules, compute_token_reward + # ── Configuration ───────────────────────────────────────────────────────── REPO_ROOT = Path(__file__).resolve().parent.parent.parent @@ -490,6 +498,43 @@ def parse_args() -> argparse.Namespace: return p.parse_args() +def compute_daily_run_tokens(success: bool = True) -> dict[str, Any]: + """Compute token rewards for Daily Run completion. + + Uses the centralized token_rules configuration to calculate + rewards/penalties for automation actions. + + Args: + success: Whether the Daily Run completed successfully + + Returns: + Token transaction details + """ + rules = TokenRules() + + if success: + # Daily run completed successfully + transaction = compute_token_reward("daily_run_completed", current_tokens=0) + + # Also compute golden path generation if agenda was created + agenda_transaction = compute_token_reward("golden_path_generated", current_tokens=0) + + return { + "daily_run": transaction, + "golden_path": agenda_transaction, + "total_delta": transaction.get("delta", 0) + agenda_transaction.get("delta", 0), + "config_version": rules.get_config_version(), + } + else: + # Automation failed + transaction = compute_token_reward("automation_failure", current_tokens=0) + return { + "automation_failure": transaction, + "total_delta": transaction.get("delta", 0), + "config_version": rules.get_config_version(), + } + + def main() -> int: args = parse_args() config = load_config() @@ -503,10 +548,13 @@ def main() -> int: # Check Gitea availability if not client.is_available(): error_msg = "[orchestrator] Error: Gitea API is not available" + # Compute failure tokens even when unavailable + tokens = compute_daily_run_tokens(success=False) if args.json: - print(json.dumps({"error": error_msg})) + print(json.dumps({"error": error_msg, "tokens": tokens})) else: print(error_msg, file=sys.stderr) + print(f"[tokens] Failure penalty: {tokens['total_delta']}", file=sys.stderr) return 1 # Fetch candidates and generate agenda @@ -521,9 +569,12 @@ def main() -> int: cycles = load_cycle_data() day_summary = generate_day_summary(activity, cycles) + # Compute token rewards for successful completion + tokens = compute_daily_run_tokens(success=True) + # Output if args.json: - output = {"agenda": agenda} + output = {"agenda": agenda, "tokens": tokens} if day_summary: output["day_summary"] = day_summary print(json.dumps(output, indent=2)) @@ -531,6 +582,15 @@ def main() -> int: print_agenda(agenda) if day_summary and activity: print_day_summary(day_summary, activity) + # Show token rewards + print("─" * 60) + print("🪙 Token Rewards") + print("─" * 60) + print(f"Daily Run completed: +{tokens['daily_run']['delta']} tokens") + if candidates: + print(f"Golden path generated: +{tokens['golden_path']['delta']} tokens") + print(f"Total: +{tokens['total_delta']} tokens") + print(f"Config version: {tokens['config_version']}") return 0 diff --git a/timmy_automations/utils/__init__.py b/timmy_automations/utils/__init__.py new file mode 100644 index 00000000..1d422a8c --- /dev/null +++ b/timmy_automations/utils/__init__.py @@ -0,0 +1,6 @@ +"""Timmy Automations utilities. + +Shared helper modules for automations. +""" + +from __future__ import annotations diff --git a/timmy_automations/utils/token_rules.py b/timmy_automations/utils/token_rules.py new file mode 100644 index 00000000..eaab72da --- /dev/null +++ b/timmy_automations/utils/token_rules.py @@ -0,0 +1,389 @@ +"""Token rules helper — Compute token deltas for agent actions. + +This module loads token economy configuration from YAML and provides +functions for automations to compute token rewards/penalties. + +Usage: + from timmy_automations.utils.token_rules import TokenRules + + rules = TokenRules() + delta = rules.get_delta("pr_merged") + print(f"PR merge reward: {delta}") # 10 + + # Check if agent can perform sensitive operation + can_merge = rules.check_gate("pr_merge", current_tokens=25) +""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Any + + +@dataclass +class TokenEvent: + """Represents a single token event configuration.""" + + name: str + description: str + reward: int + penalty: int + category: str + gate_threshold: int | None = None + + @property + def delta(self) -> int: + """Net token delta (reward + penalty).""" + return self.reward + self.penalty + + +@dataclass +class TokenCategoryLimits: + """Daily limits for a token category.""" + + max_earn: int + max_spend: int + + +class TokenRules: + """Token economy rules loader and calculator. + + Loads configuration from timmy_automations/config/token_rules.yaml + and provides methods to compute token deltas and check gating. + """ + + CONFIG_PATH = Path(__file__).parent.parent / "config" / "token_rules.yaml" + + def __init__(self, config_path: Path | None = None) -> None: + """Initialize token rules from configuration file. + + Args: + config_path: Optional override for config file location. + """ + self._config_path = config_path or self.CONFIG_PATH + self._events: dict[str, TokenEvent] = {} + self._gating: dict[str, int] = {} + self._daily_limits: dict[str, TokenCategoryLimits] = {} + self._audit: dict[str, Any] = {} + self._version: str = "unknown" + self._load_config() + + def _load_config(self) -> None: + """Load configuration from YAML file.""" + # Graceful degradation if yaml not available or file missing + try: + import yaml + except ImportError: + # YAML not installed, use fallback defaults + self._load_fallback_defaults() + return + + if not self._config_path.exists(): + self._load_fallback_defaults() + return + + try: + config = yaml.safe_load(self._config_path.read_text()) + if not config: + self._load_fallback_defaults() + return + + self._version = config.get("version", "unknown") + self._parse_events(config.get("events", {})) + self._parse_gating(config.get("gating_thresholds", {})) + self._parse_daily_limits(config.get("daily_limits", {})) + self._audit = config.get("audit", {}) + + except Exception: + # Any error loading config, use fallbacks + self._load_fallback_defaults() + + def _load_fallback_defaults(self) -> None: + """Load minimal fallback defaults if config unavailable.""" + self._version = "fallback" + self._events = { + "pr_merged": TokenEvent( + name="pr_merged", + description="Successfully merged a pull request", + reward=10, + penalty=0, + category="merge", + gate_threshold=0, + ), + "test_fixed": TokenEvent( + name="test_fixed", + description="Fixed a failing test", + reward=8, + penalty=0, + category="test", + ), + "automation_failure": TokenEvent( + name="automation_failure", + description="Automation failed", + reward=0, + penalty=-2, + category="operation", + ), + } + self._gating = {"pr_merge": 0} + self._daily_limits = {} + self._audit = {"log_all_transactions": True} + + def _parse_events(self, events_config: dict) -> None: + """Parse event configurations from YAML.""" + for name, config in events_config.items(): + if not isinstance(config, dict): + continue + + self._events[name] = TokenEvent( + name=name, + description=config.get("description", ""), + reward=config.get("reward", 0), + penalty=config.get("penalty", 0), + category=config.get("category", "unknown"), + gate_threshold=config.get("gate_threshold"), + ) + + def _parse_gating(self, gating_config: dict) -> None: + """Parse gating thresholds from YAML.""" + for name, threshold in gating_config.items(): + if isinstance(threshold, int): + self._gating[name] = threshold + + def _parse_daily_limits(self, limits_config: dict) -> None: + """Parse daily limits from YAML.""" + for category, limits in limits_config.items(): + if isinstance(limits, dict): + self._daily_limits[category] = TokenCategoryLimits( + max_earn=limits.get("max_earn", 0), + max_spend=limits.get("max_spend", 0), + ) + + def get_delta(self, event_name: str) -> int: + """Get token delta for an event. + + Args: + event_name: Name of the event (e.g., "pr_merged", "test_fixed") + + Returns: + Net token delta (positive for reward, negative for penalty) + """ + event = self._events.get(event_name) + if event: + return event.delta + return 0 + + def get_event(self, event_name: str) -> TokenEvent | None: + """Get full event configuration. + + Args: + event_name: Name of the event + + Returns: + TokenEvent object or None if not found + """ + return self._events.get(event_name) + + def list_events(self, category: str | None = None) -> list[TokenEvent]: + """List all configured events. + + Args: + category: Optional category filter + + Returns: + List of TokenEvent objects + """ + events = list(self._events.values()) + if category: + events = [e for e in events if e.category == category] + return events + + def check_gate(self, operation: str, current_tokens: int) -> bool: + """Check if agent meets token threshold for an operation. + + Args: + operation: Operation name (e.g., "pr_merge") + current_tokens: Agent's current token balance + + Returns: + True if agent can perform the operation + """ + threshold = self._gating.get(operation) + if threshold is None: + return True # No gate defined, allow + return current_tokens >= threshold + + def get_gate_threshold(self, operation: str) -> int | None: + """Get the gating threshold for an operation. + + Args: + operation: Operation name + + Returns: + Threshold value or None if no gate defined + """ + return self._gating.get(operation) + + def get_daily_limits(self, category: str) -> TokenCategoryLimits | None: + """Get daily limits for a category. + + Args: + category: Category name + + Returns: + TokenCategoryLimits or None if not defined + """ + return self._daily_limits.get(category) + + def compute_transaction( + self, + event_name: str, + current_tokens: int = 0, + current_daily_earned: dict[str, int] | None = None, + ) -> dict[str, Any]: + """Compute a complete token transaction. + + This is the main entry point for agents to use. It returns + a complete transaction record with delta, gating check, and limits. + + Args: + event_name: Name of the event + current_tokens: Agent's current token balance + current_daily_earned: Dict of category -> tokens earned today + + Returns: + Transaction dict with: + - event: Event name + - delta: Token delta + - allowed: Whether operation is allowed (gating) + - new_balance: Projected new balance + - limit_reached: Whether daily limit would be exceeded + """ + event = self._events.get(event_name) + if not event: + return { + "event": event_name, + "delta": 0, + "allowed": False, + "reason": "unknown_event", + "new_balance": current_tokens, + "limit_reached": False, + } + + delta = event.delta + new_balance = current_tokens + delta + + # Check gating (for penalties, we don't check gates) + allowed = True + gate_reason = None + if delta > 0 and event.gate_threshold is not None: # Only check gates for positive operations with thresholds + allowed = current_tokens >= event.gate_threshold + if not allowed: + gate_reason = f"requires {event.gate_threshold} tokens" + + # Check daily limits + limit_reached = False + limit_reason = None + if current_daily_earned and event.category in current_daily_earned: + limits = self._daily_limits.get(event.category) + if limits: + current_earned = current_daily_earned.get(event.category, 0) + if delta > 0 and current_earned + delta > limits.max_earn: + limit_reached = True + limit_reason = f"daily earn limit ({limits.max_earn}) reached" + + result = { + "event": event_name, + "delta": delta, + "category": event.category, + "allowed": allowed and not limit_reached, + "new_balance": new_balance, + "limit_reached": limit_reached, + } + + if gate_reason: + result["gate_reason"] = gate_reason + if limit_reason: + result["limit_reason"] = limit_reason + + return result + + def get_config_version(self) -> str: + """Get the loaded configuration version.""" + return self._version + + def get_categories(self) -> list[str]: + """Get list of all configured categories.""" + categories = {e.category for e in self._events.values()} + return sorted(categories) + + def is_auditable(self) -> bool: + """Check if transactions should be logged for audit.""" + return self._audit.get("log_all_transactions", True) + + +# Convenience functions for simple use cases + +def get_token_delta(event_name: str) -> int: + """Get token delta for an event (convenience function). + + Args: + event_name: Name of the event + + Returns: + Token delta (positive for reward, negative for penalty) + """ + return TokenRules().get_delta(event_name) + + +def check_operation_gate(operation: str, current_tokens: int) -> bool: + """Check if agent can perform operation (convenience function). + + Args: + operation: Operation name + current_tokens: Agent's current token balance + + Returns: + True if operation is allowed + """ + return TokenRules().check_gate(operation, current_tokens) + + +def compute_token_reward( + event_name: str, + current_tokens: int = 0, +) -> dict[str, Any]: + """Compute token reward for an event (convenience function). + + Args: + event_name: Name of the event + current_tokens: Agent's current token balance + + Returns: + Transaction dict with delta, allowed status, new balance + """ + return TokenRules().compute_transaction(event_name, current_tokens) + + +def list_token_events(category: str | None = None) -> list[dict[str, Any]]: + """List all token events (convenience function). + + Args: + category: Optional category filter + + Returns: + List of event dicts with name, description, delta, category + """ + rules = TokenRules() + events = rules.list_events(category) + return [ + { + "name": e.name, + "description": e.description, + "delta": e.delta, + "category": e.category, + "gate_threshold": e.gate_threshold, + } + for e in events + ] -- 2.43.0