[kimi] Centralize agent token rules and hooks for automations (#711) (#792)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled

This commit was merged in pull request #792.
This commit is contained in:
2026-03-21 21:44:35 +00:00
parent 6dd48685e7
commit dfe40f5fe6
5 changed files with 1119 additions and 2 deletions

View File

@@ -0,0 +1,138 @@
# Token Rules — Agent reward/penalty configuration for automations
#
# This file defines the token economy for agent actions.
# Modify values here to adjust incentives without code changes.
#
# Used by: timmy_automations.utils.token_rules
version: "1.0.0"
description: "Token economy rules for agent automations"
# ── Events ─────────────────────────────────────────────────────────────────
# Each event type defines rewards/penalties and optional gating thresholds
events:
# Triage actions
triage_success:
description: "Successfully triaged an issue (scored and categorized)"
reward: 5
category: "triage"
deep_triage_refinement:
description: "LLM-driven issue refinement with acceptance criteria added"
reward: 20
category: "triage"
quarantine_candidate_found:
description: "Identified a repeat failure issue for quarantine"
reward: 10
category: "triage"
# Daily Run completions
daily_run_completed:
description: "Completed a daily run cycle successfully"
reward: 5
category: "daily_run"
golden_path_generated:
description: "Generated a coherent mini-session plan"
reward: 3
category: "daily_run"
weekly_narrative_created:
description: "Generated weekly summary of work themes"
reward: 15
category: "daily_run"
# PR merges
pr_merged:
description: "Successfully merged a pull request"
reward: 10
category: "merge"
# Gating: requires minimum tokens to perform
gate_threshold: 0
pr_merged_with_tests:
description: "Merged PR with all tests passing"
reward: 15
category: "merge"
gate_threshold: 0
# Test fixes
test_fixed:
description: "Fixed a failing test"
reward: 8
category: "test"
test_added:
description: "Added new test coverage"
reward: 5
category: "test"
critical_bug_fixed:
description: "Fixed a critical bug on main"
reward: 25
category: "test"
# General operations
automation_run:
description: "Ran any automation (resource usage)"
penalty: -1
category: "operation"
automation_failure:
description: "Automation failed or produced error"
penalty: -2
category: "operation"
cycle_retro_logged:
description: "Logged structured retrospective data"
reward: 5
category: "operation"
pre_commit_passed:
description: "Pre-commit checks passed"
reward: 2
category: "operation"
pre_commit_failed:
description: "Pre-commit checks failed"
penalty: -1
category: "operation"
# ── Gating Thresholds ──────────────────────────────────────────────────────
# Minimum token balances required for sensitive operations
gating_thresholds:
pr_merge: 0
sensitive_config_change: 50
agent_workspace_create: 10
deep_triage_run: 0
# ── Daily Limits ───────────────────────────────────────────────────────────
# Maximum tokens that can be earned/spent per category per day
daily_limits:
triage:
max_earn: 100
max_spend: 0
daily_run:
max_earn: 50
max_spend: 0
merge:
max_earn: 100
max_spend: 0
test:
max_earn: 100
max_spend: 0
operation:
max_earn: 50
max_spend: 50
# ── Audit Settings ─────────────────────────────────────────────────────────
# Settings for token audit and inspection
audit:
log_all_transactions: true
log_retention_days: 30
inspectable_by: ["orchestrator", "auditor", "timmy"]

View File

@@ -22,6 +22,14 @@ from typing import Any
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
# ── Token Economy Integration ──────────────────────────────────────────────
# Import token rules helpers for tracking Daily Run rewards
sys.path.insert(
0, str(Path(__file__).resolve().parent.parent)
)
from utils.token_rules import TokenRules, compute_token_reward
# ── Configuration ─────────────────────────────────────────────────────────
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
@@ -490,6 +498,43 @@ def parse_args() -> argparse.Namespace:
return p.parse_args()
def compute_daily_run_tokens(success: bool = True) -> dict[str, Any]:
"""Compute token rewards for Daily Run completion.
Uses the centralized token_rules configuration to calculate
rewards/penalties for automation actions.
Args:
success: Whether the Daily Run completed successfully
Returns:
Token transaction details
"""
rules = TokenRules()
if success:
# Daily run completed successfully
transaction = compute_token_reward("daily_run_completed", current_tokens=0)
# Also compute golden path generation if agenda was created
agenda_transaction = compute_token_reward("golden_path_generated", current_tokens=0)
return {
"daily_run": transaction,
"golden_path": agenda_transaction,
"total_delta": transaction.get("delta", 0) + agenda_transaction.get("delta", 0),
"config_version": rules.get_config_version(),
}
else:
# Automation failed
transaction = compute_token_reward("automation_failure", current_tokens=0)
return {
"automation_failure": transaction,
"total_delta": transaction.get("delta", 0),
"config_version": rules.get_config_version(),
}
def main() -> int:
args = parse_args()
config = load_config()
@@ -503,10 +548,13 @@ def main() -> int:
# Check Gitea availability
if not client.is_available():
error_msg = "[orchestrator] Error: Gitea API is not available"
# Compute failure tokens even when unavailable
tokens = compute_daily_run_tokens(success=False)
if args.json:
print(json.dumps({"error": error_msg}))
print(json.dumps({"error": error_msg, "tokens": tokens}))
else:
print(error_msg, file=sys.stderr)
print(f"[tokens] Failure penalty: {tokens['total_delta']}", file=sys.stderr)
return 1
# Fetch candidates and generate agenda
@@ -521,9 +569,12 @@ def main() -> int:
cycles = load_cycle_data()
day_summary = generate_day_summary(activity, cycles)
# Compute token rewards for successful completion
tokens = compute_daily_run_tokens(success=True)
# Output
if args.json:
output = {"agenda": agenda}
output = {"agenda": agenda, "tokens": tokens}
if day_summary:
output["day_summary"] = day_summary
print(json.dumps(output, indent=2))
@@ -531,6 +582,15 @@ def main() -> int:
print_agenda(agenda)
if day_summary and activity:
print_day_summary(day_summary, activity)
# Show token rewards
print("" * 60)
print("🪙 Token Rewards")
print("" * 60)
print(f"Daily Run completed: +{tokens['daily_run']['delta']} tokens")
if candidates:
print(f"Golden path generated: +{tokens['golden_path']['delta']} tokens")
print(f"Total: +{tokens['total_delta']} tokens")
print(f"Config version: {tokens['config_version']}")
return 0

View File

@@ -0,0 +1,6 @@
"""Timmy Automations utilities.
Shared helper modules for automations.
"""
from __future__ import annotations

View File

@@ -0,0 +1,389 @@
"""Token rules helper — Compute token deltas for agent actions.
This module loads token economy configuration from YAML and provides
functions for automations to compute token rewards/penalties.
Usage:
from timmy_automations.utils.token_rules import TokenRules
rules = TokenRules()
delta = rules.get_delta("pr_merged")
print(f"PR merge reward: {delta}") # 10
# Check if agent can perform sensitive operation
can_merge = rules.check_gate("pr_merge", current_tokens=25)
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
@dataclass
class TokenEvent:
"""Represents a single token event configuration."""
name: str
description: str
reward: int
penalty: int
category: str
gate_threshold: int | None = None
@property
def delta(self) -> int:
"""Net token delta (reward + penalty)."""
return self.reward + self.penalty
@dataclass
class TokenCategoryLimits:
"""Daily limits for a token category."""
max_earn: int
max_spend: int
class TokenRules:
"""Token economy rules loader and calculator.
Loads configuration from timmy_automations/config/token_rules.yaml
and provides methods to compute token deltas and check gating.
"""
CONFIG_PATH = Path(__file__).parent.parent / "config" / "token_rules.yaml"
def __init__(self, config_path: Path | None = None) -> None:
"""Initialize token rules from configuration file.
Args:
config_path: Optional override for config file location.
"""
self._config_path = config_path or self.CONFIG_PATH
self._events: dict[str, TokenEvent] = {}
self._gating: dict[str, int] = {}
self._daily_limits: dict[str, TokenCategoryLimits] = {}
self._audit: dict[str, Any] = {}
self._version: str = "unknown"
self._load_config()
def _load_config(self) -> None:
"""Load configuration from YAML file."""
# Graceful degradation if yaml not available or file missing
try:
import yaml
except ImportError:
# YAML not installed, use fallback defaults
self._load_fallback_defaults()
return
if not self._config_path.exists():
self._load_fallback_defaults()
return
try:
config = yaml.safe_load(self._config_path.read_text())
if not config:
self._load_fallback_defaults()
return
self._version = config.get("version", "unknown")
self._parse_events(config.get("events", {}))
self._parse_gating(config.get("gating_thresholds", {}))
self._parse_daily_limits(config.get("daily_limits", {}))
self._audit = config.get("audit", {})
except Exception:
# Any error loading config, use fallbacks
self._load_fallback_defaults()
def _load_fallback_defaults(self) -> None:
"""Load minimal fallback defaults if config unavailable."""
self._version = "fallback"
self._events = {
"pr_merged": TokenEvent(
name="pr_merged",
description="Successfully merged a pull request",
reward=10,
penalty=0,
category="merge",
gate_threshold=0,
),
"test_fixed": TokenEvent(
name="test_fixed",
description="Fixed a failing test",
reward=8,
penalty=0,
category="test",
),
"automation_failure": TokenEvent(
name="automation_failure",
description="Automation failed",
reward=0,
penalty=-2,
category="operation",
),
}
self._gating = {"pr_merge": 0}
self._daily_limits = {}
self._audit = {"log_all_transactions": True}
def _parse_events(self, events_config: dict) -> None:
"""Parse event configurations from YAML."""
for name, config in events_config.items():
if not isinstance(config, dict):
continue
self._events[name] = TokenEvent(
name=name,
description=config.get("description", ""),
reward=config.get("reward", 0),
penalty=config.get("penalty", 0),
category=config.get("category", "unknown"),
gate_threshold=config.get("gate_threshold"),
)
def _parse_gating(self, gating_config: dict) -> None:
"""Parse gating thresholds from YAML."""
for name, threshold in gating_config.items():
if isinstance(threshold, int):
self._gating[name] = threshold
def _parse_daily_limits(self, limits_config: dict) -> None:
"""Parse daily limits from YAML."""
for category, limits in limits_config.items():
if isinstance(limits, dict):
self._daily_limits[category] = TokenCategoryLimits(
max_earn=limits.get("max_earn", 0),
max_spend=limits.get("max_spend", 0),
)
def get_delta(self, event_name: str) -> int:
"""Get token delta for an event.
Args:
event_name: Name of the event (e.g., "pr_merged", "test_fixed")
Returns:
Net token delta (positive for reward, negative for penalty)
"""
event = self._events.get(event_name)
if event:
return event.delta
return 0
def get_event(self, event_name: str) -> TokenEvent | None:
"""Get full event configuration.
Args:
event_name: Name of the event
Returns:
TokenEvent object or None if not found
"""
return self._events.get(event_name)
def list_events(self, category: str | None = None) -> list[TokenEvent]:
"""List all configured events.
Args:
category: Optional category filter
Returns:
List of TokenEvent objects
"""
events = list(self._events.values())
if category:
events = [e for e in events if e.category == category]
return events
def check_gate(self, operation: str, current_tokens: int) -> bool:
"""Check if agent meets token threshold for an operation.
Args:
operation: Operation name (e.g., "pr_merge")
current_tokens: Agent's current token balance
Returns:
True if agent can perform the operation
"""
threshold = self._gating.get(operation)
if threshold is None:
return True # No gate defined, allow
return current_tokens >= threshold
def get_gate_threshold(self, operation: str) -> int | None:
"""Get the gating threshold for an operation.
Args:
operation: Operation name
Returns:
Threshold value or None if no gate defined
"""
return self._gating.get(operation)
def get_daily_limits(self, category: str) -> TokenCategoryLimits | None:
"""Get daily limits for a category.
Args:
category: Category name
Returns:
TokenCategoryLimits or None if not defined
"""
return self._daily_limits.get(category)
def compute_transaction(
self,
event_name: str,
current_tokens: int = 0,
current_daily_earned: dict[str, int] | None = None,
) -> dict[str, Any]:
"""Compute a complete token transaction.
This is the main entry point for agents to use. It returns
a complete transaction record with delta, gating check, and limits.
Args:
event_name: Name of the event
current_tokens: Agent's current token balance
current_daily_earned: Dict of category -> tokens earned today
Returns:
Transaction dict with:
- event: Event name
- delta: Token delta
- allowed: Whether operation is allowed (gating)
- new_balance: Projected new balance
- limit_reached: Whether daily limit would be exceeded
"""
event = self._events.get(event_name)
if not event:
return {
"event": event_name,
"delta": 0,
"allowed": False,
"reason": "unknown_event",
"new_balance": current_tokens,
"limit_reached": False,
}
delta = event.delta
new_balance = current_tokens + delta
# Check gating (for penalties, we don't check gates)
allowed = True
gate_reason = None
if delta > 0 and event.gate_threshold is not None: # Only check gates for positive operations with thresholds
allowed = current_tokens >= event.gate_threshold
if not allowed:
gate_reason = f"requires {event.gate_threshold} tokens"
# Check daily limits
limit_reached = False
limit_reason = None
if current_daily_earned and event.category in current_daily_earned:
limits = self._daily_limits.get(event.category)
if limits:
current_earned = current_daily_earned.get(event.category, 0)
if delta > 0 and current_earned + delta > limits.max_earn:
limit_reached = True
limit_reason = f"daily earn limit ({limits.max_earn}) reached"
result = {
"event": event_name,
"delta": delta,
"category": event.category,
"allowed": allowed and not limit_reached,
"new_balance": new_balance,
"limit_reached": limit_reached,
}
if gate_reason:
result["gate_reason"] = gate_reason
if limit_reason:
result["limit_reason"] = limit_reason
return result
def get_config_version(self) -> str:
"""Get the loaded configuration version."""
return self._version
def get_categories(self) -> list[str]:
"""Get list of all configured categories."""
categories = {e.category for e in self._events.values()}
return sorted(categories)
def is_auditable(self) -> bool:
"""Check if transactions should be logged for audit."""
return self._audit.get("log_all_transactions", True)
# Convenience functions for simple use cases
def get_token_delta(event_name: str) -> int:
"""Get token delta for an event (convenience function).
Args:
event_name: Name of the event
Returns:
Token delta (positive for reward, negative for penalty)
"""
return TokenRules().get_delta(event_name)
def check_operation_gate(operation: str, current_tokens: int) -> bool:
"""Check if agent can perform operation (convenience function).
Args:
operation: Operation name
current_tokens: Agent's current token balance
Returns:
True if operation is allowed
"""
return TokenRules().check_gate(operation, current_tokens)
def compute_token_reward(
event_name: str,
current_tokens: int = 0,
) -> dict[str, Any]:
"""Compute token reward for an event (convenience function).
Args:
event_name: Name of the event
current_tokens: Agent's current token balance
Returns:
Transaction dict with delta, allowed status, new balance
"""
return TokenRules().compute_transaction(event_name, current_tokens)
def list_token_events(category: str | None = None) -> list[dict[str, Any]]:
"""List all token events (convenience function).
Args:
category: Optional category filter
Returns:
List of event dicts with name, description, delta, category
"""
rules = TokenRules()
events = rules.list_events(category)
return [
{
"name": e.name,
"description": e.description,
"delta": e.delta,
"category": e.category,
"gate_threshold": e.gate_threshold,
}
for e in events
]