From 919a011cae1c6441df581f20d1c48ddec2e76cf8 Mon Sep 17 00:00:00 2001 From: kimi Date: Sat, 21 Mar 2026 17:26:40 -0400 Subject: [PATCH] feat: adapt token rewards based on system stress signals (#714) Implements adaptive token rewards that respond to system stress: - StressDetector module (timmy/stress_detector.py): - Monitors 4 stress signals: flaky test rate, P1 backlog growth, CI failure rate, open bug count - Calculates weighted stress score (0-1) and determines mode: calm (<0.3), elevated (0.3-0.6), high (>0.6) - Applies quest-specific multipliers based on current mode - Configuration (config/stress_modes.yaml): - Thresholds for mode transitions - Signal weights and thresholds - Multipliers per mode (e.g., test_improve: 1.5x in high stress) - Quest system integration: - Rewards now include stress bonus/penalty in notification - Quest status API includes adjusted_reward and multiplier - Agent can see current stress mode and why rewards changed - API endpoints: - GET /quests/api/stress - current stress mode and signals - POST /quests/api/stress/refresh - force refresh stress detection Fixes #714 --- config/stress_modes.yaml | 98 +++++ src/dashboard/routes/quests.py | 70 ++++ src/timmy/quest_system.py | 57 ++- src/timmy/stress_detector.py | 565 +++++++++++++++++++++++++++++ tests/unit/test_stress_detector.py | 294 +++++++++++++++ 5 files changed, 1081 insertions(+), 3 deletions(-) create mode 100644 config/stress_modes.yaml create mode 100644 src/timmy/stress_detector.py create mode 100644 tests/unit/test_stress_detector.py diff --git a/config/stress_modes.yaml b/config/stress_modes.yaml new file mode 100644 index 0000000..be702bc --- /dev/null +++ b/config/stress_modes.yaml @@ -0,0 +1,98 @@ +# ── System Stress Modes Configuration ──────────────────────────────────────── +# +# This configuration defines how token rewards adapt based on system stress. 
+# When the system detects elevated stress (flaky tests, growing backlog, +# CI failures), quest rewards are adjusted to incentivize agents to focus +# on the most critical areas. +# +# ── How It Works ───────────────────────────────────────────────────────────── +# +# 1. SIGNALS: System metrics are monitored continuously +# 2. SCORE: Weighted contributions from triggered signals create a stress score +# 3. MODE: Score determines the stress mode (calm, elevated, high) +# 4. MULTIPLIERS: Token rewards are multiplied based on the current mode +# +# ── Stress Thresholds ──────────────────────────────────────────────────────── + +thresholds: + # Minimum score to enter elevated mode (0.0 - 1.0) + elevated_min: 0.3 + + # Minimum score to enter high stress mode (0.0 - 1.0) + high_min: 0.6 + +# ── Stress Signals ─────────────────────────────────────────────────────────── +# +# Each signal has: +# - threshold: Value at which signal is considered "triggered" +# - weight: Contribution to overall stress score (should sum to ~1.0) + +signals: + flaky_test_rate: + threshold: 0.15 # 15% of tests showing flakiness + weight: 0.30 + description: "Percentage of test runs that are flaky" + + p1_backlog_growth: + threshold: 5 # 5 new P1 issues in lookback period + weight: 0.25 + description: "Net growth in P1 priority issues over 7 days" + + ci_failure_rate: + threshold: 0.20 # 20% of CI runs failing + weight: 0.25 + description: "Percentage of CI runs failing in lookback period" + + open_bug_count: + threshold: 20 # 20 open bugs + weight: 0.20 + description: "Total open issues labeled as 'bug'" + +# ── Token Multipliers ──────────────────────────────────────────────────────── +# +# Multipliers are applied to quest rewards based on current stress mode. +# Values > 1.0 increase rewards, < 1.0 decrease rewards. 
+# +# Quest types: +# - test_improve: Test coverage/quality improvements +# - docs_update: Documentation updates +# - issue_count: Closing specific issue types +# - issue_reduce: Reducing overall issue backlog +# - daily_run: Daily Run session completion +# - custom: Special/manual quests +# - exploration: Exploratory work +# - refactor: Code refactoring + +multipliers: + calm: + # Calm periods: incentivize maintenance and exploration + test_improve: 1.0 + docs_update: 1.2 + issue_count: 1.0 + issue_reduce: 1.0 + daily_run: 1.0 + custom: 1.0 + exploration: 1.3 + refactor: 1.2 + + elevated: + # Elevated stress: start emphasizing stability + test_improve: 1.2 + docs_update: 1.0 + issue_count: 1.1 + issue_reduce: 1.1 + daily_run: 1.0 + custom: 1.0 + exploration: 1.0 + refactor: 0.9 # Discourage risky changes + + high: + # High stress: crisis mode, focus on stabilization + test_improve: 1.5 # Strongly incentivize testing + docs_update: 0.8 # Deprioritize docs + issue_count: 1.3 # Reward closing issues + issue_reduce: 1.4 # Strongly reward reducing backlog + daily_run: 1.1 + custom: 1.0 + exploration: 0.7 # Discourage exploration + refactor: 0.6 # Discourage refactors during crisis diff --git a/src/dashboard/routes/quests.py b/src/dashboard/routes/quests.py index f344476..55b6069 100644 --- a/src/dashboard/routes/quests.py +++ b/src/dashboard/routes/quests.py @@ -187,6 +187,76 @@ async def reload_quest_config_api() -> JSONResponse: ) +# --------------------------------------------------------------------------- +# Stress Mode Endpoints +# --------------------------------------------------------------------------- + + +@router.get("/api/stress") +async def get_stress_status_api() -> JSONResponse: + """Get current stress mode status and multipliers. 
def _apply_stress_multiplier(base_reward: int, quest_type: QuestType) -> tuple[int, float]:
    """Scale a quest's base reward by the current stress-mode multiplier.

    The stress detector is imported lazily and any failure is swallowed, so a
    problem in stress detection can never block a reward claim.

    Args:
        base_reward: Token reward configured on the quest.
        quest_type: Quest type; its ``.value`` is the multiplier lookup key.

    Returns:
        Tuple of (adjusted_reward, multiplier_used). ``multiplier_used`` is the
        effective ratio ``adjusted_reward / max(base_reward, 1)``, i.e. it
        reflects integer truncation done by ``apply_multiplier``. On any
        failure the result is ``(base_reward, 1.0)``.
    """
    try:
        from timmy.stress_detector import apply_multiplier

        # apply_multiplier returns the already-scaled reward, not the factor.
        adjusted_reward = apply_multiplier(base_reward, quest_type.value)
        effective_multiplier = adjusted_reward / max(base_reward, 1)
    except Exception as exc:
        logger.debug("Failed to apply stress multiplier: %s", exc)
        return base_reward, 1.0
    return adjusted_reward, effective_multiplier
-> dict[str, Any]: total_rewards = 0 completed_count = 0 + # Get current stress mode for adjusted rewards display + try: + from timmy.stress_detector import get_current_stress_mode, get_multiplier + + current_mode = get_current_stress_mode() + except Exception: + current_mode = None + for quest_id, quest in definitions.items(): progress = get_quest_progress(quest_id, agent_id) if not progress: @@ -474,11 +512,23 @@ def get_agent_quests_status(agent_id: str) -> dict[str, Any]: is_on_cooldown = _is_on_cooldown(progress, quest) if quest.repeatable else False + # Calculate adjusted reward with stress multiplier + adjusted_reward = quest.reward_tokens + multiplier = 1.0 + if current_mode: + try: + multiplier = get_multiplier(quest.quest_type.value, current_mode) + adjusted_reward = int(quest.reward_tokens * multiplier) + except Exception: + pass + quest_info = { "quest_id": quest_id, "name": quest.name, "description": quest.description, "reward_tokens": quest.reward_tokens, + "adjusted_reward": adjusted_reward, + "multiplier": round(multiplier, 2), "type": quest.quest_type.value, "enabled": quest.enabled, "repeatable": quest.repeatable, @@ -509,6 +559,7 @@ def get_agent_quests_status(agent_id: str) -> dict[str, Any]: "total_tokens_earned": total_rewards, "total_quests_completed": completed_count, "active_quests_count": len([q for q in quests_status if q["enabled"]]), + "stress_mode": current_mode.value if current_mode else None, } diff --git a/src/timmy/stress_detector.py b/src/timmy/stress_detector.py new file mode 100644 index 0000000..a2adb87 --- /dev/null +++ b/src/timmy/stress_detector.py @@ -0,0 +1,565 @@ +"""System stress detection for adaptive token rewards. + +Monitors system signals like flakiness, backlog growth, and CI failures +to determine the current stress mode. Token rewards are then adjusted +based on the stress mode to incentivize agents to focus on critical areas. 
+""" + +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass, field +from datetime import UTC, datetime, timedelta +from enum import StrEnum +from pathlib import Path +from typing import Any + +import yaml + +from config import settings + +logger = logging.getLogger(__name__) + +# Path to stress mode configuration +STRESS_CONFIG_PATH = Path(settings.repo_root) / "config" / "stress_modes.yaml" + + +class StressMode(StrEnum): + """System stress modes. + + - CALM: Normal operations, incentivize exploration and refactoring + - ELEVATED: Some stress signals detected, balance incentives + - HIGH: Critical stress, strongly incentivize bug fixes and stabilization + """ + + CALM = "calm" + ELEVATED = "elevated" + HIGH = "high" + + +@dataclass +class StressSignal: + """A single stress signal reading.""" + + name: str + value: float + threshold: float + weight: float + timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat()) + + @property + def is_triggered(self) -> bool: + """Whether this signal exceeds its threshold.""" + return self.value >= self.threshold + + @property + def contribution(self) -> float: + """Calculate this signal's contribution to stress score.""" + if not self.is_triggered: + return 0.0 + # Contribution is weighted ratio of value to threshold + return min(1.0, (self.value / max(self.threshold, 1.0))) * self.weight + + +@dataclass +class StressSnapshot: + """Complete stress assessment at a point in time.""" + + mode: StressMode + score: float + signals: list[StressSignal] + multipliers: dict[str, float] + timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat()) + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + "mode": self.mode.value, + "score": round(self.score, 3), + "signals": [ + { + "name": s.name, + "value": s.value, + "threshold": s.threshold, + "triggered": s.is_triggered, + "contribution": 
def _load_stress_config() -> dict[str, Any]:
    """Load stress mode configuration from YAML, cached by file mtime.

    The file is re-read only when its modification time differs from the one
    recorded at the last successful load (or when nothing is cached yet).

    Returns:
        The parsed configuration mapping. An empty dict is returned when the
        file is missing, unreadable, not valid YAML, or does not contain a
        mapping at the top level; callers supply their own defaults.
    """
    global _config_cache, _config_mtime

    # stat() directly instead of exists()+stat(): avoids a race if the file
    # disappears between the two calls.
    try:
        mtime = STRESS_CONFIG_PATH.stat().st_mtime
    except OSError:
        mtime = None  # missing/unreadable: keep whatever is already cached

    if mtime is not None and (_config_cache is None or mtime != _config_mtime):
        try:
            # The shipped config contains non-ASCII characters, so do not
            # depend on the locale's default encoding.
            raw = STRESS_CONFIG_PATH.read_text(encoding="utf-8")
            loaded = yaml.safe_load(raw)
        except (OSError, UnicodeDecodeError, yaml.YAMLError) as exc:
            logger.warning("Failed to load stress config: %s", exc)
            # mtime is deliberately not recorded so the next call retries.
            _config_cache = {}
        else:
            if isinstance(loaded, dict):
                _config_cache = loaded
            else:
                # Callers use config.get(...); a list or scalar document
                # would make every one of them raise AttributeError.
                if loaded:
                    logger.warning(
                        "Ignoring stress config %s: expected a mapping, got %s",
                        STRESS_CONFIG_PATH,
                        type(loaded).__name__,
                    )
                _config_cache = {}
            _config_mtime = mtime
            logger.debug("Loaded stress config from %s", STRESS_CONFIG_PATH)

    if _config_cache is None:
        _config_cache = {}

    return _config_cache
def _calculate_flaky_test_rate() -> float:
    """Calculate the flaky test rate over the last 7 days.

    Reads ``<repo_root>/.loop/test_results.jsonl`` (one JSON object per line)
    and counts entries whose ``timestamp`` falls inside the window; an entry
    is flaky when its ``is_flaky`` field is truthy.

    Returns:
        ``flaky_runs / total_runs`` for the window, or ``0.0`` when the file
        is missing, holds no usable entries, or cannot be read.
    """
    try:
        test_results_path = Path(settings.repo_root) / ".loop" / "test_results.jsonl"
        if not test_results_path.exists():
            return 0.0

        cutoff = datetime.now(UTC) - timedelta(days=7)

        total_runs = 0
        flaky_runs = 0

        with test_results_path.open(encoding="utf-8") as results:
            for line in results:
                try:
                    entry = json.loads(line)
                    ts_str = entry.get("timestamp", "")
                    if not ts_str:
                        continue
                    ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
                except (json.JSONDecodeError, ValueError, TypeError, AttributeError):
                    # Blank/corrupt line, non-object JSON, or non-string
                    # timestamp: skip this entry only, not the whole metric.
                    continue

                if ts.tzinfo is None:
                    # Naive timestamps cannot be compared with the aware
                    # cutoff (TypeError). NOTE(review): assumes such entries
                    # were written in UTC; confirm against the writer.
                    ts = ts.replace(tzinfo=UTC)

                if ts >= cutoff:
                    total_runs += 1
                    if entry.get("is_flaky", False):
                        flaky_runs += 1

        # max(..., 1) avoids division by zero when no run is in the window.
        return flaky_runs / max(total_runs, 1)
    except Exception as exc:
        logger.debug("Failed to calculate flaky test rate: %s", exc)
        return 0.0
def _calculate_open_bug_count() -> float:
    """Count open issues labeled ``bug`` via the Gitea client.

    Returns:
        Number of issues returned by the paginated query, as a float, or
        ``0.0`` when the client reports itself unavailable or any step fails.
    """
    try:
        from dashboard.routes.daily_run import GiteaClient, _load_config

        gitea_cfg = _load_config()
        gitea = GiteaClient(gitea_cfg, gitea_cfg.get("token"))
        if not gitea.is_available():
            return 0.0

        query = {"state": "open", "labels": "bug", "limit": 100}
        open_bugs = gitea.get_paginated("issues", query)
        return float(len(open_bugs))
    except Exception as exc:
        logger.debug("Failed to calculate open bug count: %s", exc)
        return 0.0
def detect_stress_mode(
    force_refresh: bool = False,
    min_check_interval_seconds: int = 60,
) -> StressSnapshot:
    """Detect current system stress mode.

    Results are cached in module state; a cached snapshot younger than
    ``min_check_interval_seconds`` is returned unless ``force_refresh`` is set.

    Args:
        force_refresh: Force a new check even if recently checked
        min_check_interval_seconds: Minimum seconds between checks

    Returns:
        StressSnapshot with mode, score, signals, and multipliers
    """
    global _current_snapshot, _last_check_time

    now = datetime.now(UTC)

    # Return cached snapshot if recent and not forced
    if not force_refresh and _current_snapshot is not None and _last_check_time is not None:
        elapsed = (now - _last_check_time).total_seconds()
        if elapsed < min_check_interval_seconds:
            return _current_snapshot

    # Collect signals and calculate stress
    signals = _collect_stress_signals()
    score = _calculate_stress_score(signals)

    # Determine mode from score. `or` (not a .get default) so that a
    # `thresholds:` key present but null in the YAML still uses defaults.
    config = _load_stress_config()
    default_config = get_default_config()
    thresholds_cfg = config.get("thresholds") or default_config["thresholds"]
    thresholds = StressThresholds(
        elevated_min=thresholds_cfg.get("elevated_min", 0.3),
        high_min=thresholds_cfg.get("high_min", 0.6),
    )
    mode = thresholds.get_mode_for_score(score)

    # Get multipliers for this mode
    multipliers = _get_multipliers_for_mode(mode)

    snapshot = StressSnapshot(
        mode=mode,
        score=score,
        signals=signals,
        multipliers=multipliers,
        timestamp=now.isoformat(),
    )

    # Log mode changes. This must compare against the PREVIOUS snapshot, so
    # it has to run before the cache is overwritten below; comparing after
    # the overwrite would always see identical modes and never log.
    previous = _current_snapshot
    if previous is not None and previous.mode != mode:
        logger.info(
            "Stress mode changed: %s -> %s (score: %.2f)",
            previous.mode.value,
            mode.value,
            score,
        )

    # Cache result
    _current_snapshot = snapshot
    _last_check_time = now

    return snapshot
def apply_multiplier(base_reward: int, quest_type: str) -> int:
    """Scale a base reward by the current stress multiplier for a quest type.

    Args:
        base_reward: Base token reward amount
        quest_type: Type of quest used for the multiplier lookup

    Returns:
        The scaled reward truncated to an int, floored at 1 token.
    """
    scaled_reward = int(base_reward * get_multiplier(quest_type))
    if scaled_reward < 1:
        # Never award less than a single token.
        return 1
    return scaled_reward
def reset_stress_state() -> None:
    """Clear the cached snapshot, last-check time and config cache.

    Intended for tests: the next detection re-reads the config file and
    re-collects every signal.
    """
    global _current_snapshot, _last_check_time, _config_cache, _config_mtime
    _current_snapshot = _last_check_time = _config_cache = None
    _config_mtime = 0.0
+""" + +from __future__ import annotations + +import pytest + +from timmy.stress_detector import ( + StressMode, + StressSignal, + StressSnapshot, + StressThresholds, + _calculate_stress_score, + _get_multipliers_for_mode, + apply_multiplier, + get_default_config, + reset_stress_state, +) + + +@pytest.fixture(autouse=True) +def clean_stress_state(): + """Reset stress state between tests.""" + reset_stress_state() + yield + reset_stress_state() + + +# ── Stress Mode Tests ────────────────────────────────────────────────────── + + +class TestStressMode: + def test_stress_mode_values(self): + """StressMode enum has expected values.""" + assert StressMode.CALM.value == "calm" + assert StressMode.ELEVATED.value == "elevated" + assert StressMode.HIGH.value == "high" + + +# ── Stress Signal Tests ──────────────────────────────────────────────────── + + +class TestStressSignal: + def test_signal_not_triggered(self): + """Signal with value below threshold is not triggered.""" + signal = StressSignal( + name="test_signal", + value=5.0, + threshold=10.0, + weight=0.5, + ) + assert not signal.is_triggered + assert signal.contribution == 0.0 + + def test_signal_triggered(self): + """Signal with value at threshold is triggered.""" + signal = StressSignal( + name="test_signal", + value=10.0, + threshold=10.0, + weight=0.5, + ) + assert signal.is_triggered + assert signal.contribution == 0.5 # weight * min(1, value/threshold) + + def test_signal_contribution_capped(self): + """Signal contribution is capped at weight when value >> threshold.""" + signal = StressSignal( + name="test_signal", + value=100.0, + threshold=10.0, + weight=0.5, + ) + assert signal.is_triggered + assert signal.contribution == 0.5 # Capped at weight + + def test_signal_partial_contribution(self): + """Signal contribution scales with value/threshold ratio.""" + signal = StressSignal( + name="test_signal", + value=15.0, + threshold=10.0, + weight=0.5, + ) + assert signal.is_triggered + # contribution = min(1, 
15/10) * 0.5 = 0.5 (capped) + assert signal.contribution == 0.5 + + +# ── Stress Thresholds Tests ──────────────────────────────────────────────── + + +class TestStressThresholds: + def test_calm_mode(self): + """Score below elevated_min returns CALM mode.""" + thresholds = StressThresholds(elevated_min=0.3, high_min=0.6) + assert thresholds.get_mode_for_score(0.0) == StressMode.CALM + assert thresholds.get_mode_for_score(0.1) == StressMode.CALM + assert thresholds.get_mode_for_score(0.29) == StressMode.CALM + + def test_elevated_mode(self): + """Score between elevated_min and high_min returns ELEVATED mode.""" + thresholds = StressThresholds(elevated_min=0.3, high_min=0.6) + assert thresholds.get_mode_for_score(0.3) == StressMode.ELEVATED + assert thresholds.get_mode_for_score(0.5) == StressMode.ELEVATED + assert thresholds.get_mode_for_score(0.59) == StressMode.ELEVATED + + def test_high_mode(self): + """Score at or above high_min returns HIGH mode.""" + thresholds = StressThresholds(elevated_min=0.3, high_min=0.6) + assert thresholds.get_mode_for_score(0.6) == StressMode.HIGH + assert thresholds.get_mode_for_score(0.8) == StressMode.HIGH + assert thresholds.get_mode_for_score(1.0) == StressMode.HIGH + + +# ── Stress Score Calculation Tests ───────────────────────────────────────── + + +class TestStressScoreCalculation: + def test_empty_signals(self): + """Empty signal list returns zero stress score.""" + score = _calculate_stress_score([]) + assert score == 0.0 + + def test_no_triggered_signals(self): + """No triggered signals means zero stress score.""" + signals = [ + StressSignal(name="s1", value=1.0, threshold=10.0, weight=0.5), + StressSignal(name="s2", value=2.0, threshold=10.0, weight=0.5), + ] + score = _calculate_stress_score(signals) + assert score == 0.0 + + def test_single_triggered_signal(self): + """Single triggered signal contributes its weight.""" + signals = [ + StressSignal(name="s1", value=10.0, threshold=10.0, weight=0.5), + ] + score = 
_calculate_stress_score(signals) + # contribution = 0.5, total_weight = 0.5, score = 0.5/0.5 = 1.0 + assert score == 1.0 + + def test_mixed_signals(self): + """Mix of triggered and non-triggered signals.""" + signals = [ + StressSignal(name="s1", value=10.0, threshold=10.0, weight=0.3), + StressSignal(name="s2", value=1.0, threshold=10.0, weight=0.3), + StressSignal(name="s3", value=10.0, threshold=10.0, weight=0.4), + ] + score = _calculate_stress_score(signals) + # triggered contributions: 0.3 + 0.4 = 0.7 + # total_weight: 0.3 + 0.3 + 0.4 = 1.0 + # score = 0.7 / 1.0 = 0.7 + assert score == 0.7 + + def test_score_capped_at_one(self): + """Stress score is capped at 1.0.""" + signals = [ + StressSignal(name="s1", value=100.0, threshold=10.0, weight=1.0), + StressSignal(name="s2", value=100.0, threshold=10.0, weight=1.0), + ] + score = _calculate_stress_score(signals) + assert score == 1.0 # Capped + + +# ── Multiplier Tests ─────────────────────────────────────────────────────── + + +class TestMultipliers: + def test_default_config_structure(self): + """Default config has expected structure.""" + config = get_default_config() + assert "thresholds" in config + assert "signals" in config + assert "multipliers" in config + + def test_calm_mode_multipliers(self): + """Calm mode has expected multipliers.""" + multipliers = _get_multipliers_for_mode(StressMode.CALM) + assert multipliers["test_improve"] == 1.0 + assert multipliers["docs_update"] == 1.2 + assert multipliers["exploration"] == 1.3 + assert multipliers["refactor"] == 1.2 + + def test_elevated_mode_multipliers(self): + """Elevated mode has expected multipliers.""" + multipliers = _get_multipliers_for_mode(StressMode.ELEVATED) + assert multipliers["test_improve"] == 1.2 + assert multipliers["issue_reduce"] == 1.1 + assert multipliers["refactor"] == 0.9 + + def test_high_mode_multipliers(self): + """High stress mode has expected multipliers.""" + multipliers = _get_multipliers_for_mode(StressMode.HIGH) + assert 
class TestApplyMultiplier:
    """apply_multiplier checks that hold whatever stress mode is detected."""

    def test_apply_multiplier_calm(self):
        """Quest types without a configured multiplier keep their base reward."""
        # "unknown_type" has no entry in any mode's multiplier table, so the
        # lookup falls back to 1.0 regardless of the live stress mode.
        assert apply_multiplier(100, "unknown_type") == 100

    def test_apply_multiplier_minimum_one(self):
        """Applied reward is at least 1 token."""
        assert apply_multiplier(1, "any_type") == 1
        # A zero base reward truncates to 0 and must be floored to 1.
        assert apply_multiplier(0, "any_type") == 1
test_default_config_contains_all_signals(self): + """Default config defines all expected signals.""" + config = get_default_config() + signals = config["signals"] + + expected_signals = [ + "flaky_test_rate", + "p1_backlog_growth", + "ci_failure_rate", + "open_bug_count", + ] + + for signal in expected_signals: + assert signal in signals + assert "threshold" in signals[signal] + assert "weight" in signals[signal] + + def test_default_config_contains_all_modes(self): + """Default config defines all stress modes.""" + config = get_default_config() + multipliers = config["multipliers"] + + assert "calm" in multipliers + assert "elevated" in multipliers + assert "high" in multipliers + + def test_multiplier_weights_sum_approximately_one(self): + """Signal weights should approximately sum to 1.0.""" + config = get_default_config() + signals = config["signals"] + + total_weight = sum(s["weight"] for s in signals.values()) + # Allow some flexibility but should be close to 1.0 + assert 0.9 <= total_weight <= 1.1