390 lines
12 KiB
Python
390 lines
12 KiB
Python
"""Token rules helper — Compute token deltas for agent actions.
|
|
|
|
This module loads token economy configuration from YAML and provides
|
|
functions for automations to compute token rewards/penalties.
|
|
|
|
Usage:
|
|
from timmy_automations.utils.token_rules import TokenRules
|
|
|
|
rules = TokenRules()
|
|
delta = rules.get_delta("pr_merged")
|
|
print(f"PR merge reward: {delta}") # 10
|
|
|
|
# Check if agent can perform sensitive operation
|
|
can_merge = rules.check_gate("pr_merge", current_tokens=25)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
@dataclass
|
|
class TokenEvent:
|
|
"""Represents a single token event configuration."""
|
|
|
|
name: str
|
|
description: str
|
|
reward: int
|
|
penalty: int
|
|
category: str
|
|
gate_threshold: int | None = None
|
|
|
|
@property
|
|
def delta(self) -> int:
|
|
"""Net token delta (reward + penalty)."""
|
|
return self.reward + self.penalty
|
|
|
|
|
|
@dataclass
|
|
class TokenCategoryLimits:
|
|
"""Daily limits for a token category."""
|
|
|
|
max_earn: int
|
|
max_spend: int
|
|
|
|
|
|
class TokenRules:
|
|
"""Token economy rules loader and calculator.
|
|
|
|
Loads configuration from timmy_automations/config/token_rules.yaml
|
|
and provides methods to compute token deltas and check gating.
|
|
"""
|
|
|
|
CONFIG_PATH = Path(__file__).parent.parent / "config" / "token_rules.yaml"
|
|
|
|
def __init__(self, config_path: Path | None = None) -> None:
|
|
"""Initialize token rules from configuration file.
|
|
|
|
Args:
|
|
config_path: Optional override for config file location.
|
|
"""
|
|
self._config_path = config_path or self.CONFIG_PATH
|
|
self._events: dict[str, TokenEvent] = {}
|
|
self._gating: dict[str, int] = {}
|
|
self._daily_limits: dict[str, TokenCategoryLimits] = {}
|
|
self._audit: dict[str, Any] = {}
|
|
self._version: str = "unknown"
|
|
self._load_config()
|
|
|
|
def _load_config(self) -> None:
|
|
"""Load configuration from YAML file."""
|
|
# Graceful degradation if yaml not available or file missing
|
|
try:
|
|
import yaml
|
|
except ImportError:
|
|
# YAML not installed, use fallback defaults
|
|
self._load_fallback_defaults()
|
|
return
|
|
|
|
if not self._config_path.exists():
|
|
self._load_fallback_defaults()
|
|
return
|
|
|
|
try:
|
|
config = yaml.safe_load(self._config_path.read_text())
|
|
if not config:
|
|
self._load_fallback_defaults()
|
|
return
|
|
|
|
self._version = config.get("version", "unknown")
|
|
self._parse_events(config.get("events", {}))
|
|
self._parse_gating(config.get("gating_thresholds", {}))
|
|
self._parse_daily_limits(config.get("daily_limits", {}))
|
|
self._audit = config.get("audit", {})
|
|
|
|
except Exception:
|
|
# Any error loading config, use fallbacks
|
|
self._load_fallback_defaults()
|
|
|
|
def _load_fallback_defaults(self) -> None:
|
|
"""Load minimal fallback defaults if config unavailable."""
|
|
self._version = "fallback"
|
|
self._events = {
|
|
"pr_merged": TokenEvent(
|
|
name="pr_merged",
|
|
description="Successfully merged a pull request",
|
|
reward=10,
|
|
penalty=0,
|
|
category="merge",
|
|
gate_threshold=0,
|
|
),
|
|
"test_fixed": TokenEvent(
|
|
name="test_fixed",
|
|
description="Fixed a failing test",
|
|
reward=8,
|
|
penalty=0,
|
|
category="test",
|
|
),
|
|
"automation_failure": TokenEvent(
|
|
name="automation_failure",
|
|
description="Automation failed",
|
|
reward=0,
|
|
penalty=-2,
|
|
category="operation",
|
|
),
|
|
}
|
|
self._gating = {"pr_merge": 0}
|
|
self._daily_limits = {}
|
|
self._audit = {"log_all_transactions": True}
|
|
|
|
def _parse_events(self, events_config: dict) -> None:
|
|
"""Parse event configurations from YAML."""
|
|
for name, config in events_config.items():
|
|
if not isinstance(config, dict):
|
|
continue
|
|
|
|
self._events[name] = TokenEvent(
|
|
name=name,
|
|
description=config.get("description", ""),
|
|
reward=config.get("reward", 0),
|
|
penalty=config.get("penalty", 0),
|
|
category=config.get("category", "unknown"),
|
|
gate_threshold=config.get("gate_threshold"),
|
|
)
|
|
|
|
def _parse_gating(self, gating_config: dict) -> None:
|
|
"""Parse gating thresholds from YAML."""
|
|
for name, threshold in gating_config.items():
|
|
if isinstance(threshold, int):
|
|
self._gating[name] = threshold
|
|
|
|
def _parse_daily_limits(self, limits_config: dict) -> None:
|
|
"""Parse daily limits from YAML."""
|
|
for category, limits in limits_config.items():
|
|
if isinstance(limits, dict):
|
|
self._daily_limits[category] = TokenCategoryLimits(
|
|
max_earn=limits.get("max_earn", 0),
|
|
max_spend=limits.get("max_spend", 0),
|
|
)
|
|
|
|
def get_delta(self, event_name: str) -> int:
|
|
"""Get token delta for an event.
|
|
|
|
Args:
|
|
event_name: Name of the event (e.g., "pr_merged", "test_fixed")
|
|
|
|
Returns:
|
|
Net token delta (positive for reward, negative for penalty)
|
|
"""
|
|
event = self._events.get(event_name)
|
|
if event:
|
|
return event.delta
|
|
return 0
|
|
|
|
def get_event(self, event_name: str) -> TokenEvent | None:
|
|
"""Get full event configuration.
|
|
|
|
Args:
|
|
event_name: Name of the event
|
|
|
|
Returns:
|
|
TokenEvent object or None if not found
|
|
"""
|
|
return self._events.get(event_name)
|
|
|
|
def list_events(self, category: str | None = None) -> list[TokenEvent]:
|
|
"""List all configured events.
|
|
|
|
Args:
|
|
category: Optional category filter
|
|
|
|
Returns:
|
|
List of TokenEvent objects
|
|
"""
|
|
events = list(self._events.values())
|
|
if category:
|
|
events = [e for e in events if e.category == category]
|
|
return events
|
|
|
|
def check_gate(self, operation: str, current_tokens: int) -> bool:
|
|
"""Check if agent meets token threshold for an operation.
|
|
|
|
Args:
|
|
operation: Operation name (e.g., "pr_merge")
|
|
current_tokens: Agent's current token balance
|
|
|
|
Returns:
|
|
True if agent can perform the operation
|
|
"""
|
|
threshold = self._gating.get(operation)
|
|
if threshold is None:
|
|
return True # No gate defined, allow
|
|
return current_tokens >= threshold
|
|
|
|
def get_gate_threshold(self, operation: str) -> int | None:
|
|
"""Get the gating threshold for an operation.
|
|
|
|
Args:
|
|
operation: Operation name
|
|
|
|
Returns:
|
|
Threshold value or None if no gate defined
|
|
"""
|
|
return self._gating.get(operation)
|
|
|
|
def get_daily_limits(self, category: str) -> TokenCategoryLimits | None:
|
|
"""Get daily limits for a category.
|
|
|
|
Args:
|
|
category: Category name
|
|
|
|
Returns:
|
|
TokenCategoryLimits or None if not defined
|
|
"""
|
|
return self._daily_limits.get(category)
|
|
|
|
def compute_transaction(
|
|
self,
|
|
event_name: str,
|
|
current_tokens: int = 0,
|
|
current_daily_earned: dict[str, int] | None = None,
|
|
) -> dict[str, Any]:
|
|
"""Compute a complete token transaction.
|
|
|
|
This is the main entry point for agents to use. It returns
|
|
a complete transaction record with delta, gating check, and limits.
|
|
|
|
Args:
|
|
event_name: Name of the event
|
|
current_tokens: Agent's current token balance
|
|
current_daily_earned: Dict of category -> tokens earned today
|
|
|
|
Returns:
|
|
Transaction dict with:
|
|
- event: Event name
|
|
- delta: Token delta
|
|
- allowed: Whether operation is allowed (gating)
|
|
- new_balance: Projected new balance
|
|
- limit_reached: Whether daily limit would be exceeded
|
|
"""
|
|
event = self._events.get(event_name)
|
|
if not event:
|
|
return {
|
|
"event": event_name,
|
|
"delta": 0,
|
|
"allowed": False,
|
|
"reason": "unknown_event",
|
|
"new_balance": current_tokens,
|
|
"limit_reached": False,
|
|
}
|
|
|
|
delta = event.delta
|
|
new_balance = current_tokens + delta
|
|
|
|
# Check gating (for penalties, we don't check gates)
|
|
allowed = True
|
|
gate_reason = None
|
|
if delta > 0 and event.gate_threshold is not None: # Only check gates for positive operations with thresholds
|
|
allowed = current_tokens >= event.gate_threshold
|
|
if not allowed:
|
|
gate_reason = f"requires {event.gate_threshold} tokens"
|
|
|
|
# Check daily limits
|
|
limit_reached = False
|
|
limit_reason = None
|
|
if current_daily_earned and event.category in current_daily_earned:
|
|
limits = self._daily_limits.get(event.category)
|
|
if limits:
|
|
current_earned = current_daily_earned.get(event.category, 0)
|
|
if delta > 0 and current_earned + delta > limits.max_earn:
|
|
limit_reached = True
|
|
limit_reason = f"daily earn limit ({limits.max_earn}) reached"
|
|
|
|
result = {
|
|
"event": event_name,
|
|
"delta": delta,
|
|
"category": event.category,
|
|
"allowed": allowed and not limit_reached,
|
|
"new_balance": new_balance,
|
|
"limit_reached": limit_reached,
|
|
}
|
|
|
|
if gate_reason:
|
|
result["gate_reason"] = gate_reason
|
|
if limit_reason:
|
|
result["limit_reason"] = limit_reason
|
|
|
|
return result
|
|
|
|
def get_config_version(self) -> str:
|
|
"""Get the loaded configuration version."""
|
|
return self._version
|
|
|
|
def get_categories(self) -> list[str]:
|
|
"""Get list of all configured categories."""
|
|
categories = {e.category for e in self._events.values()}
|
|
return sorted(categories)
|
|
|
|
def is_auditable(self) -> bool:
|
|
"""Check if transactions should be logged for audit."""
|
|
return self._audit.get("log_all_transactions", True)
|
|
|
|
|
|
# Convenience functions for simple use cases
|
|
|
|
def get_token_delta(event_name: str) -> int:
|
|
"""Get token delta for an event (convenience function).
|
|
|
|
Args:
|
|
event_name: Name of the event
|
|
|
|
Returns:
|
|
Token delta (positive for reward, negative for penalty)
|
|
"""
|
|
return TokenRules().get_delta(event_name)
|
|
|
|
|
|
def check_operation_gate(operation: str, current_tokens: int) -> bool:
|
|
"""Check if agent can perform operation (convenience function).
|
|
|
|
Args:
|
|
operation: Operation name
|
|
current_tokens: Agent's current token balance
|
|
|
|
Returns:
|
|
True if operation is allowed
|
|
"""
|
|
return TokenRules().check_gate(operation, current_tokens)
|
|
|
|
|
|
def compute_token_reward(
|
|
event_name: str,
|
|
current_tokens: int = 0,
|
|
) -> dict[str, Any]:
|
|
"""Compute token reward for an event (convenience function).
|
|
|
|
Args:
|
|
event_name: Name of the event
|
|
current_tokens: Agent's current token balance
|
|
|
|
Returns:
|
|
Transaction dict with delta, allowed status, new balance
|
|
"""
|
|
return TokenRules().compute_transaction(event_name, current_tokens)
|
|
|
|
|
|
def list_token_events(category: str | None = None) -> list[dict[str, Any]]:
|
|
"""List all token events (convenience function).
|
|
|
|
Args:
|
|
category: Optional category filter
|
|
|
|
Returns:
|
|
List of event dicts with name, description, delta, category
|
|
"""
|
|
rules = TokenRules()
|
|
events = rules.list_events(category)
|
|
return [
|
|
{
|
|
"name": e.name,
|
|
"description": e.description,
|
|
"delta": e.delta,
|
|
"category": e.category,
|
|
"gate_threshold": e.gate_threshold,
|
|
}
|
|
for e in events
|
|
]
|