"""Token rules helper — Compute token deltas for agent actions. This module loads token economy configuration from YAML and provides functions for automations to compute token rewards/penalties. Usage: from timmy_automations.utils.token_rules import TokenRules rules = TokenRules() delta = rules.get_delta("pr_merged") print(f"PR merge reward: {delta}") # 10 # Check if agent can perform sensitive operation can_merge = rules.check_gate("pr_merge", current_tokens=25) """ from __future__ import annotations from dataclasses import dataclass from pathlib import Path from typing import Any @dataclass class TokenEvent: """Represents a single token event configuration.""" name: str description: str reward: int penalty: int category: str gate_threshold: int | None = None @property def delta(self) -> int: """Net token delta (reward + penalty).""" return self.reward + self.penalty @dataclass class TokenCategoryLimits: """Daily limits for a token category.""" max_earn: int max_spend: int class TokenRules: """Token economy rules loader and calculator. Loads configuration from timmy_automations/config/token_rules.yaml and provides methods to compute token deltas and check gating. """ CONFIG_PATH = Path(__file__).parent.parent / "config" / "token_rules.yaml" def __init__(self, config_path: Path | None = None) -> None: """Initialize token rules from configuration file. Args: config_path: Optional override for config file location. """ self._config_path = config_path or self.CONFIG_PATH self._events: dict[str, TokenEvent] = {} self._gating: dict[str, int] = {} self._daily_limits: dict[str, TokenCategoryLimits] = {} self._audit: dict[str, Any] = {} self._version: str = "unknown" self._load_config() def _load_config(self) -> None: """Load configuration from YAML file.""" # Graceful degradation if yaml not available or file missing try: import yaml except ImportError: # YAML not installed, use fallback defaults self._load_fallback_defaults() return if not self._config_path.exists(): self._load_fallback_defaults() return try: config = yaml.safe_load(self._config_path.read_text()) if not config: self._load_fallback_defaults() return self._version = config.get("version", "unknown") self._parse_events(config.get("events", {})) self._parse_gating(config.get("gating_thresholds", {})) self._parse_daily_limits(config.get("daily_limits", {})) self._audit = config.get("audit", {}) except Exception: # Any error loading config, use fallbacks self._load_fallback_defaults() def _load_fallback_defaults(self) -> None: """Load minimal fallback defaults if config unavailable.""" self._version = "fallback" self._events = { "pr_merged": TokenEvent( name="pr_merged", description="Successfully merged a pull request", reward=10, penalty=0, category="merge", gate_threshold=0, ), "test_fixed": TokenEvent( name="test_fixed", description="Fixed a failing test", reward=8, penalty=0, category="test", ), "automation_failure": TokenEvent( name="automation_failure", description="Automation failed", reward=0, penalty=-2, category="operation", ), } self._gating = {"pr_merge": 0} self._daily_limits = {} self._audit = {"log_all_transactions": True} def _parse_events(self, events_config: dict) -> None: """Parse event configurations from YAML.""" for name, config in events_config.items(): if not isinstance(config, dict): continue self._events[name] = TokenEvent( name=name, description=config.get("description", ""), reward=config.get("reward", 0), penalty=config.get("penalty", 0), category=config.get("category", "unknown"), gate_threshold=config.get("gate_threshold"), ) def _parse_gating(self, gating_config: dict) -> None: """Parse gating thresholds from YAML.""" for name, threshold in gating_config.items(): if isinstance(threshold, int): self._gating[name] = threshold def _parse_daily_limits(self, limits_config: dict) -> None: """Parse daily limits from YAML.""" for category, limits in limits_config.items(): if isinstance(limits, dict): self._daily_limits[category] = TokenCategoryLimits( max_earn=limits.get("max_earn", 0), max_spend=limits.get("max_spend", 0), ) def get_delta(self, event_name: str) -> int: """Get token delta for an event. Args: event_name: Name of the event (e.g., "pr_merged", "test_fixed") Returns: Net token delta (positive for reward, negative for penalty) """ event = self._events.get(event_name) if event: return event.delta return 0 def get_event(self, event_name: str) -> TokenEvent | None: """Get full event configuration. Args: event_name: Name of the event Returns: TokenEvent object or None if not found """ return self._events.get(event_name) def list_events(self, category: str | None = None) -> list[TokenEvent]: """List all configured events. Args: category: Optional category filter Returns: List of TokenEvent objects """ events = list(self._events.values()) if category: events = [e for e in events if e.category == category] return events def check_gate(self, operation: str, current_tokens: int) -> bool: """Check if agent meets token threshold for an operation. Args: operation: Operation name (e.g., "pr_merge") current_tokens: Agent's current token balance Returns: True if agent can perform the operation """ threshold = self._gating.get(operation) if threshold is None: return True # No gate defined, allow return current_tokens >= threshold def get_gate_threshold(self, operation: str) -> int | None: """Get the gating threshold for an operation. Args: operation: Operation name Returns: Threshold value or None if no gate defined """ return self._gating.get(operation) def get_daily_limits(self, category: str) -> TokenCategoryLimits | None: """Get daily limits for a category. Args: category: Category name Returns: TokenCategoryLimits or None if not defined """ return self._daily_limits.get(category) def compute_transaction( self, event_name: str, current_tokens: int = 0, current_daily_earned: dict[str, int] | None = None, ) -> dict[str, Any]: """Compute a complete token transaction. This is the main entry point for agents to use. It returns a complete transaction record with delta, gating check, and limits. Args: event_name: Name of the event current_tokens: Agent's current token balance current_daily_earned: Dict of category -> tokens earned today Returns: Transaction dict with: - event: Event name - delta: Token delta - allowed: Whether operation is allowed (gating) - new_balance: Projected new balance - limit_reached: Whether daily limit would be exceeded """ event = self._events.get(event_name) if not event: return { "event": event_name, "delta": 0, "allowed": False, "reason": "unknown_event", "new_balance": current_tokens, "limit_reached": False, } delta = event.delta new_balance = current_tokens + delta # Check gating (for penalties, we don't check gates) allowed = True gate_reason = None if delta > 0 and event.gate_threshold is not None: # Only check gates for positive operations with thresholds allowed = current_tokens >= event.gate_threshold if not allowed: gate_reason = f"requires {event.gate_threshold} tokens" # Check daily limits limit_reached = False limit_reason = None if current_daily_earned and event.category in current_daily_earned: limits = self._daily_limits.get(event.category) if limits: current_earned = current_daily_earned.get(event.category, 0) if delta > 0 and current_earned + delta > limits.max_earn: limit_reached = True limit_reason = f"daily earn limit ({limits.max_earn}) reached" result = { "event": event_name, "delta": delta, "category": event.category, "allowed": allowed and not limit_reached, "new_balance": new_balance, "limit_reached": limit_reached, } if gate_reason: result["gate_reason"] = gate_reason if limit_reason: result["limit_reason"] = limit_reason return result def get_config_version(self) -> str: """Get the loaded configuration version.""" return self._version def get_categories(self) -> list[str]: """Get list of all configured categories.""" categories = {e.category for e in self._events.values()} return sorted(categories) def is_auditable(self) -> bool: """Check if transactions should be logged for audit.""" return self._audit.get("log_all_transactions", True) # Convenience functions for simple use cases def get_token_delta(event_name: str) -> int: """Get token delta for an event (convenience function). Args: event_name: Name of the event Returns: Token delta (positive for reward, negative for penalty) """ return TokenRules().get_delta(event_name) def check_operation_gate(operation: str, current_tokens: int) -> bool: """Check if agent can perform operation (convenience function). Args: operation: Operation name current_tokens: Agent's current token balance Returns: True if operation is allowed """ return TokenRules().check_gate(operation, current_tokens) def compute_token_reward( event_name: str, current_tokens: int = 0, ) -> dict[str, Any]: """Compute token reward for an event (convenience function). Args: event_name: Name of the event current_tokens: Agent's current token balance Returns: Transaction dict with delta, allowed status, new balance """ return TokenRules().compute_transaction(event_name, current_tokens) def list_token_events(category: str | None = None) -> list[dict[str, Any]]: """List all token events (convenience function). Args: category: Optional category filter Returns: List of event dicts with name, description, delta, category """ rules = TokenRules() events = rules.list_events(category) return [ { "name": e.name, "description": e.description, "delta": e.delta, "category": e.category, "gate_threshold": e.gate_threshold, } for e in events ]