forked from Rockachopa/Timmy-time-dashboard
Compare commits
2 Commits
kimi/issue
...
kimi/issue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d3eb5c31bf | ||
| 6dd48685e7 |
@@ -1,98 +0,0 @@
|
||||
# ── System Stress Modes Configuration ────────────────────────────────────────
|
||||
#
|
||||
# This configuration defines how token rewards adapt based on system stress.
|
||||
# When the system detects elevated stress (flaky tests, growing backlog,
|
||||
# CI failures), quest rewards are adjusted to incentivize agents to focus
|
||||
# on the most critical areas.
|
||||
#
|
||||
# ── How It Works ─────────────────────────────────────────────────────────────
|
||||
#
|
||||
# 1. SIGNALS: System metrics are monitored continuously
|
||||
# 2. SCORE: Weighted contributions from triggered signals create a stress score
|
||||
# 3. MODE: Score determines the stress mode (calm, elevated, high)
|
||||
# 4. MULTIPLIERS: Token rewards are multiplied based on the current mode
|
||||
#
|
||||
# ── Stress Thresholds ────────────────────────────────────────────────────────
|
||||
|
||||
thresholds:
|
||||
# Minimum score to enter elevated mode (0.0 - 1.0)
|
||||
elevated_min: 0.3
|
||||
|
||||
# Minimum score to enter high stress mode (0.0 - 1.0)
|
||||
high_min: 0.6
|
||||
|
||||
# ── Stress Signals ───────────────────────────────────────────────────────────
|
||||
#
|
||||
# Each signal has:
|
||||
# - threshold: Value at which signal is considered "triggered"
|
||||
# - weight: Contribution to overall stress score (should sum to ~1.0)
|
||||
|
||||
signals:
|
||||
flaky_test_rate:
|
||||
threshold: 0.15 # 15% of tests showing flakiness
|
||||
weight: 0.30
|
||||
description: "Percentage of test runs that are flaky"
|
||||
|
||||
p1_backlog_growth:
|
||||
threshold: 5 # 5 new P1 issues in lookback period
|
||||
weight: 0.25
|
||||
description: "Net growth in P1 priority issues over 7 days"
|
||||
|
||||
ci_failure_rate:
|
||||
threshold: 0.20 # 20% of CI runs failing
|
||||
weight: 0.25
|
||||
description: "Percentage of CI runs failing in lookback period"
|
||||
|
||||
open_bug_count:
|
||||
threshold: 20 # 20 open bugs
|
||||
weight: 0.20
|
||||
description: "Total open issues labeled as 'bug'"
|
||||
|
||||
# ── Token Multipliers ────────────────────────────────────────────────────────
|
||||
#
|
||||
# Multipliers are applied to quest rewards based on current stress mode.
|
||||
# Values > 1.0 increase rewards, < 1.0 decrease rewards.
|
||||
#
|
||||
# Quest types:
|
||||
# - test_improve: Test coverage/quality improvements
|
||||
# - docs_update: Documentation updates
|
||||
# - issue_count: Closing specific issue types
|
||||
# - issue_reduce: Reducing overall issue backlog
|
||||
# - daily_run: Daily Run session completion
|
||||
# - custom: Special/manual quests
|
||||
# - exploration: Exploratory work
|
||||
# - refactor: Code refactoring
|
||||
|
||||
multipliers:
|
||||
calm:
|
||||
# Calm periods: incentivize maintenance and exploration
|
||||
test_improve: 1.0
|
||||
docs_update: 1.2
|
||||
issue_count: 1.0
|
||||
issue_reduce: 1.0
|
||||
daily_run: 1.0
|
||||
custom: 1.0
|
||||
exploration: 1.3
|
||||
refactor: 1.2
|
||||
|
||||
elevated:
|
||||
# Elevated stress: start emphasizing stability
|
||||
test_improve: 1.2
|
||||
docs_update: 1.0
|
||||
issue_count: 1.1
|
||||
issue_reduce: 1.1
|
||||
daily_run: 1.0
|
||||
custom: 1.0
|
||||
exploration: 1.0
|
||||
refactor: 0.9 # Discourage risky changes
|
||||
|
||||
high:
|
||||
# High stress: crisis mode, focus on stabilization
|
||||
test_improve: 1.5 # Strongly incentivize testing
|
||||
docs_update: 0.8 # Deprioritize docs
|
||||
issue_count: 1.3 # Reward closing issues
|
||||
issue_reduce: 1.4 # Strongly reward reducing backlog
|
||||
daily_run: 1.1
|
||||
custom: 1.0
|
||||
exploration: 0.7 # Discourage exploration
|
||||
refactor: 0.6 # Discourage refactors during crisis
|
||||
@@ -330,6 +330,13 @@ class Settings(BaseSettings):
|
||||
autoresearch_max_iterations: int = 100
|
||||
autoresearch_metric: str = "val_bpb" # metric to optimise (lower = better)
|
||||
|
||||
# ── Weekly Narrative Summary ───────────────────────────────────────
|
||||
# Generates a human-readable weekly summary of development activity.
|
||||
# Disabling this will stop the weekly narrative generation.
|
||||
weekly_narrative_enabled: bool = True
|
||||
weekly_narrative_lookback_days: int = 7
|
||||
weekly_narrative_output_dir: str = ".loop"
|
||||
|
||||
# ── Local Hands (Shell + Git) ──────────────────────────────────────
|
||||
# Enable local shell/git execution hands.
|
||||
hands_shell_enabled: bool = True
|
||||
|
||||
@@ -187,76 +187,6 @@ async def reload_quest_config_api() -> JSONResponse:
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Stress Mode Endpoints
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@router.get("/api/stress")
async def get_stress_status_api() -> JSONResponse:
    """Get current stress mode status and multipliers.

    Returns:
        Current stress mode, score, active signals, and multipliers
    """
    try:
        from timmy.stress_detector import (
            detect_stress_mode,
            get_stress_summary,
        )

        snapshot = detect_stress_mode()
        summary = get_stress_summary()

        payload = {
            "status": "ok",
            "stress": summary,
            "raw": snapshot.to_dict(),
        }
        return JSONResponse(payload)
    except Exception as exc:
        # Stress reporting is informational; surface the failure as a 500
        # rather than letting it propagate.
        logger.warning("Failed to get stress status: %s", exc)
        return JSONResponse(
            {"status": "error", "error": str(exc)},
            status_code=500,
        )
|
||||
|
||||
|
||||
@router.post("/api/stress/refresh")
async def refresh_stress_detection_api() -> JSONResponse:
    """Force a fresh stress detection check.

    Normally stress is cached for 60 seconds. This endpoint
    bypasses the cache for immediate results.
    """
    try:
        from timmy.stress_detector import detect_stress_mode, get_stress_summary

        snapshot = detect_stress_mode(force_refresh=True)
        summary = get_stress_summary()

        body = {
            "status": "ok",
            "stress": summary,
            "raw": snapshot.to_dict(),
        }
        return JSONResponse(body)
    except Exception as exc:
        logger.warning("Failed to refresh stress detection: %s", exc)
        return JSONResponse(
            {"status": "error", "error": str(exc)},
            status_code=500,
        )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dashboard UI Endpoints
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -269,22 +269,6 @@ def _is_on_cooldown(progress: QuestProgress, quest: QuestDefinition) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def _apply_stress_multiplier(base_reward: int, quest_type: QuestType) -> tuple[int, float]:
    """Apply stress-based multiplier to quest reward.

    Args:
        base_reward: The quest's configured token reward.
        quest_type: Quest type used to look up the stress multiplier.

    Returns:
        Tuple of (adjusted_reward, multiplier_used). The reported
        multiplier is derived from the integer-adjusted reward, so any
        truncation in the reward is reflected in it.

    Falls back to (base_reward, 1.0) when the stress detector is
    unavailable or raises.
    """
    try:
        from timmy.stress_detector import apply_multiplier

        # NOTE: apply_multiplier returns the *adjusted reward* (an int),
        # not the multiplier itself — the previous local name "multiplier"
        # was misleading.
        adjusted_reward = apply_multiplier(base_reward, quest_type.value)
        multiplier_used = adjusted_reward / max(base_reward, 1)
        return adjusted_reward, multiplier_used
    except Exception as exc:
        logger.debug("Failed to apply stress multiplier: %s", exc)
        return base_reward, 1.0
|
||||
|
||||
|
||||
def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
|
||||
"""Claim the token reward for a completed quest.
|
||||
|
||||
@@ -308,18 +292,13 @@ def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
|
||||
return None
|
||||
|
||||
try:
|
||||
# Apply stress-based multiplier
|
||||
adjusted_reward, multiplier = _apply_stress_multiplier(
|
||||
quest.reward_tokens, quest.quest_type
|
||||
)
|
||||
|
||||
# Award tokens via ledger
|
||||
from lightning.ledger import create_invoice_entry, mark_settled
|
||||
|
||||
# Create a mock invoice for the reward
|
||||
invoice_entry = create_invoice_entry(
|
||||
payment_hash=f"quest_{quest_id}_{agent_id}_{int(time.time())}",
|
||||
amount_sats=adjusted_reward,
|
||||
amount_sats=quest.reward_tokens,
|
||||
memo=f"Quest reward: {quest.name}",
|
||||
source="quest_reward",
|
||||
agent_id=agent_id,
|
||||
@@ -341,21 +320,12 @@ def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
|
||||
progress.completed_at = ""
|
||||
progress.claimed_at = ""
|
||||
|
||||
# Build notification with multiplier info
|
||||
notification = quest.notification_message.format(tokens=adjusted_reward)
|
||||
if multiplier != 1.0:
|
||||
pct = int((multiplier - 1.0) * 100)
|
||||
if pct > 0:
|
||||
notification += f" (+{pct}% stress bonus)"
|
||||
else:
|
||||
notification += f" ({pct}% stress adjustment)"
|
||||
notification = quest.notification_message.format(tokens=quest.reward_tokens)
|
||||
|
||||
return {
|
||||
"quest_id": quest_id,
|
||||
"agent_id": agent_id,
|
||||
"tokens_awarded": adjusted_reward,
|
||||
"base_reward": quest.reward_tokens,
|
||||
"multiplier": round(multiplier, 2),
|
||||
"tokens_awarded": quest.reward_tokens,
|
||||
"notification": notification,
|
||||
"completion_count": progress.completion_count,
|
||||
}
|
||||
@@ -497,14 +467,6 @@ def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
|
||||
total_rewards = 0
|
||||
completed_count = 0
|
||||
|
||||
# Get current stress mode for adjusted rewards display
|
||||
try:
|
||||
from timmy.stress_detector import get_current_stress_mode, get_multiplier
|
||||
|
||||
current_mode = get_current_stress_mode()
|
||||
except Exception:
|
||||
current_mode = None
|
||||
|
||||
for quest_id, quest in definitions.items():
|
||||
progress = get_quest_progress(quest_id, agent_id)
|
||||
if not progress:
|
||||
@@ -512,23 +474,11 @@ def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
|
||||
|
||||
is_on_cooldown = _is_on_cooldown(progress, quest) if quest.repeatable else False
|
||||
|
||||
# Calculate adjusted reward with stress multiplier
|
||||
adjusted_reward = quest.reward_tokens
|
||||
multiplier = 1.0
|
||||
if current_mode:
|
||||
try:
|
||||
multiplier = get_multiplier(quest.quest_type.value, current_mode)
|
||||
adjusted_reward = int(quest.reward_tokens * multiplier)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
quest_info = {
|
||||
"quest_id": quest_id,
|
||||
"name": quest.name,
|
||||
"description": quest.description,
|
||||
"reward_tokens": quest.reward_tokens,
|
||||
"adjusted_reward": adjusted_reward,
|
||||
"multiplier": round(multiplier, 2),
|
||||
"type": quest.quest_type.value,
|
||||
"enabled": quest.enabled,
|
||||
"repeatable": quest.repeatable,
|
||||
@@ -559,7 +509,6 @@ def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
|
||||
"total_tokens_earned": total_rewards,
|
||||
"total_quests_completed": completed_count,
|
||||
"active_quests_count": len([q for q in quests_status if q["enabled"]]),
|
||||
"stress_mode": current_mode.value if current_mode else None,
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -1,565 +0,0 @@
|
||||
"""System stress detection for adaptive token rewards.
|
||||
|
||||
Monitors system signals like flakiness, backlog growth, and CI failures
|
||||
to determine the current stress mode. Token rewards are then adjusted
|
||||
based on the stress mode to incentivize agents to focus on critical areas.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from enum import StrEnum
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Path to stress mode configuration
|
||||
STRESS_CONFIG_PATH = Path(settings.repo_root) / "config" / "stress_modes.yaml"
|
||||
|
||||
|
||||
class StressMode(StrEnum):
    """System stress modes.

    - CALM: Normal operations, incentivize exploration and refactoring
    - ELEVATED: Some stress signals detected, balance incentives
    - HIGH: Critical stress, strongly incentivize bug fixes and stabilization
    """

    # String values double as keys into the "multipliers" section of
    # stress_modes.yaml (see _get_multipliers_for_mode).
    CALM = "calm"
    ELEVATED = "elevated"
    HIGH = "high"
|
||||
|
||||
|
||||
@dataclass
|
||||
class StressSignal:
|
||||
"""A single stress signal reading."""
|
||||
|
||||
name: str
|
||||
value: float
|
||||
threshold: float
|
||||
weight: float
|
||||
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
|
||||
|
||||
@property
|
||||
def is_triggered(self) -> bool:
|
||||
"""Whether this signal exceeds its threshold."""
|
||||
return self.value >= self.threshold
|
||||
|
||||
@property
|
||||
def contribution(self) -> float:
|
||||
"""Calculate this signal's contribution to stress score."""
|
||||
if not self.is_triggered:
|
||||
return 0.0
|
||||
# Contribution is weighted ratio of value to threshold
|
||||
return min(1.0, (self.value / max(self.threshold, 1.0))) * self.weight
|
||||
|
||||
|
||||
@dataclass
|
||||
class StressSnapshot:
|
||||
"""Complete stress assessment at a point in time."""
|
||||
|
||||
mode: StressMode
|
||||
score: float
|
||||
signals: list[StressSignal]
|
||||
multipliers: dict[str, float]
|
||||
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Convert to dictionary for serialization."""
|
||||
return {
|
||||
"mode": self.mode.value,
|
||||
"score": round(self.score, 3),
|
||||
"signals": [
|
||||
{
|
||||
"name": s.name,
|
||||
"value": s.value,
|
||||
"threshold": s.threshold,
|
||||
"triggered": s.is_triggered,
|
||||
"contribution": round(s.contribution, 3),
|
||||
}
|
||||
for s in self.signals
|
||||
],
|
||||
"multipliers": self.multipliers,
|
||||
"timestamp": self.timestamp,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
class StressThresholds:
    """Thresholds for entering/exiting stress modes."""

    elevated_min: float = 0.3
    high_min: float = 0.6

    def get_mode_for_score(self, score: float) -> StressMode:
        """Determine stress mode based on score."""
        # Guard-clause form: walk the bands from calmest upward.
        if score < self.elevated_min:
            return StressMode.CALM
        if score < self.high_min:
            return StressMode.ELEVATED
        return StressMode.HIGH
|
||||
|
||||
|
||||
# In-memory storage for stress state
|
||||
_current_snapshot: StressSnapshot | None = None
|
||||
_last_check_time: datetime | None = None
|
||||
_config_cache: dict[str, Any] | None = None
|
||||
_config_mtime: float = 0.0
|
||||
|
||||
|
||||
def _load_stress_config() -> dict[str, Any]:
    """Load stress mode configuration from YAML.

    The parsed config is cached and only re-read when the file's
    mtime changes.

    Returns:
        Configuration dictionary with default fallbacks
    """
    global _config_cache, _config_mtime

    if STRESS_CONFIG_PATH.exists():
        mtime = STRESS_CONFIG_PATH.stat().st_mtime
        stale = _config_cache is None or mtime != _config_mtime
        if stale:
            try:
                parsed = yaml.safe_load(STRESS_CONFIG_PATH.read_text())
                _config_cache = parsed or {}
                _config_mtime = mtime
                logger.debug("Loaded stress config from %s", STRESS_CONFIG_PATH)
            except (OSError, yaml.YAMLError) as exc:
                logger.warning("Failed to load stress config: %s", exc)
                _config_cache = {}

    # Missing file (or first call) -> empty config; callers merge defaults.
    if _config_cache is None:
        _config_cache = {}

    return _config_cache
|
||||
|
||||
|
||||
def get_default_config() -> dict[str, Any]:
    """Get default stress configuration.

    Fallback used for any section missing from stress_modes.yaml; the
    structure mirrors that file (thresholds, signals, multipliers).
    """
    return {
        "thresholds": {
            "elevated_min": 0.3,
            "high_min": 0.6,
        },
        "signals": {
            "flaky_test_rate": {
                "threshold": 0.15,  # 15% flaky test rate
                "weight": 0.3,
                "description": "Percentage of tests that are flaky",
            },
            "p1_backlog_growth": {
                "threshold": 5,  # 5 new P1 issues
                "weight": 0.25,
                "description": "Net growth in P1 priority issues",
            },
            "ci_failure_rate": {
                "threshold": 0.2,  # 20% CI failure rate
                "weight": 0.25,
                "description": "Percentage of CI runs failing",
            },
            "open_bug_count": {
                "threshold": 20,  # 20 open bugs
                "weight": 0.2,
                "description": "Total open issues labeled as bugs",
            },
        },
        "multipliers": {
            StressMode.CALM.value: {
                "test_improve": 1.0,
                "docs_update": 1.2,  # Calm periods good for docs
                "issue_count": 1.0,
                "issue_reduce": 1.0,
                "daily_run": 1.0,
                "custom": 1.0,
                "exploration": 1.3,  # Encourage exploration
                "refactor": 1.2,  # Encourage refactoring
            },
            StressMode.ELEVATED.value: {
                "test_improve": 1.2,  # Start emphasizing tests
                "docs_update": 1.0,
                "issue_count": 1.1,
                "issue_reduce": 1.1,
                "daily_run": 1.0,
                "custom": 1.0,
                "exploration": 1.0,
                "refactor": 0.9,  # Discourage risky refactors
            },
            StressMode.HIGH.value: {
                "test_improve": 1.5,  # Strongly incentivize testing
                "docs_update": 0.8,  # Deprioritize docs
                "issue_count": 1.3,  # Reward closing issues
                "issue_reduce": 1.4,  # Strongly reward reducing backlog
                "daily_run": 1.1,
                "custom": 1.0,
                "exploration": 0.7,  # Discourage exploration
                "refactor": 0.6,  # Discourage refactors during crisis
            },
        },
    }
|
||||
|
||||
|
||||
def _get_config_value(key_path: str, default: Any = None) -> Any:
    """Get a value from config using dot notation path."""
    node = _load_stress_config()
    for part in key_path.split("."):
        # Bail out as soon as the path leaves dict territory.
        if not isinstance(node, dict):
            return default
        node = node.get(part)
    return default if node is None else node
|
||||
|
||||
|
||||
def _calculate_flaky_test_rate() -> float:
    """Calculate current flaky test rate from available data.

    Reads .loop/test_results.jsonl and returns the fraction of runs in
    the last 7 days flagged "is_flaky". Returns 0.0 when no data is
    available or on any error (stress detection is best-effort).
    """
    try:
        # Try to load from daily run metrics or test results
        test_results_path = Path(settings.repo_root) / ".loop" / "test_results.jsonl"
        if not test_results_path.exists():
            return 0.0

        # Count recent test runs and flaky results
        now = datetime.now(UTC)
        cutoff = now - timedelta(days=7)

        total_runs = 0
        flaky_runs = 0

        # NOTE: a second `if test_results_path.exists():` guard used to
        # wrap this loop; it was dead code after the early return above
        # and has been removed.
        for line in test_results_path.read_text().strip().splitlines():
            try:
                entry = json.loads(line)
                ts_str = entry.get("timestamp", "")
                if not ts_str:
                    continue
                ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
                if ts >= cutoff:
                    total_runs += 1
                    if entry.get("is_flaky", False):
                        flaky_runs += 1
            except (json.JSONDecodeError, ValueError):
                # Skip malformed lines rather than poisoning the metric.
                continue

        return flaky_runs / max(total_runs, 1)
    except Exception as exc:
        logger.debug("Failed to calculate flaky test rate: %s", exc)
        return 0.0
|
||||
|
||||
|
||||
def _calculate_p1_backlog_growth() -> float:
    """Calculate P1 issue backlog growth."""
    try:
        from dashboard.routes.daily_run import GiteaClient, _load_config

        cfg = _load_config()
        client = GiteaClient(cfg, cfg.get("token"))

        if not client.is_available():
            return 0.0

        now = datetime.now(UTC)
        week_ago = now - timedelta(days=7)
        two_weeks_ago = now - timedelta(days=14)

        issues = client.get_paginated("issues", {"state": "all", "labels": "P1", "limit": 100})

        this_week = 0
        prior_week = 0
        for issue in issues:
            raw_created = issue.get("created_at", "")
            if not raw_created:
                continue
            try:
                created = datetime.fromisoformat(raw_created.replace("Z", "+00:00"))
            except (ValueError, TypeError):
                continue
            if created >= week_ago:
                this_week += 1
            elif created >= two_weeks_ago:
                prior_week += 1

        # Positive result means the P1 backlog is growing week-over-week.
        return max(0, this_week - prior_week)
    except Exception as exc:
        logger.debug("Failed to calculate P1 backlog growth: %s", exc)
        return 0.0
|
||||
|
||||
|
||||
def _calculate_ci_failure_rate() -> float:
    """Calculate CI failure rate from recent runs."""
    try:
        # CI results are appended locally as JSONL by other tooling.
        results_file = Path(settings.repo_root) / ".loop" / "ci_results.jsonl"
        if not results_file.exists():
            return 0.0

        cutoff = datetime.now(UTC) - timedelta(days=7)

        total = 0
        failed = 0
        for raw_line in results_file.read_text().strip().splitlines():
            try:
                record = json.loads(raw_line)
                stamp = record.get("timestamp", "")
                if not stamp:
                    continue
                when = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
            except (json.JSONDecodeError, ValueError):
                continue
            if when >= cutoff:
                total += 1
                # Anything other than an explicit "success" counts as a failure.
                if record.get("status") != "success":
                    failed += 1

        return failed / max(total, 1)
    except Exception as exc:
        logger.debug("Failed to calculate CI failure rate: %s", exc)
        return 0.0
|
||||
|
||||
|
||||
def _calculate_open_bug_count() -> float:
    """Calculate current open bug count."""
    try:
        from dashboard.routes.daily_run import GiteaClient, _load_config

        cfg = _load_config()
        client = GiteaClient(cfg, cfg.get("token"))
        if not client.is_available():
            return 0.0

        params = {"state": "open", "labels": "bug", "limit": 100}
        open_bugs = client.get_paginated("issues", params)
        return float(len(open_bugs))
    except Exception as exc:
        logger.debug("Failed to calculate open bug count: %s", exc)
        return 0.0
|
||||
|
||||
|
||||
def _collect_stress_signals() -> list[StressSignal]:
    """Collect all stress signals from the system."""
    config = _load_stress_config()
    defaults = get_default_config()
    signals_config = config.get("signals", defaults["signals"])

    # Maps signal name -> zero-arg collector returning the current value.
    collectors = {
        "flaky_test_rate": _calculate_flaky_test_rate,
        "p1_backlog_growth": _calculate_p1_backlog_growth,
        "ci_failure_rate": _calculate_ci_failure_rate,
        "open_bug_count": _calculate_open_bug_count,
    }

    readings: list[StressSignal] = []
    for name, collect in collectors.items():
        cfg = signals_config.get(name, {})
        fallback = defaults["signals"].get(name, {})
        try:
            readings.append(
                StressSignal(
                    name=name,
                    value=collect(),
                    threshold=cfg.get("threshold", fallback.get("threshold", 1.0)),
                    weight=cfg.get("weight", fallback.get("weight", 0.25)),
                )
            )
        except Exception as exc:
            # A failing collector drops its signal; the rest still count.
            logger.debug("Failed to collect signal %s: %s", name, exc)

    return readings
|
||||
|
||||
|
||||
def _calculate_stress_score(signals: list[StressSignal]) -> float:
|
||||
"""Calculate overall stress score from signals.
|
||||
|
||||
Score is weighted sum of triggered signal contributions,
|
||||
normalized to 0-1 range.
|
||||
"""
|
||||
if not signals:
|
||||
return 0.0
|
||||
|
||||
total_weight = sum(s.weight for s in signals)
|
||||
if total_weight == 0:
|
||||
return 0.0
|
||||
|
||||
triggered_contribution = sum(s.contribution for s in signals)
|
||||
return min(1.0, triggered_contribution / total_weight)
|
||||
|
||||
|
||||
def _get_multipliers_for_mode(mode: StressMode) -> dict[str, float]:
    """Get token multipliers for a specific stress mode."""
    config = _load_stress_config()
    defaults = get_default_config()

    configured = config.get("multipliers", defaults["multipliers"]).get(mode.value, {})
    # Start from defaults so any quest type missing from the user config
    # still resolves to a multiplier.
    merged = dict(defaults["multipliers"].get(mode.value, {}))
    merged.update(configured)
    return merged
|
||||
|
||||
|
||||
def detect_stress_mode(
    force_refresh: bool = False,
    min_check_interval_seconds: int = 60,
) -> StressSnapshot:
    """Detect current system stress mode.

    Args:
        force_refresh: Force a new check even if recently checked
        min_check_interval_seconds: Minimum seconds between checks

    Returns:
        StressSnapshot with mode, score, signals, and multipliers
    """
    global _current_snapshot, _last_check_time

    now = datetime.now(UTC)

    # Return cached snapshot if recent and not forced
    if not force_refresh and _current_snapshot is not None and _last_check_time is not None:
        elapsed = (now - _last_check_time).total_seconds()
        if elapsed < min_check_interval_seconds:
            return _current_snapshot

    # Remember the outgoing snapshot so mode transitions can be logged.
    # BUG FIX: previously the cache was overwritten *before* the
    # comparison, so `_current_snapshot.mode != mode` compared the new
    # snapshot with itself and the transition log could never fire.
    previous_snapshot = _current_snapshot

    # Collect signals and calculate stress
    signals = _collect_stress_signals()
    score = _calculate_stress_score(signals)

    # Determine mode from score
    config = _load_stress_config()
    default_config = get_default_config()
    thresholds_cfg = config.get("thresholds", default_config["thresholds"])
    thresholds = StressThresholds(
        elevated_min=thresholds_cfg.get("elevated_min", 0.3),
        high_min=thresholds_cfg.get("high_min", 0.6),
    )
    mode = thresholds.get_mode_for_score(score)

    # Get multipliers for this mode
    multipliers = _get_multipliers_for_mode(mode)

    # Create snapshot
    snapshot = StressSnapshot(
        mode=mode,
        score=score,
        signals=signals,
        multipliers=multipliers,
        timestamp=now.isoformat(),
    )

    # Cache result
    _current_snapshot = snapshot
    _last_check_time = now

    # Log mode changes against the pre-refresh snapshot
    if previous_snapshot is not None and previous_snapshot.mode != mode:
        logger.info(
            "Stress mode changed: %s -> %s (score: %.2f)",
            previous_snapshot.mode.value,
            mode.value,
            score,
        )

    return snapshot
|
||||
|
||||
|
||||
def get_current_stress_mode() -> StressMode:
    """Get current stress mode (uses cached or fresh detection)."""
    return detect_stress_mode().mode
|
||||
|
||||
|
||||
def get_multiplier(quest_type: str, mode: StressMode | None = None) -> float:
    """Get token multiplier for a quest type.

    Args:
        quest_type: Type of quest (test_improve, issue_count, etc.)
        mode: Specific mode to get multiplier for, or None for current

    Returns:
        Multiplier value (1.0 = normal, 1.5 = 50% bonus, etc.)
    """
    effective_mode = mode if mode is not None else get_current_stress_mode()
    # Unknown quest types default to a neutral 1.0.
    return _get_multipliers_for_mode(effective_mode).get(quest_type, 1.0)
|
||||
|
||||
|
||||
def apply_multiplier(base_reward: int, quest_type: str) -> int:
    """Apply stress-based multiplier to a base reward.

    Args:
        base_reward: Base token reward amount
        quest_type: Type of quest for multiplier lookup

    Returns:
        Adjusted reward amount (always >= 1)
    """
    scaled = int(base_reward * get_multiplier(quest_type))
    # Never hand out a zero reward after truncation.
    return max(1, scaled)
|
||||
|
||||
|
||||
def get_stress_summary() -> dict[str, Any]:
    """Get a human-readable summary of current stress state."""
    snapshot = detect_stress_mode()

    # One canned explanation per mode for dashboard display.
    explanations = {
        StressMode.CALM: "System is calm. Good time for exploration and refactoring.",
        StressMode.ELEVATED: "Elevated stress detected. Focus on stability and tests.",
        StressMode.HIGH: "HIGH STRESS MODE. Prioritize bug fixes and test hardening.",
    }

    active = [
        {
            "name": sig.name,
            "value": round(sig.value, 3),
            "threshold": sig.threshold,
        }
        for sig in snapshot.signals
        if sig.is_triggered
    ]

    return {
        "mode": snapshot.mode.value,
        "score": round(snapshot.score, 3),
        "explanation": explanations.get(snapshot.mode, "Unknown mode"),
        "active_signals": active,
        "current_multipliers": snapshot.multipliers,
        "last_updated": snapshot.timestamp,
    }
|
||||
|
||||
|
||||
def reset_stress_state() -> None:
    """Reset stress state cache (useful for testing)."""
    global _current_snapshot, _last_check_time, _config_cache, _config_mtime
    # Drop both the detection cache and the parsed-config cache.
    _current_snapshot = None
    _last_check_time = None
    _config_cache = None
    _config_mtime = 0.0
|
||||
524
tests/timmy_automations/test_token_rules.py
Normal file
524
tests/timmy_automations/test_token_rules.py
Normal file
@@ -0,0 +1,524 @@
|
||||
"""Tests for token_rules module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
# Add timmy_automations to path for imports
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "timmy_automations"))
|
||||
|
||||
from utils import token_rules as tr
|
||||
|
||||
|
||||
class TestTokenEvent:
    """Test TokenEvent dataclass."""

    @staticmethod
    def _make(reward, penalty):
        """Build a TokenEvent with fixed metadata and the given amounts."""
        return tr.TokenEvent(
            name="test",
            description="Test event",
            reward=reward,
            penalty=penalty,
            category="test",
        )

    def test_delta_calculation_reward(self):
        """Delta is positive for rewards."""
        assert self._make(10, 0).delta == 10

    def test_delta_calculation_penalty(self):
        """Delta is negative for penalties."""
        assert self._make(0, -5).delta == -5

    def test_delta_calculation_mixed(self):
        """Delta is net of reward and penalty."""
        assert self._make(10, -3).delta == 7
|
||||
|
||||
|
||||
class TestTokenRulesLoading:
    """Test TokenRules configuration loading."""

    def test_loads_from_yaml_file(self, tmp_path):
        """Load configuration from YAML file."""
        yaml = pytest.importorskip("yaml")

        cfg = {
            "version": "1.0.0-test",
            "events": {
                "test_event": {
                    "description": "A test event",
                    "reward": 15,
                    "category": "test",
                }
            },
            "gating_thresholds": {"test_op": 50},
            "daily_limits": {"test": {"max_earn": 100, "max_spend": 10}},
            "audit": {"log_all_transactions": False},
        }
        path = tmp_path / "token_rules.yaml"
        path.write_text(yaml.dump(cfg))

        rules = tr.TokenRules(config_path=path)

        assert rules.get_config_version() == "1.0.0-test"
        assert rules.get_delta("test_event") == 15
        assert rules.get_gate_threshold("test_op") == 50

    def test_fallback_when_yaml_missing(self, tmp_path):
        """Use fallback defaults when YAML file doesn't exist."""
        rules = tr.TokenRules(config_path=tmp_path / "nonexistent.yaml")

        assert rules.get_config_version() == "fallback"
        # Fallback should have some basic events
        assert rules.get_delta("pr_merged") == 10
        assert rules.get_delta("test_fixed") == 8
        assert rules.get_delta("automation_failure") == -2

    def test_fallback_when_yaml_not_installed(self, tmp_path):
        """Use fallback when PyYAML is not installed."""
        # Hiding the module simulates an environment without PyYAML.
        with patch.dict(sys.modules, {"yaml": None}):
            path = tmp_path / "token_rules.yaml"
            path.write_text("version: '1.0.0'")

            rules = tr.TokenRules(config_path=path)

            assert rules.get_config_version() == "fallback"
|
||||
|
||||
|
||||
class TestTokenRulesGetDelta:
    """Tests for the get_delta event lookup."""

    def test_get_delta_existing_event(self, tmp_path):
        """Get delta for configured event."""
        yaml = pytest.importorskip("yaml")

        cfg = {
            "version": "1.0.0",
            "events": {
                "pr_merged": {"description": "PR merged", "reward": 10, "category": "merge"},
                "automation_failure": {"description": "Failure", "penalty": -2, "category": "ops"},
            },
        }
        cfg_file = tmp_path / "token_rules.yaml"
        cfg_file.write_text(yaml.dump(cfg))

        rules = tr.TokenRules(config_path=cfg_file)

        # Reward-only and penalty-only events both resolve to their net delta.
        assert rules.get_delta("pr_merged") == 10
        assert rules.get_delta("automation_failure") == -2

    def test_get_delta_unknown_event(self, tmp_path):
        """Return 0 for unknown events."""
        rules = tr.TokenRules(config_path=tmp_path / "nonexistent.yaml")

        assert rules.get_delta("unknown_event") == 0
|
||||
|
||||
|
||||
class TestTokenRulesGetEvent:
    """Tests for the get_event lookup."""

    def test_get_event_returns_full_config(self, tmp_path):
        """Get full event configuration."""
        yaml = pytest.importorskip("yaml")

        cfg = {
            "version": "1.0.0",
            "events": {
                "pr_merged": {
                    "description": "PR merged successfully",
                    "reward": 10,
                    "category": "merge",
                    "gate_threshold": 0,
                }
            },
        }
        cfg_file = tmp_path / "token_rules.yaml"
        cfg_file.write_text(yaml.dump(cfg))

        event = tr.TokenRules(config_path=cfg_file).get_event("pr_merged")

        assert event is not None
        # Every field from the YAML mapping should round-trip onto the event.
        assert event.name == "pr_merged"
        assert event.description == "PR merged successfully"
        assert event.reward == 10
        assert event.category == "merge"
        assert event.gate_threshold == 0

    def test_get_event_unknown_returns_none(self, tmp_path):
        """Return None for unknown event."""
        rules = tr.TokenRules(config_path=tmp_path / "nonexistent.yaml")

        assert rules.get_event("unknown") is None
|
||||
|
||||
|
||||
class TestTokenRulesListEvents:
    """Tests for list_events, with and without a category filter."""

    # Shared three-event config used by both tests below.
    _THREE_EVENTS = {
        "version": "1.0.0",
        "events": {
            "event_a": {"description": "A", "reward": 5, "category": "cat1"},
            "event_b": {"description": "B", "reward": 10, "category": "cat2"},
            "event_c": {"description": "C", "reward": 15, "category": "cat1"},
        },
    }

    def _rules(self, tmp_path):
        """Write the shared config into tmp_path and load it as TokenRules."""
        yaml = pytest.importorskip("yaml")
        cfg_file = tmp_path / "token_rules.yaml"
        cfg_file.write_text(yaml.dump(self._THREE_EVENTS))
        return tr.TokenRules(config_path=cfg_file)

    def test_list_all_events(self, tmp_path):
        """List all configured events."""
        events = self._rules(tmp_path).list_events()

        assert len(events) == 3
        names = {event.name for event in events}
        assert "event_a" in names
        assert "event_b" in names
        assert "event_c" in names

    def test_list_events_by_category(self, tmp_path):
        """Filter events by category."""
        events = self._rules(tmp_path).list_events(category="cat1")

        assert len(events) == 2
        assert all(event.category == "cat1" for event in events)
|
||||
|
||||
|
||||
class TestTokenRulesGating:
    """Tests for gate checks and gate-threshold lookups."""

    @staticmethod
    def _load(tmp_path, cfg):
        """Dump cfg to a YAML file in tmp_path and load it as TokenRules."""
        yaml = pytest.importorskip("yaml")
        cfg_file = tmp_path / "token_rules.yaml"
        cfg_file.write_text(yaml.dump(cfg))
        return tr.TokenRules(config_path=cfg_file)

    def test_check_gate_with_threshold(self, tmp_path):
        """Check gate when threshold is defined."""
        rules = self._load(
            tmp_path,
            {"version": "1.0.0", "events": {}, "gating_thresholds": {"pr_merge": 50}},
        )

        # Balances at or above the threshold pass; anything below is blocked.
        for balance, allowed in ((100, True), (50, True), (49, False), (0, False)):
            assert rules.check_gate("pr_merge", current_tokens=balance) is allowed

    def test_check_gate_no_threshold(self, tmp_path):
        """Check gate when no threshold is defined (always allowed)."""
        rules = self._load(
            tmp_path,
            {"version": "1.0.0", "events": {}, "gating_thresholds": {}},
        )

        # No threshold defined, so every balance — even negative — passes.
        assert rules.check_gate("unknown_op", current_tokens=0) is True
        assert rules.check_gate("unknown_op", current_tokens=-100) is True

    def test_get_gate_threshold(self, tmp_path):
        """Get threshold value."""
        rules = self._load(
            tmp_path,
            {"version": "1.0.0", "gating_thresholds": {"pr_merge": 50, "sensitive_op": 100}},
        )

        assert rules.get_gate_threshold("pr_merge") == 50
        assert rules.get_gate_threshold("sensitive_op") == 100
        assert rules.get_gate_threshold("unknown") is None
|
||||
|
||||
|
||||
class TestTokenRulesDailyLimits:
    """Tests for per-category daily earn/spend limits."""

    def test_get_daily_limits(self, tmp_path):
        """Get daily limits for a category."""
        yaml = pytest.importorskip("yaml")

        cfg = {
            "version": "1.0.0",
            "daily_limits": {
                "triage": {"max_earn": 100, "max_spend": 0},
                "merge": {"max_earn": 50, "max_spend": 10},
            },
        }
        cfg_file = tmp_path / "token_rules.yaml"
        cfg_file.write_text(yaml.dump(cfg))

        rules = tr.TokenRules(config_path=cfg_file)

        # Each configured category should round-trip its limit values.
        for category, earn, spend in (("triage", 100, 0), ("merge", 50, 10)):
            limits = rules.get_daily_limits(category)
            assert limits is not None
            assert limits.max_earn == earn
            assert limits.max_spend == spend

    def test_get_daily_limits_unknown(self, tmp_path):
        """Return None for unknown category."""
        yaml = pytest.importorskip("yaml")

        cfg_file = tmp_path / "token_rules.yaml"
        cfg_file.write_text(yaml.dump({"version": "1.0.0", "daily_limits": {}}))

        rules = tr.TokenRules(config_path=cfg_file)

        assert rules.get_daily_limits("unknown") is None
|
||||
|
||||
|
||||
class TestTokenRulesComputeTransaction:
    """Tests for compute_transaction: happy path, gating, and daily limits."""

    @staticmethod
    def _load(tmp_path, cfg):
        """Dump cfg to a YAML file in tmp_path and load it as TokenRules."""
        yaml = pytest.importorskip("yaml")
        cfg_file = tmp_path / "token_rules.yaml"
        cfg_file.write_text(yaml.dump(cfg))
        return tr.TokenRules(config_path=cfg_file)

    def test_compute_successful_transaction(self, tmp_path):
        """Compute transaction for valid event."""
        rules = self._load(
            tmp_path,
            {
                "version": "1.0.0",
                "events": {
                    "pr_merged": {"description": "PR merged", "reward": 10, "category": "merge"}
                },
            },
        )

        result = rules.compute_transaction("pr_merged", current_tokens=100)

        assert result["event"] == "pr_merged"
        assert result["delta"] == 10
        assert result["category"] == "merge"
        assert result["allowed"] is True
        assert result["new_balance"] == 110
        assert result["limit_reached"] is False

    def test_compute_unknown_event(self, tmp_path):
        """Compute transaction for unknown event."""
        rules = tr.TokenRules(config_path=tmp_path / "nonexistent.yaml")

        result = rules.compute_transaction("unknown_event", current_tokens=50)

        # Unknown events are rejected with a zero delta and unchanged balance.
        assert result["event"] == "unknown_event"
        assert result["delta"] == 0
        assert result["allowed"] is False
        assert result["reason"] == "unknown_event"
        assert result["new_balance"] == 50

    def test_compute_with_gate_check(self, tmp_path):
        """Compute transaction respects gating."""
        rules = self._load(
            tmp_path,
            {
                "version": "1.0.0",
                "events": {
                    "sensitive_op": {
                        "description": "Sensitive",
                        "reward": 50,
                        "category": "sensitive",
                        "gate_threshold": 100,
                    }
                },
            },
        )

        # Balance above the 100-token gate: allowed.
        result = rules.compute_transaction("sensitive_op", current_tokens=150)
        assert result["allowed"] is True

        # Balance below the gate: blocked, with an explanatory reason attached.
        result = rules.compute_transaction("sensitive_op", current_tokens=50)
        assert result["allowed"] is False
        assert "gate_reason" in result

    def test_compute_with_daily_limits(self, tmp_path):
        """Compute transaction respects daily limits."""
        rules = self._load(
            tmp_path,
            {
                "version": "1.0.0",
                "events": {
                    "triage_action": {
                        "description": "Triage",
                        "reward": 20,
                        "category": "triage",
                    }
                },
                "daily_limits": {"triage": {"max_earn": 50, "max_spend": 0}},
            },
        )

        # 20 already earned + 20 reward = 40, within the 50-token cap: allowed.
        result = rules.compute_transaction(
            "triage_action", current_tokens=100, current_daily_earned={"triage": 20}
        )
        assert result["allowed"] is True
        assert result["limit_reached"] is False

        # 40 already earned + 20 reward = 60, over the 50-token cap: blocked.
        result = rules.compute_transaction(
            "triage_action", current_tokens=100, current_daily_earned={"triage": 40}
        )
        assert result["allowed"] is False
        assert result["limit_reached"] is True
        assert "limit_reason" in result
|
||||
|
||||
|
||||
class TestTokenRulesCategories:
    """Tests for category enumeration."""

    def test_get_categories(self, tmp_path):
        """Get all unique categories."""
        yaml = pytest.importorskip("yaml")

        cfg = {
            "version": "1.0.0",
            "events": {
                "event_a": {"description": "A", "reward": 5, "category": "cat1"},
                "event_b": {"description": "B", "reward": 10, "category": "cat2"},
                "event_c": {"description": "C", "reward": 15, "category": "cat1"},
            },
        }
        cfg_file = tmp_path / "token_rules.yaml"
        cfg_file.write_text(yaml.dump(cfg))

        categories = tr.TokenRules(config_path=cfg_file).get_categories()

        # Duplicate categories collapse into a unique collection.
        assert sorted(categories) == ["cat1", "cat2"]
|
||||
|
||||
|
||||
class TestTokenRulesAudit:
    """Tests for the audit logging flag."""

    @staticmethod
    def _rules_with_audit(tmp_path, enabled):
        """Build TokenRules whose audit.log_all_transactions equals `enabled`."""
        yaml = pytest.importorskip("yaml")
        cfg_file = tmp_path / "token_rules.yaml"
        cfg_file.write_text(
            yaml.dump({"version": "1.0.0", "audit": {"log_all_transactions": enabled}})
        )
        return tr.TokenRules(config_path=cfg_file)

    def test_is_auditable_true(self, tmp_path):
        """Check if auditable when enabled."""
        assert self._rules_with_audit(tmp_path, True).is_auditable() is True

    def test_is_auditable_false(self, tmp_path):
        """Check if auditable when disabled."""
        assert self._rules_with_audit(tmp_path, False).is_auditable() is False
|
||||
|
||||
|
||||
class TestConvenienceFunctions:
    """Tests for module-level convenience wrappers (using fallback config)."""

    def test_get_token_delta(self, tmp_path):
        """Convenience function returns delta."""
        missing = tmp_path / "nonexistent.yaml"

        with patch.object(tr.TokenRules, "CONFIG_PATH", missing):
            # The fallback config rewards pr_merged with 10 tokens.
            assert tr.get_token_delta("pr_merged") == 10

    def test_check_operation_gate(self, tmp_path):
        """Convenience function checks gate."""
        missing = tmp_path / "nonexistent.yaml"

        with patch.object(tr.TokenRules, "CONFIG_PATH", missing):
            # Fallback has pr_merge gate at 0, so any non-negative balance passes.
            assert tr.check_operation_gate("pr_merge", current_tokens=0) is True
            assert tr.check_operation_gate("pr_merge", current_tokens=100) is True

    def test_compute_token_reward(self, tmp_path):
        """Convenience function computes reward."""
        missing = tmp_path / "nonexistent.yaml"

        with patch.object(tr.TokenRules, "CONFIG_PATH", missing):
            result = tr.compute_token_reward("pr_merged", current_tokens=50)

            assert result["event"] == "pr_merged"
            assert result["delta"] == 10
            assert result["new_balance"] == 60

    def test_list_token_events(self, tmp_path):
        """Convenience function lists events."""
        missing = tmp_path / "nonexistent.yaml"

        with patch.object(tr.TokenRules, "CONFIG_PATH", missing):
            events = tr.list_token_events()
            assert len(events) >= 3  # Fallback has at least 3 events

            # Each entry is a plain dict with a stable key set.
            for event in events:
                assert "name" in event
                assert "description" in event
                assert "delta" in event
                assert "category" in event
|
||||
343
tests/timmy_automations/test_weekly_narrative.py
Normal file
343
tests/timmy_automations/test_weekly_narrative.py
Normal file
@@ -0,0 +1,343 @@
|
||||
"""Tests for weekly_narrative.py script."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
# Add timmy_automations to path for imports
|
||||
sys.path.insert(
|
||||
0, str(Path(__file__).resolve().parent.parent.parent / "timmy_automations" / "daily_run")
|
||||
)
|
||||
|
||||
import weekly_narrative as wn
|
||||
|
||||
|
||||
class TestParseTimestamp:
    """Tests for parse_ts ISO-8601 parsing."""

    def test_parse_iso_with_z(self):
        """Parse ISO timestamp with Z suffix."""
        parsed = wn.parse_ts("2026-03-21T12:00:00Z")

        assert parsed is not None
        assert (parsed.year, parsed.month, parsed.day) == (2026, 3, 21)

    def test_parse_iso_with_offset(self):
        """Parse ISO timestamp with timezone offset."""
        parsed = wn.parse_ts("2026-03-21T12:00:00+00:00")

        assert parsed is not None
        assert parsed.year == 2026

    def test_parse_empty_string(self):
        """Empty string returns None."""
        assert wn.parse_ts("") is None

    def test_parse_invalid_string(self):
        """Invalid string returns None."""
        assert wn.parse_ts("not-a-timestamp") is None
|
||||
|
||||
|
||||
class TestCollectCyclesData:
    """Tests for collect_cycles_data over the retro cycles log."""

    def test_no_cycles_file(self, tmp_path):
        """Handle missing cycles file gracefully."""
        with patch.object(wn, "REPO_ROOT", tmp_path):
            cutoff = datetime.now(UTC) - timedelta(days=7)
            data = wn.collect_cycles_data(cutoff)

        assert data["total"] == 0
        assert data["successes"] == 0
        assert data["failures"] == 0

    def test_collect_recent_cycles(self, tmp_path):
        """Collect cycles within lookback period."""
        retro_dir = tmp_path / ".loop" / "retro"
        retro_dir.mkdir(parents=True)

        now = datetime.now(UTC)
        stale_ts = (now - timedelta(days=10)).isoformat()
        records = [
            {"timestamp": now.isoformat(), "success": True, "cycle": 1},
            {"timestamp": now.isoformat(), "success": False, "cycle": 2},
            {"timestamp": stale_ts, "success": True, "cycle": 3},
        ]
        jsonl = "".join(json.dumps(record) + "\n" for record in records)
        (retro_dir / "cycles.jsonl").write_text(jsonl)

        with patch.object(wn, "REPO_ROOT", tmp_path):
            data = wn.collect_cycles_data(now - timedelta(days=7))

        # The 10-day-old record falls outside the 7-day window.
        assert data["total"] == 2
        assert data["successes"] == 1
        assert data["failures"] == 1
|
||||
|
||||
|
||||
class TestExtractThemes:
    """Tests for theme extraction from issue label lists."""

    @staticmethod
    def _issue(*label_names):
        """Build a minimal issue dict carrying the given label names."""
        return {"labels": [{"name": name} for name in label_names]}

    def test_extract_layer_labels(self):
        """Extract layer labels from issues."""
        themes = wn.extract_themes(
            [
                self._issue("layer:triage", "bug"),
                self._issue("layer:tests", "bug"),
                self._issue("layer:triage", "feature"),
            ]
        )

        assert len(themes["layers"]) == 2
        layer_names = {layer["name"] for layer in themes["layers"]}
        assert "triage" in layer_names
        assert "tests" in layer_names

    def test_extract_type_labels(self):
        """Extract type labels (bug/feature/etc)."""
        themes = wn.extract_themes(
            [self._issue("bug"), self._issue("feature"), self._issue("bug")]
        )

        type_names = {entry["name"] for entry in themes["types"]}
        assert "bug" in type_names
        assert "feature" in type_names

    def test_empty_issues(self):
        """Handle empty issue list."""
        themes = wn.extract_themes([])

        assert themes["layers"] == []
        assert themes["types"] == []
        assert themes["top_labels"] == []
|
||||
|
||||
|
||||
class TestExtractAgentContributions:
    """Tests for agent-contribution extraction from issues, PRs, and cycles.

    extract_agent_contributions(issues, prs, cycles) is exercised one input
    list at a time, with the other two left empty.
    """

    def test_extract_assignees(self):
        """Extract assignee counts."""
        issues = [
            {"assignee": {"login": "kimi"}},
            {"assignee": {"login": "hermes"}},
            {"assignee": {"login": "kimi"}},
        ]

        result = wn.extract_agent_contributions(issues, [], [])

        # kimi appears twice but only counts once as an active assignee.
        assert len(result["active_assignees"]) == 2
        # NOTE: the original carried a spurious `# noqa: E741` here — E741 only
        # flags the names l/O/I, and the comprehension variable is `a`.
        assignee_logins = {a["login"] for a in result["active_assignees"]}
        assert "kimi" in assignee_logins
        assert "hermes" in assignee_logins

    def test_extract_pr_authors(self):
        """Extract PR author counts."""
        prs = [
            {"user": {"login": "kimi"}},
            {"user": {"login": "claude"}},
            {"user": {"login": "kimi"}},
        ]

        result = wn.extract_agent_contributions([], prs, [])

        # Two distinct authors: kimi (twice) and claude.
        assert len(result["pr_authors"]) == 2

    def test_kimi_mentions_in_cycles(self):
        """Count Kimi mentions in cycle notes."""
        cycles = [
            {"notes": "Kimi did great work", "reason": ""},
            {"notes": "", "reason": "Kimi timeout"},
            {"notes": "All good", "reason": ""},
        ]

        result = wn.extract_agent_contributions([], [], cycles)

        # Mentions are counted in both the notes and reason fields.
        assert result["kimi_mentioned_cycles"] == 2
|
||||
|
||||
|
||||
class TestAnalyzeTestShifts:
    """Tests for test-pattern analysis over cycle records."""

    def test_no_cycles(self):
        """Handle no cycle data."""
        assert "note" in wn.analyze_test_shifts([])

    def test_test_metrics(self):
        """Calculate test metrics from cycles."""
        stats = wn.analyze_test_shifts(
            [
                {"tests_passed": 100, "tests_added": 5},
                {"tests_passed": 150, "tests_added": 3},
            ]
        )

        # Totals are summed across all cycles.
        assert stats["total_tests_passed"] == 250
        assert stats["total_tests_added"] == 8
|
||||
|
||||
|
||||
class TestGenerateVibeSummary:
    """Tests for the overall 'vibe' classification."""

    @staticmethod
    def _vibe(cycles_data, issues_data):
        """Call generate_vibe_summary with empty auxiliary sections."""
        return wn.generate_vibe_summary(
            cycles_data, issues_data, {}, {"layers": []}, {}, {}, {}
        )

    def test_productive_vibe(self):
        """High success rate and activity = productive vibe."""
        vibe = self._vibe(
            {"success_rate": 0.95, "successes": 10, "failures": 1},
            {"closed_count": 5},
        )

        assert vibe["overall"] == "productive"
        assert "strong week" in vibe["description"].lower()

    def test_struggling_vibe(self):
        """More failures than successes = struggling vibe."""
        vibe = self._vibe(
            {"success_rate": 0.3, "successes": 3, "failures": 7},
            {"closed_count": 0},
        )

        assert vibe["overall"] == "struggling"

    def test_quiet_vibe(self):
        """Low activity = quiet vibe."""
        vibe = self._vibe(
            {"success_rate": 0.0, "successes": 0, "failures": 0},
            {"closed_count": 0},
        )

        assert vibe["overall"] == "quiet"
|
||||
|
||||
|
||||
class TestGenerateMarkdownSummary:
    """Tests for markdown rendering of a narrative dict."""

    @staticmethod
    def _activity(cycles=(0, 0, 0), issues=(0, 0), prs=(0, 0)):
        """Build the nested activity section from compact count tuples."""
        total, ok, bad = cycles
        closed, opened = issues
        merged, pr_opened = prs
        return {
            "cycles": {"total": total, "successes": ok, "failures": bad},
            "issues": {"closed": closed, "opened": opened},
            "pull_requests": {"merged": merged, "opened": pr_opened},
        }

    def test_includes_header(self):
        """Markdown includes header."""
        narrative = {
            "period": {"start": "2026-03-14T00:00:00", "end": "2026-03-21T00:00:00"},
            "vibe": {"overall": "productive", "description": "Good week"},
            "activity": self._activity(cycles=(10, 9, 1), issues=(5, 3), prs=(4, 2)),
        }

        markdown = wn.generate_markdown_summary(narrative)

        assert "# Weekly Narrative Summary" in markdown
        assert "productive" in markdown.lower()
        assert "10 total" in markdown or "10" in markdown

    def test_includes_focus_areas(self):
        """Markdown includes focus areas when present."""
        narrative = {
            "period": {"start": "2026-03-14", "end": "2026-03-21"},
            "vibe": {
                "overall": "productive",
                "description": "Good week",
                "focus_areas": ["triage (5 items)", "tests (3 items)"],
            },
            "activity": self._activity(),
        }

        markdown = wn.generate_markdown_summary(narrative)

        assert "Focus Areas" in markdown
        assert "triage" in markdown
|
||||
|
||||
|
||||
class TestConfigLoading:
    """Tests for automation config loading and environment overrides."""

    def test_default_config(self, tmp_path):
        """Default config when manifest missing."""
        with patch.object(wn, "CONFIG_PATH", tmp_path / "nonexistent.json"):
            config = wn.load_automation_config()

        assert config["lookback_days"] == 7
        assert config["enabled"] is True

    def test_environment_override(self, tmp_path):
        """Environment variables override config."""
        env = {"TIMMY_WEEKLY_NARRATIVE_ENABLED": "false"}

        with patch.dict("os.environ", env):
            with patch.object(wn, "CONFIG_PATH", tmp_path / "nonexistent.json"):
                config = wn.load_automation_config()

        assert config["enabled"] is False
|
||||
|
||||
|
||||
class TestMain:
    """Tests for the main entry point."""

    def test_disabled_exits_cleanly(self, tmp_path):
        """When disabled and no --force, exits cleanly."""
        with patch.object(wn, "REPO_ROOT", tmp_path), patch.object(
            wn, "load_automation_config", return_value={"enabled": False}
        ), patch("sys.argv", ["weekly_narrative"]):
            assert wn.main() == 0

    def test_force_runs_when_disabled(self, tmp_path):
        """--force runs even when disabled."""
        # Minimal on-disk layout so main() can look at the retro directory.
        (tmp_path / ".loop" / "retro").mkdir(parents=True)

        disabled_config = {
            "enabled": False,
            "lookback_days": 7,
            "gitea_api": "http://localhost:3000/api/v1",
            "repo_slug": "test/repo",
            "token_file": "~/.hermes/gitea_token",
        }

        # Gitea client stub that reports the server as unreachable.
        offline_client = MagicMock()
        offline_client.is_available.return_value = False

        with patch.object(wn, "REPO_ROOT", tmp_path), patch.object(
            wn, "load_automation_config", return_value=disabled_config
        ), patch.object(wn, "GiteaClient", return_value=offline_client):
            with patch("sys.argv", ["weekly_narrative", "--force"]):
                # Should complete without error even though Gitea is unavailable.
                assert wn.main() == 0
|
||||
|
||||
|
||||
class TestGiteaClient:
    """Tests for the Gitea API client helpers."""

    def test_is_available_when_unavailable(self):
        """is_available returns False when server down."""
        config = {"gitea_api": "http://localhost:99999", "repo_slug": "test/repo"}
        client = wn.GiteaClient(config, None)

        # Connection failure is swallowed and reported as unavailability.
        assert client.is_available() is False

    def test_headers_with_token(self):
        """Headers include Authorization when token provided."""
        config = {"gitea_api": "http://localhost:3000", "repo_slug": "test/repo"}
        client = wn.GiteaClient(config, "test-token")

        assert client._headers()["Authorization"] == "token test-token"

    def test_headers_without_token(self):
        """Headers don't include Authorization when no token."""
        config = {"gitea_api": "http://localhost:3000", "repo_slug": "test/repo"}
        client = wn.GiteaClient(config, None)

        assert "Authorization" not in client._headers()
|
||||
@@ -1,294 +0,0 @@
|
||||
"""Unit tests for the stress detector module.
|
||||
|
||||
Tests stress signal calculation, mode detection, multipliers,
|
||||
and integration with the quest system.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.stress_detector import (
|
||||
StressMode,
|
||||
StressSignal,
|
||||
StressSnapshot,
|
||||
StressThresholds,
|
||||
_calculate_stress_score,
|
||||
_get_multipliers_for_mode,
|
||||
apply_multiplier,
|
||||
get_default_config,
|
||||
reset_stress_state,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def clean_stress_state():
    """Reset the module-level stress state before and after every test."""
    reset_stress_state()
    yield
    reset_stress_state()
|
||||
|
||||
|
||||
# ── Stress Mode Tests ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStressMode:
    def test_stress_mode_values(self):
        """StressMode enum has expected values."""
        expected = {
            StressMode.CALM: "calm",
            StressMode.ELEVATED: "elevated",
            StressMode.HIGH: "high",
        }
        for mode, text in expected.items():
            assert mode.value == text
|
||||
|
||||
|
||||
# ── Stress Signal Tests ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStressSignal:
    @staticmethod
    def _signal(value):
        """Build a half-weight signal with a threshold of 10."""
        return StressSignal(
            name="test_signal",
            value=value,
            threshold=10.0,
            weight=0.5,
        )

    def test_signal_not_triggered(self):
        """Signal with value below threshold is not triggered."""
        signal = self._signal(5.0)

        assert not signal.is_triggered
        assert signal.contribution == 0.0

    def test_signal_triggered(self):
        """Signal with value at threshold is triggered."""
        signal = self._signal(10.0)

        assert signal.is_triggered
        assert signal.contribution == 0.5  # weight * min(1, value/threshold)

    def test_signal_contribution_capped(self):
        """Signal contribution is capped at weight when value >> threshold."""
        signal = self._signal(100.0)

        assert signal.is_triggered
        assert signal.contribution == 0.5  # Capped at weight

    def test_signal_partial_contribution(self):
        """Signal contribution scales with value/threshold ratio."""
        signal = self._signal(15.0)

        assert signal.is_triggered
        # contribution = min(1, 15/10) * 0.5 = 0.5 (capped)
        assert signal.contribution == 0.5
|
||||
|
||||
|
||||
# ── Stress Thresholds Tests ────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStressThresholds:
    @staticmethod
    def _thresholds():
        """Shared threshold config: elevated at 0.3, high at 0.6."""
        return StressThresholds(elevated_min=0.3, high_min=0.6)

    def test_calm_mode(self):
        """Score below elevated_min returns CALM mode."""
        thresholds = self._thresholds()
        for score in (0.0, 0.1, 0.29):
            assert thresholds.get_mode_for_score(score) == StressMode.CALM

    def test_elevated_mode(self):
        """Score between elevated_min and high_min returns ELEVATED mode."""
        thresholds = self._thresholds()
        for score in (0.3, 0.5, 0.59):
            assert thresholds.get_mode_for_score(score) == StressMode.ELEVATED

    def test_high_mode(self):
        """Score at or above high_min returns HIGH mode."""
        thresholds = self._thresholds()
        for score in (0.6, 0.8, 1.0):
            assert thresholds.get_mode_for_score(score) == StressMode.HIGH
|
||||
|
||||
|
||||
# ── Stress Score Calculation Tests ─────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStressScoreCalculation:
|
||||
def test_empty_signals(self):
|
||||
"""Empty signal list returns zero stress score."""
|
||||
score = _calculate_stress_score([])
|
||||
assert score == 0.0
|
||||
|
||||
def test_no_triggered_signals(self):
|
||||
"""No triggered signals means zero stress score."""
|
||||
signals = [
|
||||
StressSignal(name="s1", value=1.0, threshold=10.0, weight=0.5),
|
||||
StressSignal(name="s2", value=2.0, threshold=10.0, weight=0.5),
|
||||
]
|
||||
score = _calculate_stress_score(signals)
|
||||
assert score == 0.0
|
||||
|
||||
def test_single_triggered_signal(self):
|
||||
"""Single triggered signal contributes its weight."""
|
||||
signals = [
|
||||
StressSignal(name="s1", value=10.0, threshold=10.0, weight=0.5),
|
||||
]
|
||||
score = _calculate_stress_score(signals)
|
||||
# contribution = 0.5, total_weight = 0.5, score = 0.5/0.5 = 1.0
|
||||
assert score == 1.0
|
||||
|
||||
def test_mixed_signals(self):
|
||||
"""Mix of triggered and non-triggered signals."""
|
||||
signals = [
|
||||
StressSignal(name="s1", value=10.0, threshold=10.0, weight=0.3),
|
||||
StressSignal(name="s2", value=1.0, threshold=10.0, weight=0.3),
|
||||
StressSignal(name="s3", value=10.0, threshold=10.0, weight=0.4),
|
||||
]
|
||||
score = _calculate_stress_score(signals)
|
||||
# triggered contributions: 0.3 + 0.4 = 0.7
|
||||
# total_weight: 0.3 + 0.3 + 0.4 = 1.0
|
||||
# score = 0.7 / 1.0 = 0.7
|
||||
assert score == 0.7
|
||||
|
||||
def test_score_capped_at_one(self):
|
||||
"""Stress score is capped at 1.0."""
|
||||
signals = [
|
||||
StressSignal(name="s1", value=100.0, threshold=10.0, weight=1.0),
|
||||
StressSignal(name="s2", value=100.0, threshold=10.0, weight=1.0),
|
||||
]
|
||||
score = _calculate_stress_score(signals)
|
||||
assert score == 1.0 # Capped
|
||||
|
||||
|
||||
# ── Multiplier Tests ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestMultipliers:
|
||||
def test_default_config_structure(self):
|
||||
"""Default config has expected structure."""
|
||||
config = get_default_config()
|
||||
assert "thresholds" in config
|
||||
assert "signals" in config
|
||||
assert "multipliers" in config
|
||||
|
||||
def test_calm_mode_multipliers(self):
|
||||
"""Calm mode has expected multipliers."""
|
||||
multipliers = _get_multipliers_for_mode(StressMode.CALM)
|
||||
assert multipliers["test_improve"] == 1.0
|
||||
assert multipliers["docs_update"] == 1.2
|
||||
assert multipliers["exploration"] == 1.3
|
||||
assert multipliers["refactor"] == 1.2
|
||||
|
||||
def test_elevated_mode_multipliers(self):
|
||||
"""Elevated mode has expected multipliers."""
|
||||
multipliers = _get_multipliers_for_mode(StressMode.ELEVATED)
|
||||
assert multipliers["test_improve"] == 1.2
|
||||
assert multipliers["issue_reduce"] == 1.1
|
||||
assert multipliers["refactor"] == 0.9
|
||||
|
||||
def test_high_mode_multipliers(self):
|
||||
"""High stress mode has expected multipliers."""
|
||||
multipliers = _get_multipliers_for_mode(StressMode.HIGH)
|
||||
assert multipliers["test_improve"] == 1.5
|
||||
assert multipliers["issue_reduce"] == 1.4
|
||||
assert multipliers["exploration"] == 0.7
|
||||
assert multipliers["refactor"] == 0.6
|
||||
|
||||
def test_multiplier_fallback_for_unknown_type(self):
|
||||
"""Unknown quest types return default multiplier of 1.0."""
|
||||
multipliers = _get_multipliers_for_mode(StressMode.CALM)
|
||||
assert multipliers.get("unknown_type", 1.0) == 1.0
|
||||
|
||||
|
||||
# ── Apply Multiplier Tests ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestApplyMultiplier:
|
||||
def test_apply_multiplier_calm(self):
|
||||
"""Multiplier applies correctly in calm mode."""
|
||||
# This test uses get_multiplier which reads from current stress mode
|
||||
# Since we can't easily mock the stress mode, we test the apply_multiplier logic
|
||||
base = 100
|
||||
# In calm mode with test_improve = 1.0
|
||||
result = apply_multiplier(base, "unknown_type")
|
||||
assert result >= 1 # At least 1 token
|
||||
|
||||
def test_apply_multiplier_minimum_one(self):
|
||||
"""Applied reward is at least 1 token."""
|
||||
# Even with very low multiplier, result should be >= 1
|
||||
result = apply_multiplier(1, "any_type")
|
||||
assert result >= 1
|
||||
|
||||
|
||||
# ── Stress Snapshot Tests ──────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStressSnapshot:
|
||||
def test_snapshot_to_dict(self):
|
||||
"""Snapshot can be converted to dictionary."""
|
||||
signals = [
|
||||
StressSignal(name="test", value=10.0, threshold=5.0, weight=0.5),
|
||||
]
|
||||
snapshot = StressSnapshot(
|
||||
mode=StressMode.ELEVATED,
|
||||
score=0.5,
|
||||
signals=signals,
|
||||
multipliers={"test_improve": 1.2},
|
||||
)
|
||||
|
||||
data = snapshot.to_dict()
|
||||
assert data["mode"] == "elevated"
|
||||
assert data["score"] == 0.5
|
||||
assert len(data["signals"]) == 1
|
||||
assert data["multipliers"]["test_improve"] == 1.2
|
||||
|
||||
|
||||
# ── Integration Tests ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStressDetectorIntegration:
|
||||
def test_reset_stress_state(self):
|
||||
"""Reset clears internal state."""
|
||||
# Just verify reset doesn't error
|
||||
reset_stress_state()
|
||||
|
||||
def test_default_config_contains_all_signals(self):
|
||||
"""Default config defines all expected signals."""
|
||||
config = get_default_config()
|
||||
signals = config["signals"]
|
||||
|
||||
expected_signals = [
|
||||
"flaky_test_rate",
|
||||
"p1_backlog_growth",
|
||||
"ci_failure_rate",
|
||||
"open_bug_count",
|
||||
]
|
||||
|
||||
for signal in expected_signals:
|
||||
assert signal in signals
|
||||
assert "threshold" in signals[signal]
|
||||
assert "weight" in signals[signal]
|
||||
|
||||
def test_default_config_contains_all_modes(self):
|
||||
"""Default config defines all stress modes."""
|
||||
config = get_default_config()
|
||||
multipliers = config["multipliers"]
|
||||
|
||||
assert "calm" in multipliers
|
||||
assert "elevated" in multipliers
|
||||
assert "high" in multipliers
|
||||
|
||||
def test_multiplier_weights_sum_approximately_one(self):
|
||||
"""Signal weights should approximately sum to 1.0."""
|
||||
config = get_default_config()
|
||||
signals = config["signals"]
|
||||
|
||||
total_weight = sum(s["weight"] for s in signals.values())
|
||||
# Allow some flexibility but should be close to 1.0
|
||||
assert 0.9 <= total_weight <= 1.1
|
||||
@@ -228,6 +228,27 @@
|
||||
"max_items": 5
|
||||
},
|
||||
"outputs": []
|
||||
},
|
||||
{
|
||||
"id": "weekly_narrative",
|
||||
"name": "Weekly Narrative Summary",
|
||||
"description": "Generates a human-readable weekly summary of work themes, agent contributions, and token economy shifts",
|
||||
"script": "timmy_automations/daily_run/weekly_narrative.py",
|
||||
"category": "daily_run",
|
||||
"enabled": true,
|
||||
"trigger": "scheduled",
|
||||
"schedule": "weekly",
|
||||
"executable": "python3",
|
||||
"config": {
|
||||
"lookback_days": 7,
|
||||
"output_file": ".loop/weekly_narrative.json",
|
||||
"gitea_api": "http://localhost:3000/api/v1",
|
||||
"repo_slug": "rockachopa/Timmy-time-dashboard"
|
||||
},
|
||||
"outputs": [
|
||||
".loop/weekly_narrative.json",
|
||||
".loop/weekly_narrative.md"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -17,6 +17,10 @@
|
||||
"manual": {
|
||||
"description": "Run on-demand only",
|
||||
"automations": ["agent_workspace", "kimi_bootstrap", "kimi_resume", "backfill_retro"]
|
||||
},
|
||||
"weekly": {
|
||||
"description": "Run once per week (Sundays)",
|
||||
"automations": ["weekly_narrative"]
|
||||
}
|
||||
},
|
||||
"triggers": {
|
||||
|
||||
138
timmy_automations/config/token_rules.yaml
Normal file
138
timmy_automations/config/token_rules.yaml
Normal file
@@ -0,0 +1,138 @@
|
||||
# Token Rules — Agent reward/penalty configuration for automations
|
||||
#
|
||||
# This file defines the token economy for agent actions.
|
||||
# Modify values here to adjust incentives without code changes.
|
||||
#
|
||||
# Used by: timmy_automations.utils.token_rules
|
||||
|
||||
version: "1.0.0"
|
||||
description: "Token economy rules for agent automations"
|
||||
|
||||
# ── Events ─────────────────────────────────────────────────────────────────
|
||||
# Each event type defines rewards/penalties and optional gating thresholds
|
||||
|
||||
events:
|
||||
# Triage actions
|
||||
triage_success:
|
||||
description: "Successfully triaged an issue (scored and categorized)"
|
||||
reward: 5
|
||||
category: "triage"
|
||||
|
||||
deep_triage_refinement:
|
||||
description: "LLM-driven issue refinement with acceptance criteria added"
|
||||
reward: 20
|
||||
category: "triage"
|
||||
|
||||
quarantine_candidate_found:
|
||||
description: "Identified a repeat failure issue for quarantine"
|
||||
reward: 10
|
||||
category: "triage"
|
||||
|
||||
# Daily Run completions
|
||||
daily_run_completed:
|
||||
description: "Completed a daily run cycle successfully"
|
||||
reward: 5
|
||||
category: "daily_run"
|
||||
|
||||
golden_path_generated:
|
||||
description: "Generated a coherent mini-session plan"
|
||||
reward: 3
|
||||
category: "daily_run"
|
||||
|
||||
weekly_narrative_created:
|
||||
description: "Generated weekly summary of work themes"
|
||||
reward: 15
|
||||
category: "daily_run"
|
||||
|
||||
# PR merges
|
||||
pr_merged:
|
||||
description: "Successfully merged a pull request"
|
||||
reward: 10
|
||||
category: "merge"
|
||||
# Gating: requires minimum tokens to perform
|
||||
gate_threshold: 0
|
||||
|
||||
pr_merged_with_tests:
|
||||
description: "Merged PR with all tests passing"
|
||||
reward: 15
|
||||
category: "merge"
|
||||
gate_threshold: 0
|
||||
|
||||
# Test fixes
|
||||
test_fixed:
|
||||
description: "Fixed a failing test"
|
||||
reward: 8
|
||||
category: "test"
|
||||
|
||||
test_added:
|
||||
description: "Added new test coverage"
|
||||
reward: 5
|
||||
category: "test"
|
||||
|
||||
critical_bug_fixed:
|
||||
description: "Fixed a critical bug on main"
|
||||
reward: 25
|
||||
category: "test"
|
||||
|
||||
# General operations
|
||||
automation_run:
|
||||
description: "Ran any automation (resource usage)"
|
||||
penalty: -1
|
||||
category: "operation"
|
||||
|
||||
automation_failure:
|
||||
description: "Automation failed or produced error"
|
||||
penalty: -2
|
||||
category: "operation"
|
||||
|
||||
cycle_retro_logged:
|
||||
description: "Logged structured retrospective data"
|
||||
reward: 5
|
||||
category: "operation"
|
||||
|
||||
pre_commit_passed:
|
||||
description: "Pre-commit checks passed"
|
||||
reward: 2
|
||||
category: "operation"
|
||||
|
||||
pre_commit_failed:
|
||||
description: "Pre-commit checks failed"
|
||||
penalty: -1
|
||||
category: "operation"
|
||||
|
||||
# ── Gating Thresholds ──────────────────────────────────────────────────────
|
||||
# Minimum token balances required for sensitive operations
|
||||
|
||||
gating_thresholds:
|
||||
pr_merge: 0
|
||||
sensitive_config_change: 50
|
||||
agent_workspace_create: 10
|
||||
deep_triage_run: 0
|
||||
|
||||
# ── Daily Limits ───────────────────────────────────────────────────────────
|
||||
# Maximum tokens that can be earned/spent per category per day
|
||||
|
||||
daily_limits:
|
||||
triage:
|
||||
max_earn: 100
|
||||
max_spend: 0
|
||||
daily_run:
|
||||
max_earn: 50
|
||||
max_spend: 0
|
||||
merge:
|
||||
max_earn: 100
|
||||
max_spend: 0
|
||||
test:
|
||||
max_earn: 100
|
||||
max_spend: 0
|
||||
operation:
|
||||
max_earn: 50
|
||||
max_spend: 50
|
||||
|
||||
# ── Audit Settings ─────────────────────────────────────────────────────────
|
||||
# Settings for token audit and inspection
|
||||
|
||||
audit:
|
||||
log_all_transactions: true
|
||||
log_retention_days: 30
|
||||
inspectable_by: ["orchestrator", "auditor", "timmy"]
|
||||
@@ -22,6 +22,14 @@ from typing import Any
|
||||
from urllib.request import Request, urlopen
|
||||
from urllib.error import HTTPError, URLError
|
||||
|
||||
# ── Token Economy Integration ──────────────────────────────────────────────
|
||||
# Import token rules helpers for tracking Daily Run rewards
|
||||
|
||||
sys.path.insert(
|
||||
0, str(Path(__file__).resolve().parent.parent)
|
||||
)
|
||||
from utils.token_rules import TokenRules, compute_token_reward
|
||||
|
||||
# ── Configuration ─────────────────────────────────────────────────────────
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
|
||||
@@ -490,6 +498,43 @@ def parse_args() -> argparse.Namespace:
|
||||
return p.parse_args()
|
||||
|
||||
|
||||
def compute_daily_run_tokens(success: bool = True) -> dict[str, Any]:
|
||||
"""Compute token rewards for Daily Run completion.
|
||||
|
||||
Uses the centralized token_rules configuration to calculate
|
||||
rewards/penalties for automation actions.
|
||||
|
||||
Args:
|
||||
success: Whether the Daily Run completed successfully
|
||||
|
||||
Returns:
|
||||
Token transaction details
|
||||
"""
|
||||
rules = TokenRules()
|
||||
|
||||
if success:
|
||||
# Daily run completed successfully
|
||||
transaction = compute_token_reward("daily_run_completed", current_tokens=0)
|
||||
|
||||
# Also compute golden path generation if agenda was created
|
||||
agenda_transaction = compute_token_reward("golden_path_generated", current_tokens=0)
|
||||
|
||||
return {
|
||||
"daily_run": transaction,
|
||||
"golden_path": agenda_transaction,
|
||||
"total_delta": transaction.get("delta", 0) + agenda_transaction.get("delta", 0),
|
||||
"config_version": rules.get_config_version(),
|
||||
}
|
||||
else:
|
||||
# Automation failed
|
||||
transaction = compute_token_reward("automation_failure", current_tokens=0)
|
||||
return {
|
||||
"automation_failure": transaction,
|
||||
"total_delta": transaction.get("delta", 0),
|
||||
"config_version": rules.get_config_version(),
|
||||
}
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args()
|
||||
config = load_config()
|
||||
@@ -503,10 +548,13 @@ def main() -> int:
|
||||
# Check Gitea availability
|
||||
if not client.is_available():
|
||||
error_msg = "[orchestrator] Error: Gitea API is not available"
|
||||
# Compute failure tokens even when unavailable
|
||||
tokens = compute_daily_run_tokens(success=False)
|
||||
if args.json:
|
||||
print(json.dumps({"error": error_msg}))
|
||||
print(json.dumps({"error": error_msg, "tokens": tokens}))
|
||||
else:
|
||||
print(error_msg, file=sys.stderr)
|
||||
print(f"[tokens] Failure penalty: {tokens['total_delta']}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
# Fetch candidates and generate agenda
|
||||
@@ -521,9 +569,12 @@ def main() -> int:
|
||||
cycles = load_cycle_data()
|
||||
day_summary = generate_day_summary(activity, cycles)
|
||||
|
||||
# Compute token rewards for successful completion
|
||||
tokens = compute_daily_run_tokens(success=True)
|
||||
|
||||
# Output
|
||||
if args.json:
|
||||
output = {"agenda": agenda}
|
||||
output = {"agenda": agenda, "tokens": tokens}
|
||||
if day_summary:
|
||||
output["day_summary"] = day_summary
|
||||
print(json.dumps(output, indent=2))
|
||||
@@ -531,6 +582,15 @@ def main() -> int:
|
||||
print_agenda(agenda)
|
||||
if day_summary and activity:
|
||||
print_day_summary(day_summary, activity)
|
||||
# Show token rewards
|
||||
print("─" * 60)
|
||||
print("🪙 Token Rewards")
|
||||
print("─" * 60)
|
||||
print(f"Daily Run completed: +{tokens['daily_run']['delta']} tokens")
|
||||
if candidates:
|
||||
print(f"Golden path generated: +{tokens['golden_path']['delta']} tokens")
|
||||
print(f"Total: +{tokens['total_delta']} tokens")
|
||||
print(f"Config version: {tokens['config_version']}")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
745
timmy_automations/daily_run/weekly_narrative.py
Normal file
745
timmy_automations/daily_run/weekly_narrative.py
Normal file
@@ -0,0 +1,745 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Weekly narrative summary generator — human-readable loop analysis.
|
||||
|
||||
Analyzes the past week's activity across the development loop to produce
|
||||
a narrative summary of:
|
||||
- What changed (themes, areas of focus)
|
||||
- How agents and Timmy contributed
|
||||
- Any shifts in tests, triage, or token economy
|
||||
|
||||
The output is designed to be skimmable — a quick read that gives context
|
||||
on the week's progress without drowning in metrics.
|
||||
|
||||
Run: python3 timmy_automations/daily_run/weekly_narrative.py [--json]
|
||||
Env: See timmy_automations/config/automations.json for configuration
|
||||
|
||||
Refs: #719
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from collections import Counter
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib.error import HTTPError, URLError
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
# ── Configuration ─────────────────────────────────────────────────────────
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
|
||||
CONFIG_PATH = Path(__file__).parent.parent / "config" / "automations.json"
|
||||
|
||||
DEFAULT_CONFIG = {
|
||||
"gitea_api": "http://localhost:3000/api/v1",
|
||||
"repo_slug": "rockachopa/Timmy-time-dashboard",
|
||||
"token_file": "~/.hermes/gitea_token",
|
||||
"lookback_days": 7,
|
||||
"output_file": ".loop/weekly_narrative.json",
|
||||
"enabled": True,
|
||||
}
|
||||
|
||||
|
||||
# ── Data Loading ───────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def load_automation_config() -> dict:
|
||||
"""Load configuration for weekly_narrative from automations manifest."""
|
||||
config = DEFAULT_CONFIG.copy()
|
||||
if CONFIG_PATH.exists():
|
||||
try:
|
||||
manifest = json.loads(CONFIG_PATH.read_text())
|
||||
for auto in manifest.get("automations", []):
|
||||
if auto.get("id") == "weekly_narrative":
|
||||
config.update(auto.get("config", {}))
|
||||
config["enabled"] = auto.get("enabled", True)
|
||||
break
|
||||
except (json.JSONDecodeError, OSError) as exc:
|
||||
print(f"[weekly_narrative] Warning: Could not load config: {exc}", file=sys.stderr)
|
||||
|
||||
# Environment variable overrides
|
||||
if os.environ.get("TIMMY_GITEA_API"):
|
||||
config["gitea_api"] = os.environ.get("TIMMY_GITEA_API")
|
||||
if os.environ.get("TIMMY_REPO_SLUG"):
|
||||
config["repo_slug"] = os.environ.get("TIMMY_REPO_SLUG")
|
||||
if os.environ.get("TIMMY_GITEA_TOKEN"):
|
||||
config["token"] = os.environ.get("TIMMY_GITEA_TOKEN")
|
||||
if os.environ.get("TIMMY_WEEKLY_NARRATIVE_ENABLED"):
|
||||
config["enabled"] = os.environ.get("TIMMY_WEEKLY_NARRATIVE_ENABLED", "true").lower() == "true"
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def get_token(config: dict) -> str | None:
|
||||
"""Get Gitea token from environment or file."""
|
||||
if "token" in config:
|
||||
return config["token"]
|
||||
|
||||
token_file = Path(config["token_file"]).expanduser()
|
||||
if token_file.exists():
|
||||
return token_file.read_text().strip()
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def load_jsonl(path: Path) -> list[dict]:
|
||||
"""Load a JSONL file, skipping bad lines."""
|
||||
if not path.exists():
|
||||
return []
|
||||
entries = []
|
||||
for line in path.read_text().strip().splitlines():
|
||||
try:
|
||||
entries.append(json.loads(line))
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
continue
|
||||
return entries
|
||||
|
||||
|
||||
def parse_ts(ts_str: str) -> datetime | None:
|
||||
"""Parse an ISO timestamp, tolerating missing tz."""
|
||||
if not ts_str:
|
||||
return None
|
||||
try:
|
||||
dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=UTC)
|
||||
return dt
|
||||
except (ValueError, TypeError):
|
||||
return None
|
||||
|
||||
|
||||
# ── Gitea API Client ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
"""Simple Gitea API client with graceful degradation."""
|
||||
|
||||
def __init__(self, config: dict, token: str | None):
|
||||
self.api_base = config["gitea_api"].rstrip("/")
|
||||
self.repo_slug = config["repo_slug"]
|
||||
self.token = token
|
||||
self._available: bool | None = None
|
||||
|
||||
def _headers(self) -> dict:
|
||||
headers = {"Accept": "application/json"}
|
||||
if self.token:
|
||||
headers["Authorization"] = f"token {self.token}"
|
||||
return headers
|
||||
|
||||
def _api_url(self, path: str) -> str:
|
||||
return f"{self.api_base}/repos/{self.repo_slug}/{path}"
|
||||
|
||||
def is_available(self) -> bool:
|
||||
"""Check if Gitea API is reachable."""
|
||||
if self._available is not None:
|
||||
return self._available
|
||||
|
||||
try:
|
||||
req = Request(
|
||||
f"{self.api_base}/version",
|
||||
headers=self._headers(),
|
||||
method="GET",
|
||||
)
|
||||
with urlopen(req, timeout=5) as resp:
|
||||
self._available = resp.status == 200
|
||||
return self._available
|
||||
except (HTTPError, URLError, TimeoutError):
|
||||
self._available = False
|
||||
return False
|
||||
|
||||
def get_paginated(self, path: str, params: dict | None = None) -> list:
|
||||
"""Fetch all pages of a paginated endpoint."""
|
||||
all_items = []
|
||||
page = 1
|
||||
limit = 50
|
||||
|
||||
while True:
|
||||
url = self._api_url(path)
|
||||
query_parts = [f"limit={limit}", f"page={page}"]
|
||||
if params:
|
||||
for key, val in params.items():
|
||||
query_parts.append(f"{key}={val}")
|
||||
url = f"{url}?{'&'.join(query_parts)}"
|
||||
|
||||
req = Request(url, headers=self._headers(), method="GET")
|
||||
with urlopen(req, timeout=15) as resp:
|
||||
batch = json.loads(resp.read())
|
||||
|
||||
if not batch:
|
||||
break
|
||||
|
||||
all_items.extend(batch)
|
||||
if len(batch) < limit:
|
||||
break
|
||||
page += 1
|
||||
|
||||
return all_items
|
||||
|
||||
|
||||
# ── Data Collection ────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def collect_cycles_data(since: datetime) -> dict:
|
||||
"""Load cycle retrospective data from the lookback period."""
|
||||
cycles_file = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
|
||||
if not cycles_file.exists():
|
||||
return {"cycles": [], "total": 0, "successes": 0, "failures": 0}
|
||||
|
||||
entries = load_jsonl(cycles_file)
|
||||
recent = []
|
||||
for e in entries:
|
||||
ts = parse_ts(e.get("timestamp", ""))
|
||||
if ts and ts >= since:
|
||||
recent.append(e)
|
||||
|
||||
successes = [e for e in recent if e.get("success")]
|
||||
failures = [e for e in recent if not e.get("success")]
|
||||
|
||||
return {
|
||||
"cycles": recent,
|
||||
"total": len(recent),
|
||||
"successes": len(successes),
|
||||
"failures": len(failures),
|
||||
"success_rate": round(len(successes) / len(recent), 2) if recent else 0,
|
||||
}
|
||||
|
||||
|
||||
def collect_issues_data(client: GiteaClient, since: datetime) -> dict:
|
||||
"""Collect issue activity from Gitea."""
|
||||
if not client.is_available():
|
||||
return {"error": "Gitea unavailable", "issues": [], "closed": [], "opened": []}
|
||||
|
||||
try:
|
||||
issues = client.get_paginated("issues", {"state": "all", "sort": "updated", "limit": 100})
|
||||
except (HTTPError, URLError) as exc:
|
||||
return {"error": str(exc), "issues": [], "closed": [], "opened": []}
|
||||
|
||||
touched = []
|
||||
closed = []
|
||||
opened = []
|
||||
|
||||
for issue in issues:
|
||||
updated_at = issue.get("updated_at", "")
|
||||
created_at = issue.get("created_at", "")
|
||||
|
||||
updated = parse_ts(updated_at)
|
||||
created = parse_ts(created_at)
|
||||
|
||||
if updated and updated >= since:
|
||||
touched.append(issue)
|
||||
|
||||
if issue.get("state") == "closed":
|
||||
closed_at = issue.get("closed_at", "")
|
||||
closed_dt = parse_ts(closed_at)
|
||||
if closed_dt and closed_dt >= since:
|
||||
closed.append(issue)
|
||||
elif created and created >= since:
|
||||
opened.append(issue)
|
||||
|
||||
return {
|
||||
"issues": touched,
|
||||
"closed": closed,
|
||||
"opened": opened,
|
||||
"touched_count": len(touched),
|
||||
"closed_count": len(closed),
|
||||
"opened_count": len(opened),
|
||||
}
|
||||
|
||||
|
||||
def collect_prs_data(client: GiteaClient, since: datetime) -> dict:
|
||||
"""Collect PR activity from Gitea."""
|
||||
if not client.is_available():
|
||||
return {"error": "Gitea unavailable", "prs": [], "merged": [], "opened": []}
|
||||
|
||||
try:
|
||||
prs = client.get_paginated("pulls", {"state": "all", "sort": "updated", "limit": 100})
|
||||
except (HTTPError, URLError) as exc:
|
||||
return {"error": str(exc), "prs": [], "merged": [], "opened": []}
|
||||
|
||||
touched = []
|
||||
merged = []
|
||||
opened = []
|
||||
|
||||
for pr in prs:
|
||||
updated_at = pr.get("updated_at", "")
|
||||
created_at = pr.get("created_at", "")
|
||||
merged_at = pr.get("merged_at", "")
|
||||
|
||||
updated = parse_ts(updated_at)
|
||||
created = parse_ts(created_at)
|
||||
merged_dt = parse_ts(merged_at) if merged_at else None
|
||||
|
||||
if updated and updated >= since:
|
||||
touched.append(pr)
|
||||
|
||||
if pr.get("merged") and merged_dt and merged_dt >= since:
|
||||
merged.append(pr)
|
||||
elif created and created >= since:
|
||||
opened.append(pr)
|
||||
|
||||
return {
|
||||
"prs": touched,
|
||||
"merged": merged,
|
||||
"opened": opened,
|
||||
"touched_count": len(touched),
|
||||
"merged_count": len(merged),
|
||||
"opened_count": len(opened),
|
||||
}
|
||||
|
||||
|
||||
def collect_triage_data(since: datetime) -> dict:
    """Load triage and introspection data.

    Reads triage run records from .loop/retro/triage.jsonl (filtered to the
    lookback window) and the latest insights snapshot from
    .loop/retro/insights.json. Missing or unparseable files degrade to
    empty data rather than raising.
    """
    triage_file = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
    insights_file = REPO_ROOT / ".loop" / "retro" / "insights.json"

    # Walrus binding avoids parsing each timestamp twice (the previous
    # comprehension called parse_ts two times per entry).
    recent_triage = [
        e
        for e in load_jsonl(triage_file)
        if (ts := parse_ts(e.get("timestamp", ""))) and ts >= since
    ]

    insights: dict = {}
    if insights_file.exists():
        try:
            insights = json.loads(insights_file.read_text())
        except (json.JSONDecodeError, OSError):
            pass  # best-effort: a corrupt snapshot is treated as absent

    return {
        "triage_runs": len(recent_triage),
        "triage_entries": recent_triage,
        "latest_insights": insights,
    }
|
||||
|
||||
|
||||
def collect_token_data(since: datetime) -> dict:
|
||||
"""Load token economy data from the lightning ledger."""
|
||||
# The ledger is in-memory but we can look for any persisted data
|
||||
# For now, return placeholder that will be filled by the ledger module
|
||||
return {
|
||||
"note": "Token economy data is ephemeral — check dashboard for live metrics",
|
||||
"balance_sats": 0, # Placeholder
|
||||
"transactions_week": 0,
|
||||
}
|
||||
|
||||
|
||||
# ── Analysis Functions ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def extract_themes(issues: list[dict]) -> dict[str, list[dict]]:
    """Summarize issue labels into themes, layers, and work types.

    Note: the return annotation previously claimed ``list[dict]``, but the
    function returns a mapping with three keys:
      - "top_labels": up to 10 most common labels, excluding layer:/size: prefixes
      - "layers":     counts of layer:* labels with the prefix stripped
      - "types":      counts of the recognized work-type labels
    """
    label_counts: Counter = Counter()
    layer_counts: Counter = Counter()
    type_counts: Counter = Counter()

    for issue in issues:
        for label in issue.get("labels", []):
            name = label.get("name", "")
            label_counts[name] += 1

            if name.startswith("layer:"):
                # removeprefix only strips the leading marker, unlike the
                # previous replace() which touched every occurrence.
                layer_counts[name.removeprefix("layer:")] += 1
            if name in ("bug", "feature", "refactor", "docs", "test", "chore"):
                type_counts[name] += 1

    return {
        # Top themes (labels excluding layer/size prefixes)
        "top_labels": [
            {"name": name, "count": count}
            for name, count in label_counts.most_common(10)
            if not name.startswith(("layer:", "size:"))
        ],
        "layers": [
            {"name": name, "count": count}
            for name, count in layer_counts.most_common()
        ],
        "types": [
            {"name": name, "count": count}
            for name, count in type_counts.most_common()
        ],
    }
|
||||
|
||||
|
||||
def extract_agent_contributions(issues: list[dict], prs: list[dict], cycles: list[dict]) -> dict:
    """Extract agent contribution patterns from issues, PRs and cycle notes."""
    # Issue counts per assignee; entries without a (non-empty) assignee dict
    # are skipped.
    issue_assignees: Counter = Counter()
    for issue in issues:
        owner = issue.get("assignee")
        if owner and isinstance(owner, dict):
            issue_assignees[owner.get("login", "unknown")] += 1

    # PR counts per author, same skip rule.
    pr_author_counts: Counter = Counter()
    for pr in prs:
        author = pr.get("user")
        if author and isinstance(author, dict):
            pr_author_counts[author.get("login", "unknown")] += 1

    # Cycles whose notes or failure reason mention Kimi.
    kimi_mentions = sum(
        1
        for cycle in cycles
        if "kimi" in cycle.get("notes", "").lower()
        or "kimi" in cycle.get("reason", "").lower()
    )

    return {
        "active_assignees": [
            {"login": login, "issues_count": count}
            for login, count in issue_assignees.most_common()
        ],
        "pr_authors": [
            {"login": login, "prs_count": count}
            for login, count in pr_author_counts.most_common()
        ],
        "kimi_mentioned_cycles": kimi_mentions,
    }
|
||||
|
||||
|
||||
def analyze_test_shifts(cycles: list[dict]) -> dict:
    """Aggregate test activity across the week's cycles."""
    if not cycles:
        return {"note": "No cycle data available"}

    passed_total = sum(c.get("tests_passed", 0) for c in cycles)
    added_total = sum(c.get("tests_added", 0) for c in cycles)

    # Cycles explicitly typed "test" or whose notes mention tests.
    test_focused_count = sum(
        1
        for c in cycles
        if c.get("type") == "test" or "test" in c.get("notes", "").lower()
    )

    return {
        "total_tests_passed": passed_total,
        "total_tests_added": added_total,
        # cycles is non-empty past the guard above, so no zero-division check.
        "avg_tests_per_cycle": round(passed_total / len(cycles), 1),
        "test_focused_cycles": test_focused_count,
    }
|
||||
|
||||
|
||||
def analyze_triage_shifts(triage_data: dict) -> dict:
    """Summarize triage-run volume and the latest introspection insights."""
    insights = triage_data.get("latest_insights", {})
    recommendations = insights.get("recommendations", [])

    high_severity_count = sum(
        1 for rec in recommendations if rec.get("severity") == "high"
    )

    return {
        "triage_runs": triage_data.get("triage_runs", 0),
        # A "generated_at" stamp is the marker that insights were produced.
        "insights_generated": insights.get("generated_at") is not None,
        "high_priority_recommendations": high_severity_count,
        "recent_recommendations": recommendations[:3],
    }
|
||||
|
||||
|
||||
def generate_vibe_summary(
    cycles_data: dict,
    issues_data: dict,
    prs_data: dict,
    themes: dict,
    agent_contrib: dict,
    test_shifts: dict,
    triage_shifts: dict,
) -> dict:
    """Generate the human-readable 'vibe' summary.

    Classifies the week as productive/steady/struggling/quiet based on
    cycle outcomes and delivery counts, then attaches focus areas, a
    one-line agent highlight, and notable events.
    """
    success_rate = cycles_data.get("success_rate", 0)
    failures = cycles_data.get("failures", 0)
    closed_count = issues_data.get("closed_count", 0)
    merged_count = prs_data.get("merged_count", 0)

    # Classify the overall vibe; most positive checks come first.
    if success_rate >= 0.9 and closed_count > 0:
        vibe, vibe_description = (
            "productive",
            "A strong week with solid delivery and healthy success rates.",
        )
    elif success_rate >= 0.7:
        vibe, vibe_description = (
            "steady",
            "Steady progress with some bumps. Things are moving forward.",
        )
    elif failures > cycles_data.get("successes", 0):
        vibe, vibe_description = (
            "struggling",
            "A challenging week with more failures than successes. Time to regroup.",
        )
    else:
        vibe, vibe_description = ("quiet", "A lighter week with limited activity.")

    # Top three theme layers become the focus areas.
    focus_areas = [
        f"{layer['name']} ({layer['count']} items)"
        for layer in themes.get("layers", [])[:3]
    ]

    # Highlight the most active assignee, if any.
    assignees = agent_contrib.get("active_assignees", [])
    if assignees:
        leader = assignees[0]
        agent_summary = (
            f"{leader['login']} led with {leader['issues_count']} assigned issues."
        )
    else:
        agent_summary = ""

    # Collect notable events; fall back to a neutral note when none trigger.
    notable = []
    if merged_count > 5:
        notable.append(f"{merged_count} PRs merged — high integration velocity")
    if triage_shifts.get("high_priority_recommendations", 0) > 0:
        notable.append("High-priority recommendations from loop introspection")
    if test_shifts.get("test_focused_cycles", 0) > 3:
        notable.append("Strong test coverage focus")
    if not notable:
        notable.append("Regular development flow")

    return {
        "overall": vibe,
        "description": vibe_description,
        "focus_areas": focus_areas,
        "agent_summary": agent_summary,
        "notable_events": notable,
    }
|
||||
|
||||
|
||||
# ── Narrative Generation ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
def generate_narrative(
    cycles_data: dict,
    issues_data: dict,
    prs_data: dict,
    triage_data: dict,
    themes: dict,
    agent_contrib: dict,
    test_shifts: dict,
    triage_shifts: dict,
    token_data: dict,
    since: datetime,
    until: datetime,
) -> dict:
    """Generate the complete weekly narrative.

    Combines the vibe summary with raw activity counts, theme/agent
    analysis, health signals, and the token economy snapshot into a
    single JSON-serializable report.

    NOTE: triage_data is accepted for interface stability but is not
    used directly here; triage information arrives via triage_shifts.
    """
    activity = {
        "cycles": {
            key: cycles_data.get(key, 0)
            for key in ("total", "successes", "failures", "success_rate")
        },
        "issues": {
            "touched": issues_data.get("touched_count", 0),
            "closed": issues_data.get("closed_count", 0),
            "opened": issues_data.get("opened_count", 0),
        },
        "pull_requests": {
            "touched": prs_data.get("touched_count", 0),
            "merged": prs_data.get("merged_count", 0),
            "opened": prs_data.get("opened_count", 0),
        },
    }

    return {
        "generated_at": datetime.now(UTC).isoformat(),
        "period": {"start": since.isoformat(), "end": until.isoformat(), "days": 7},
        "vibe": generate_vibe_summary(
            cycles_data,
            issues_data,
            prs_data,
            themes,
            agent_contrib,
            test_shifts,
            triage_shifts,
        ),
        "activity": activity,
        "themes": themes,
        "agents": agent_contrib,
        "test_health": test_shifts,
        "triage_health": triage_shifts,
        "token_economy": token_data,
    }
|
||||
|
||||
|
||||
def generate_markdown_summary(narrative: dict) -> str:
    """Render the narrative dict as a human-readable markdown report."""
    vibe = narrative.get("vibe", {})
    activity = narrative.get("activity", {})
    cycles = activity.get("cycles", {})
    issues = activity.get("issues", {})
    prs = activity.get("pull_requests", {})
    period = narrative["period"]

    # Header and headline activity counts.
    lines = [
        "# Weekly Narrative Summary",
        "",
        f"**Period:** {period['start'][:10]} to {period['end'][:10]}",
        f"**Vibe:** {vibe.get('overall', 'unknown').title()}",
        "",
        f"{vibe.get('description', '')}",
        "",
        "## Activity Highlights",
        "",
        f"- **Development Cycles:** {cycles.get('total', 0)} total ({cycles.get('successes', 0)} success, {cycles.get('failures', 0)} failure)",
        f"- **Issues:** {issues.get('closed', 0)} closed, {issues.get('opened', 0)} opened",
        f"- **Pull Requests:** {prs.get('merged', 0)} merged, {prs.get('opened', 0)} opened",
        "",
    ]

    # Optional sections are only emitted when they have content.
    focus = vibe.get("focus_areas", [])
    if focus:
        lines.extend(["## Focus Areas", ""])
        lines.extend(f"- {area}" for area in focus)
        lines.append("")

    agent_summary = vibe.get("agent_summary", "")
    if agent_summary:
        lines.extend(["## Agent Activity", "", agent_summary, ""])

    notable = vibe.get("notable_events", [])
    if notable:
        lines.extend(["## Notable Events", ""])
        lines.extend(f"- {event}" for event in notable)
        lines.append("")

    # Surface triage warnings plus up to two recent recommendations.
    triage = narrative.get("triage_health", {})
    if triage.get("high_priority_recommendations", 0) > 0:
        lines.extend([
            "## Triage Notes",
            "",
            f"⚠️ {triage['high_priority_recommendations']} high-priority recommendation(s) from loop introspection.",
            "",
        ])
        lines.extend(
            f"- **{rec.get('category', 'general')}:** {rec.get('finding', '')}"
            for rec in triage.get("recent_recommendations", [])[:2]
        )
        lines.append("")

    return "\n".join(lines)
|
||||
|
||||
|
||||
# ── Main ───────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
    """Parse command-line options for the weekly narrative generator."""
    parser = argparse.ArgumentParser(
        description="Generate weekly narrative summary of work and vibes",
    )
    parser.add_argument(
        "--json", "-j", action="store_true",
        help="Output as JSON instead of markdown",
    )
    parser.add_argument(
        "--output", "-o", type=str, default=None,
        help="Output file path (default from config)",
    )
    parser.add_argument(
        "--days", type=int, default=None,
        help="Override lookback days (default 7)",
    )
    parser.add_argument(
        "--force", action="store_true",
        help="Run even if disabled in config",
    )
    return parser.parse_args()
|
||||
|
||||
|
||||
def main() -> int:
    """Generate the weekly narrative and write JSON + markdown outputs.

    Orchestration only: collect data for the lookback window, run the
    analysis helpers, render the narrative, persist it, and print a
    summary. Always returns 0 (including when skipped while disabled).
    """
    args = parse_args()
    config = load_automation_config()

    # Check if enabled in config; --force bypasses the switch.
    if not config.get("enabled", True) and not args.force:
        print("[weekly_narrative] Skipped — weekly narrative is disabled in config")
        print("[weekly_narrative] Use --force to run anyway")
        return 0

    # Determine lookback period: CLI --days overrides config (default 7).
    days = args.days if args.days is not None else config.get("lookback_days", 7)
    until = datetime.now(UTC)
    since = until - timedelta(days=days)

    print(f"[weekly_narrative] Generating narrative for the past {days} days...")

    # Setup Gitea client for remote issue/PR data.
    token = get_token(config)
    client = GiteaClient(config, token)

    if not client.is_available():
        # Non-fatal: per the warning below, collectors fall back to local data.
        print("[weekly_narrative] Warning: Gitea API unavailable — will use local data only")

    # Collect data for the lookback window.
    cycles_data = collect_cycles_data(since)
    issues_data = collect_issues_data(client, since)
    prs_data = collect_prs_data(client, since)
    triage_data = collect_triage_data(since)
    token_data = collect_token_data(since)

    # Analyze the collected data into themes, contributions, and health signals.
    themes = extract_themes(issues_data.get("issues", []))
    agent_contrib = extract_agent_contributions(
        issues_data.get("issues", []),
        prs_data.get("prs", []),
        cycles_data.get("cycles", []),
    )
    test_shifts = analyze_test_shifts(cycles_data.get("cycles", []))
    triage_shifts = analyze_triage_shifts(triage_data)

    # Generate narrative (combined JSON-serializable report).
    narrative = generate_narrative(
        cycles_data,
        issues_data,
        prs_data,
        triage_data,
        themes,
        agent_contrib,
        test_shifts,
        triage_shifts,
        token_data,
        since,
        until,
    )

    # Determine output path: CLI --output overrides the configured default.
    output_path = args.output or config.get("output_file", ".loop/weekly_narrative.json")
    output_file = REPO_ROOT / output_path
    output_file.parent.mkdir(parents=True, exist_ok=True)

    # Write JSON output
    output_file.write_text(json.dumps(narrative, indent=2) + "\n")

    # Write markdown summary alongside JSON (same path, .md suffix).
    md_output_file = output_file.with_suffix(".md")
    md_output_file.write_text(generate_markdown_summary(narrative))

    # Print output: raw JSON with --json, otherwise the markdown rendering.
    if args.json:
        print(json.dumps(narrative, indent=2))
    else:
        print()
        print(generate_markdown_summary(narrative))

    print(f"\n[weekly_narrative] Written to: {output_file}")
    print(f"[weekly_narrative] Markdown summary: {md_output_file}")

    return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code.
    sys.exit(main())
|
||||
6
timmy_automations/utils/__init__.py
Normal file
6
timmy_automations/utils/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
"""Timmy Automations utilities.
|
||||
|
||||
Shared helper modules for automations.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
389
timmy_automations/utils/token_rules.py
Normal file
389
timmy_automations/utils/token_rules.py
Normal file
@@ -0,0 +1,389 @@
|
||||
"""Token rules helper — Compute token deltas for agent actions.
|
||||
|
||||
This module loads token economy configuration from YAML and provides
|
||||
functions for automations to compute token rewards/penalties.
|
||||
|
||||
Usage:
|
||||
from timmy_automations.utils.token_rules import TokenRules
|
||||
|
||||
rules = TokenRules()
|
||||
delta = rules.get_delta("pr_merged")
|
||||
print(f"PR merge reward: {delta}") # 10
|
||||
|
||||
# Check if agent can perform sensitive operation
|
||||
can_merge = rules.check_gate("pr_merge", current_tokens=25)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass
class TokenEvent:
    """Represents a single token event configuration."""

    name: str  # Event identifier, e.g. "pr_merged"
    description: str  # Human-readable summary of the event
    reward: int  # Tokens granted for the event
    penalty: int  # Tokens deducted; stored as a negative value (e.g. -2 in fallback defaults)
    category: str  # Grouping used for daily limits, e.g. "merge", "test"
    gate_threshold: int | None = None  # Minimum balance required, or None when ungated

    @property
    def delta(self) -> int:
        """Net token delta (reward + penalty)."""
        # Penalties are stored as negative values, so a plain sum
        # yields the net change.
        return self.reward + self.penalty
|
||||
|
||||
|
||||
@dataclass
class TokenCategoryLimits:
    """Daily limits for a token category."""

    max_earn: int  # Maximum tokens that may be earned in this category per day
    max_spend: int  # Maximum tokens that may be spent in this category per day
|
||||
|
||||
|
||||
class TokenRules:
    """Token economy rules loader and calculator.

    Loads configuration from timmy_automations/config/token_rules.yaml
    and provides methods to compute token deltas and check gating.
    Falls back to a minimal built-in rule set when PyYAML is missing,
    the config file is absent, or the config fails to parse.
    """

    # Default config location: <package>/config/token_rules.yaml.
    CONFIG_PATH = Path(__file__).parent.parent / "config" / "token_rules.yaml"

    def __init__(self, config_path: Path | None = None) -> None:
        """Initialize token rules from configuration file.

        Args:
            config_path: Optional override for config file location.
        """
        self._config_path = config_path or self.CONFIG_PATH
        self._events: dict[str, TokenEvent] = {}  # event name -> TokenEvent
        self._gating: dict[str, int] = {}  # operation name -> minimum balance
        self._daily_limits: dict[str, TokenCategoryLimits] = {}  # category -> limits
        self._audit: dict[str, Any] = {}  # audit settings from config
        self._version: str = "unknown"
        self._load_config()

    def _load_config(self) -> None:
        """Load configuration from YAML, falling back to defaults on any problem."""
        # Graceful degradation if yaml not available or file missing
        try:
            import yaml
        except ImportError:
            # YAML not installed, use fallback defaults
            self._load_fallback_defaults()
            return

        if not self._config_path.exists():
            self._load_fallback_defaults()
            return

        try:
            config = yaml.safe_load(self._config_path.read_text())
            if not config:
                self._load_fallback_defaults()
                return

            self._version = config.get("version", "unknown")
            self._parse_events(config.get("events", {}))
            self._parse_gating(config.get("gating_thresholds", {}))
            self._parse_daily_limits(config.get("daily_limits", {}))
            self._audit = config.get("audit", {})

        except Exception:
            # Deliberately broad: a malformed config must never break callers.
            # The fallback loader overwrites all partially-parsed state.
            self._load_fallback_defaults()

    def _load_fallback_defaults(self) -> None:
        """Load minimal fallback defaults if config unavailable."""
        self._version = "fallback"
        self._events = {
            "pr_merged": TokenEvent(
                name="pr_merged",
                description="Successfully merged a pull request",
                reward=10,
                penalty=0,
                category="merge",
                gate_threshold=0,
            ),
            "test_fixed": TokenEvent(
                name="test_fixed",
                description="Fixed a failing test",
                reward=8,
                penalty=0,
                category="test",
            ),
            "automation_failure": TokenEvent(
                name="automation_failure",
                description="Automation failed",
                reward=0,
                penalty=-2,
                category="operation",
            ),
        }
        self._gating = {"pr_merge": 0}
        self._daily_limits = {}
        self._audit = {"log_all_transactions": True}

    def _parse_events(self, events_config: dict) -> None:
        """Parse event configurations from YAML."""
        for name, config in events_config.items():
            # Skip malformed entries (non-mapping values) silently.
            if not isinstance(config, dict):
                continue

            self._events[name] = TokenEvent(
                name=name,
                description=config.get("description", ""),
                reward=config.get("reward", 0),
                penalty=config.get("penalty", 0),
                category=config.get("category", "unknown"),
                gate_threshold=config.get("gate_threshold"),
            )

    def _parse_gating(self, gating_config: dict) -> None:
        """Parse gating thresholds from YAML (non-int values are ignored)."""
        for name, threshold in gating_config.items():
            if isinstance(threshold, int):
                self._gating[name] = threshold

    def _parse_daily_limits(self, limits_config: dict) -> None:
        """Parse daily limits from YAML (non-mapping values are ignored)."""
        for category, limits in limits_config.items():
            if isinstance(limits, dict):
                self._daily_limits[category] = TokenCategoryLimits(
                    max_earn=limits.get("max_earn", 0),
                    max_spend=limits.get("max_spend", 0),
                )

    def get_delta(self, event_name: str) -> int:
        """Get token delta for an event.

        Args:
            event_name: Name of the event (e.g., "pr_merged", "test_fixed")

        Returns:
            Net token delta (positive for reward, negative for penalty);
            0 for unknown events.
        """
        event = self._events.get(event_name)
        if event:
            return event.delta
        return 0

    def get_event(self, event_name: str) -> TokenEvent | None:
        """Get full event configuration.

        Args:
            event_name: Name of the event

        Returns:
            TokenEvent object or None if not found
        """
        return self._events.get(event_name)

    def list_events(self, category: str | None = None) -> list[TokenEvent]:
        """List all configured events.

        Args:
            category: Optional category filter

        Returns:
            List of TokenEvent objects
        """
        events = list(self._events.values())
        if category:
            events = [e for e in events if e.category == category]
        return events

    def check_gate(self, operation: str, current_tokens: int) -> bool:
        """Check if agent meets token threshold for an operation.

        Args:
            operation: Operation name (e.g., "pr_merge")
            current_tokens: Agent's current token balance

        Returns:
            True if agent can perform the operation
        """
        threshold = self._gating.get(operation)
        if threshold is None:
            return True  # No gate defined, allow
        return current_tokens >= threshold

    def get_gate_threshold(self, operation: str) -> int | None:
        """Get the gating threshold for an operation.

        Args:
            operation: Operation name

        Returns:
            Threshold value or None if no gate defined
        """
        return self._gating.get(operation)

    def get_daily_limits(self, category: str) -> TokenCategoryLimits | None:
        """Get daily limits for a category.

        Args:
            category: Category name

        Returns:
            TokenCategoryLimits or None if not defined
        """
        return self._daily_limits.get(category)

    def compute_transaction(
        self,
        event_name: str,
        current_tokens: int = 0,
        current_daily_earned: dict[str, int] | None = None,
    ) -> dict[str, Any]:
        """Compute a complete token transaction.

        This is the main entry point for agents to use. It returns
        a complete transaction record with delta, gating check, and limits.

        Args:
            event_name: Name of the event
            current_tokens: Agent's current token balance
            current_daily_earned: Dict of category -> tokens earned today.
                Pass a dict (even an empty one) to enable daily-limit
                checks; omit or pass None to skip them.

        Returns:
            Transaction dict with:
                - event: Event name
                - delta: Token delta
                - allowed: Whether operation is allowed (gating)
                - new_balance: Projected new balance
                - limit_reached: Whether daily limit would be exceeded
        """
        event = self._events.get(event_name)
        if not event:
            # Unknown events are rejected outright with a zero delta.
            return {
                "event": event_name,
                "delta": 0,
                "allowed": False,
                "reason": "unknown_event",
                "new_balance": current_tokens,
                "limit_reached": False,
            }

        delta = event.delta
        new_balance = current_tokens + delta

        # Check gating (for penalties, we don't check gates).
        # NOTE(review): this gates on the event's own gate_threshold, while
        # check_gate() consults the separate gating_thresholds map; confirm
        # the two gate sources are meant to be independent.
        allowed = True
        gate_reason = None
        if delta > 0 and event.gate_threshold is not None:  # Only check gates for positive operations with thresholds
            allowed = current_tokens >= event.gate_threshold
            if not allowed:
                gate_reason = f"requires {event.gate_threshold} tokens"

        # Check daily limits
        limit_reached = False
        limit_reason = None
        if current_daily_earned is not None:
            limits = self._daily_limits.get(event.category)
            if limits:
                # Default to 0 so the limit applies even before any tokens
                # have been recorded for this category today (previously the
                # check was skipped entirely when the category key was missing,
                # letting a single large reward exceed max_earn unchecked).
                current_earned = current_daily_earned.get(event.category, 0)
                if delta > 0 and current_earned + delta > limits.max_earn:
                    limit_reached = True
                    limit_reason = f"daily earn limit ({limits.max_earn}) reached"

        result = {
            "event": event_name,
            "delta": delta,
            "category": event.category,
            "allowed": allowed and not limit_reached,
            "new_balance": new_balance,
            "limit_reached": limit_reached,
        }

        # Attach human-readable reasons only when a check actually failed.
        if gate_reason:
            result["gate_reason"] = gate_reason
        if limit_reason:
            result["limit_reason"] = limit_reason

        return result

    def get_config_version(self) -> str:
        """Get the loaded configuration version."""
        return self._version

    def get_categories(self) -> list[str]:
        """Get sorted list of all configured categories."""
        categories = {e.category for e in self._events.values()}
        return sorted(categories)

    def is_auditable(self) -> bool:
        """Check if transactions should be logged for audit."""
        return self._audit.get("log_all_transactions", True)
|
||||
|
||||
|
||||
# Convenience functions for simple use cases
|
||||
|
||||
def get_token_delta(event_name: str) -> int:
    """Look up the net token delta for a single event.

    Constructs a fresh TokenRules on every call; use a TokenRules
    instance directly for repeated lookups.

    Args:
        event_name: Name of the event

    Returns:
        Token delta (positive for reward, negative for penalty)
    """
    rules = TokenRules()
    return rules.get_delta(event_name)
|
||||
|
||||
|
||||
def check_operation_gate(operation: str, current_tokens: int) -> bool:
    """Check whether an agent's balance clears the gate for an operation.

    Constructs a fresh TokenRules on every call; use a TokenRules
    instance directly for repeated checks.

    Args:
        operation: Operation name
        current_tokens: Agent's current token balance

    Returns:
        True if operation is allowed
    """
    rules = TokenRules()
    return rules.check_gate(operation, current_tokens)
|
||||
|
||||
|
||||
def compute_token_reward(
    event_name: str,
    current_tokens: int = 0,
) -> dict[str, Any]:
    """Compute a full token transaction for an event.

    Constructs a fresh TokenRules on every call; use a TokenRules
    instance directly for repeated computations.

    Args:
        event_name: Name of the event
        current_tokens: Agent's current token balance

    Returns:
        Transaction dict with delta, allowed status, new balance
    """
    rules = TokenRules()
    return rules.compute_transaction(event_name, current_tokens)
|
||||
|
||||
|
||||
def list_token_events(category: str | None = None) -> list[dict[str, Any]]:
    """List all configured token events as plain dicts.

    Args:
        category: Optional category filter

    Returns:
        List of event dicts with name, description, delta, category,
        and gate_threshold
    """

    def summarize(event: TokenEvent) -> dict[str, Any]:
        # Flatten the dataclass (plus its computed delta) for serialization.
        return {
            "name": event.name,
            "description": event.description,
            "delta": event.delta,
            "category": event.category,
            "gate_threshold": event.gate_threshold,
        }

    rules = TokenRules()
    return [summarize(event) for event in rules.list_events(category)]
|
||||
Reference in New Issue
Block a user