forked from Rockachopa/Timmy-time-dashboard
Compare commits
1 Commits
main
...
kimi/issue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
919a011cae |
98
config/stress_modes.yaml
Normal file
98
config/stress_modes.yaml
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
# ── System Stress Modes Configuration ────────────────────────────────────────
|
||||||
|
#
|
||||||
|
# This configuration defines how token rewards adapt based on system stress.
|
||||||
|
# When the system detects elevated stress (flaky tests, growing backlog,
|
||||||
|
# CI failures), quest rewards are adjusted to incentivize agents to focus
|
||||||
|
# on the most critical areas.
|
||||||
|
#
|
||||||
|
# ── How It Works ─────────────────────────────────────────────────────────────
|
||||||
|
#
|
||||||
|
# 1. SIGNALS: System metrics are monitored continuously
|
||||||
|
# 2. SCORE: Weighted contributions from triggered signals create a stress score
|
||||||
|
# 3. MODE: Score determines the stress mode (calm, elevated, high)
|
||||||
|
# 4. MULTIPLIERS: Token rewards are multiplied based on the current mode
|
||||||
|
#
|
||||||
|
# ── Stress Thresholds ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
thresholds:
|
||||||
|
# Minimum score to enter elevated mode (0.0 - 1.0)
|
||||||
|
elevated_min: 0.3
|
||||||
|
|
||||||
|
# Minimum score to enter high stress mode (0.0 - 1.0)
|
||||||
|
high_min: 0.6
|
||||||
|
|
||||||
|
# ── Stress Signals ───────────────────────────────────────────────────────────
|
||||||
|
#
|
||||||
|
# Each signal has:
|
||||||
|
# - threshold: Value at which signal is considered "triggered"
|
||||||
|
# - weight: Contribution to overall stress score (should sum to ~1.0)
|
||||||
|
|
||||||
|
signals:
|
||||||
|
flaky_test_rate:
|
||||||
|
threshold: 0.15 # 15% of tests showing flakiness
|
||||||
|
weight: 0.30
|
||||||
|
description: "Percentage of test runs that are flaky"
|
||||||
|
|
||||||
|
p1_backlog_growth:
|
||||||
|
threshold: 5 # 5 new P1 issues in lookback period
|
||||||
|
weight: 0.25
|
||||||
|
description: "Net growth in P1 priority issues over 7 days"
|
||||||
|
|
||||||
|
ci_failure_rate:
|
||||||
|
threshold: 0.20 # 20% of CI runs failing
|
||||||
|
weight: 0.25
|
||||||
|
description: "Percentage of CI runs failing in lookback period"
|
||||||
|
|
||||||
|
open_bug_count:
|
||||||
|
threshold: 20 # 20 open bugs
|
||||||
|
weight: 0.20
|
||||||
|
description: "Total open issues labeled as 'bug'"
|
||||||
|
|
||||||
|
# ── Token Multipliers ────────────────────────────────────────────────────────
|
||||||
|
#
|
||||||
|
# Multipliers are applied to quest rewards based on current stress mode.
|
||||||
|
# Values > 1.0 increase rewards, < 1.0 decrease rewards.
|
||||||
|
#
|
||||||
|
# Quest types:
|
||||||
|
# - test_improve: Test coverage/quality improvements
|
||||||
|
# - docs_update: Documentation updates
|
||||||
|
# - issue_count: Closing specific issue types
|
||||||
|
# - issue_reduce: Reducing overall issue backlog
|
||||||
|
# - daily_run: Daily Run session completion
|
||||||
|
# - custom: Special/manual quests
|
||||||
|
# - exploration: Exploratory work
|
||||||
|
# - refactor: Code refactoring
|
||||||
|
|
||||||
|
multipliers:
|
||||||
|
calm:
|
||||||
|
# Calm periods: incentivize maintenance and exploration
|
||||||
|
test_improve: 1.0
|
||||||
|
docs_update: 1.2
|
||||||
|
issue_count: 1.0
|
||||||
|
issue_reduce: 1.0
|
||||||
|
daily_run: 1.0
|
||||||
|
custom: 1.0
|
||||||
|
exploration: 1.3
|
||||||
|
refactor: 1.2
|
||||||
|
|
||||||
|
elevated:
|
||||||
|
# Elevated stress: start emphasizing stability
|
||||||
|
test_improve: 1.2
|
||||||
|
docs_update: 1.0
|
||||||
|
issue_count: 1.1
|
||||||
|
issue_reduce: 1.1
|
||||||
|
daily_run: 1.0
|
||||||
|
custom: 1.0
|
||||||
|
exploration: 1.0
|
||||||
|
refactor: 0.9 # Discourage risky changes
|
||||||
|
|
||||||
|
high:
|
||||||
|
# High stress: crisis mode, focus on stabilization
|
||||||
|
test_improve: 1.5 # Strongly incentivize testing
|
||||||
|
docs_update: 0.8 # Deprioritize docs
|
||||||
|
issue_count: 1.3 # Reward closing issues
|
||||||
|
issue_reduce: 1.4 # Strongly reward reducing backlog
|
||||||
|
daily_run: 1.1
|
||||||
|
custom: 1.0
|
||||||
|
exploration: 0.7 # Discourage exploration
|
||||||
|
refactor: 0.6 # Discourage refactors during crisis
|
||||||
@@ -187,6 +187,76 @@ async def reload_quest_config_api() -> JSONResponse:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Stress Mode Endpoints
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/api/stress")
|
||||||
|
async def get_stress_status_api() -> JSONResponse:
|
||||||
|
"""Get current stress mode status and multipliers.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Current stress mode, score, active signals, and multipliers
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
from timmy.stress_detector import (
|
||||||
|
detect_stress_mode,
|
||||||
|
get_stress_summary,
|
||||||
|
)
|
||||||
|
|
||||||
|
snapshot = detect_stress_mode()
|
||||||
|
summary = get_stress_summary()
|
||||||
|
|
||||||
|
return JSONResponse(
|
||||||
|
{
|
||||||
|
"status": "ok",
|
||||||
|
"stress": summary,
|
||||||
|
"raw": snapshot.to_dict(),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("Failed to get stress status: %s", exc)
|
||||||
|
return JSONResponse(
|
||||||
|
{
|
||||||
|
"status": "error",
|
||||||
|
"error": str(exc),
|
||||||
|
},
|
||||||
|
status_code=500,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/api/stress/refresh")
|
||||||
|
async def refresh_stress_detection_api() -> JSONResponse:
|
||||||
|
"""Force a fresh stress detection check.
|
||||||
|
|
||||||
|
Normally stress is cached for 60 seconds. This endpoint
|
||||||
|
bypasses the cache for immediate results.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
from timmy.stress_detector import detect_stress_mode, get_stress_summary
|
||||||
|
|
||||||
|
snapshot = detect_stress_mode(force_refresh=True)
|
||||||
|
summary = get_stress_summary()
|
||||||
|
|
||||||
|
return JSONResponse(
|
||||||
|
{
|
||||||
|
"status": "ok",
|
||||||
|
"stress": summary,
|
||||||
|
"raw": snapshot.to_dict(),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("Failed to refresh stress detection: %s", exc)
|
||||||
|
return JSONResponse(
|
||||||
|
{
|
||||||
|
"status": "error",
|
||||||
|
"error": str(exc),
|
||||||
|
},
|
||||||
|
status_code=500,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Dashboard UI Endpoints
|
# Dashboard UI Endpoints
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -269,6 +269,22 @@ def _is_on_cooldown(progress: QuestProgress, quest: QuestDefinition) -> bool:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def _apply_stress_multiplier(base_reward: int, quest_type: QuestType) -> tuple[int, float]:
|
||||||
|
"""Apply stress-based multiplier to quest reward.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (adjusted_reward, multiplier_used)
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
from timmy.stress_detector import apply_multiplier
|
||||||
|
|
||||||
|
multiplier = apply_multiplier(base_reward, quest_type.value)
|
||||||
|
return multiplier, multiplier / max(base_reward, 1)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.debug("Failed to apply stress multiplier: %s", exc)
|
||||||
|
return base_reward, 1.0
|
||||||
|
|
||||||
|
|
||||||
def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
|
def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
|
||||||
"""Claim the token reward for a completed quest.
|
"""Claim the token reward for a completed quest.
|
||||||
|
|
||||||
@@ -292,13 +308,18 @@ def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# Apply stress-based multiplier
|
||||||
|
adjusted_reward, multiplier = _apply_stress_multiplier(
|
||||||
|
quest.reward_tokens, quest.quest_type
|
||||||
|
)
|
||||||
|
|
||||||
# Award tokens via ledger
|
# Award tokens via ledger
|
||||||
from lightning.ledger import create_invoice_entry, mark_settled
|
from lightning.ledger import create_invoice_entry, mark_settled
|
||||||
|
|
||||||
# Create a mock invoice for the reward
|
# Create a mock invoice for the reward
|
||||||
invoice_entry = create_invoice_entry(
|
invoice_entry = create_invoice_entry(
|
||||||
payment_hash=f"quest_{quest_id}_{agent_id}_{int(time.time())}",
|
payment_hash=f"quest_{quest_id}_{agent_id}_{int(time.time())}",
|
||||||
amount_sats=quest.reward_tokens,
|
amount_sats=adjusted_reward,
|
||||||
memo=f"Quest reward: {quest.name}",
|
memo=f"Quest reward: {quest.name}",
|
||||||
source="quest_reward",
|
source="quest_reward",
|
||||||
agent_id=agent_id,
|
agent_id=agent_id,
|
||||||
@@ -320,12 +341,21 @@ def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
|
|||||||
progress.completed_at = ""
|
progress.completed_at = ""
|
||||||
progress.claimed_at = ""
|
progress.claimed_at = ""
|
||||||
|
|
||||||
notification = quest.notification_message.format(tokens=quest.reward_tokens)
|
# Build notification with multiplier info
|
||||||
|
notification = quest.notification_message.format(tokens=adjusted_reward)
|
||||||
|
if multiplier != 1.0:
|
||||||
|
pct = int((multiplier - 1.0) * 100)
|
||||||
|
if pct > 0:
|
||||||
|
notification += f" (+{pct}% stress bonus)"
|
||||||
|
else:
|
||||||
|
notification += f" ({pct}% stress adjustment)"
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"quest_id": quest_id,
|
"quest_id": quest_id,
|
||||||
"agent_id": agent_id,
|
"agent_id": agent_id,
|
||||||
"tokens_awarded": quest.reward_tokens,
|
"tokens_awarded": adjusted_reward,
|
||||||
|
"base_reward": quest.reward_tokens,
|
||||||
|
"multiplier": round(multiplier, 2),
|
||||||
"notification": notification,
|
"notification": notification,
|
||||||
"completion_count": progress.completion_count,
|
"completion_count": progress.completion_count,
|
||||||
}
|
}
|
||||||
@@ -467,6 +497,14 @@ def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
|
|||||||
total_rewards = 0
|
total_rewards = 0
|
||||||
completed_count = 0
|
completed_count = 0
|
||||||
|
|
||||||
|
# Get current stress mode for adjusted rewards display
|
||||||
|
try:
|
||||||
|
from timmy.stress_detector import get_current_stress_mode, get_multiplier
|
||||||
|
|
||||||
|
current_mode = get_current_stress_mode()
|
||||||
|
except Exception:
|
||||||
|
current_mode = None
|
||||||
|
|
||||||
for quest_id, quest in definitions.items():
|
for quest_id, quest in definitions.items():
|
||||||
progress = get_quest_progress(quest_id, agent_id)
|
progress = get_quest_progress(quest_id, agent_id)
|
||||||
if not progress:
|
if not progress:
|
||||||
@@ -474,11 +512,23 @@ def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
|
|||||||
|
|
||||||
is_on_cooldown = _is_on_cooldown(progress, quest) if quest.repeatable else False
|
is_on_cooldown = _is_on_cooldown(progress, quest) if quest.repeatable else False
|
||||||
|
|
||||||
|
# Calculate adjusted reward with stress multiplier
|
||||||
|
adjusted_reward = quest.reward_tokens
|
||||||
|
multiplier = 1.0
|
||||||
|
if current_mode:
|
||||||
|
try:
|
||||||
|
multiplier = get_multiplier(quest.quest_type.value, current_mode)
|
||||||
|
adjusted_reward = int(quest.reward_tokens * multiplier)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
quest_info = {
|
quest_info = {
|
||||||
"quest_id": quest_id,
|
"quest_id": quest_id,
|
||||||
"name": quest.name,
|
"name": quest.name,
|
||||||
"description": quest.description,
|
"description": quest.description,
|
||||||
"reward_tokens": quest.reward_tokens,
|
"reward_tokens": quest.reward_tokens,
|
||||||
|
"adjusted_reward": adjusted_reward,
|
||||||
|
"multiplier": round(multiplier, 2),
|
||||||
"type": quest.quest_type.value,
|
"type": quest.quest_type.value,
|
||||||
"enabled": quest.enabled,
|
"enabled": quest.enabled,
|
||||||
"repeatable": quest.repeatable,
|
"repeatable": quest.repeatable,
|
||||||
@@ -509,6 +559,7 @@ def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
|
|||||||
"total_tokens_earned": total_rewards,
|
"total_tokens_earned": total_rewards,
|
||||||
"total_quests_completed": completed_count,
|
"total_quests_completed": completed_count,
|
||||||
"active_quests_count": len([q for q in quests_status if q["enabled"]]),
|
"active_quests_count": len([q for q in quests_status if q["enabled"]]),
|
||||||
|
"stress_mode": current_mode.value if current_mode else None,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
565
src/timmy/stress_detector.py
Normal file
565
src/timmy/stress_detector.py
Normal file
@@ -0,0 +1,565 @@
|
|||||||
|
"""System stress detection for adaptive token rewards.
|
||||||
|
|
||||||
|
Monitors system signals like flakiness, backlog growth, and CI failures
|
||||||
|
to determine the current stress mode. Token rewards are then adjusted
|
||||||
|
based on the stress mode to incentivize agents to focus on critical areas.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from datetime import UTC, datetime, timedelta
|
||||||
|
from enum import StrEnum
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
from config import settings
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Path to stress mode configuration
|
||||||
|
STRESS_CONFIG_PATH = Path(settings.repo_root) / "config" / "stress_modes.yaml"
|
||||||
|
|
||||||
|
|
||||||
|
class StressMode(StrEnum):
|
||||||
|
"""System stress modes.
|
||||||
|
|
||||||
|
- CALM: Normal operations, incentivize exploration and refactoring
|
||||||
|
- ELEVATED: Some stress signals detected, balance incentives
|
||||||
|
- HIGH: Critical stress, strongly incentivize bug fixes and stabilization
|
||||||
|
"""
|
||||||
|
|
||||||
|
CALM = "calm"
|
||||||
|
ELEVATED = "elevated"
|
||||||
|
HIGH = "high"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class StressSignal:
|
||||||
|
"""A single stress signal reading."""
|
||||||
|
|
||||||
|
name: str
|
||||||
|
value: float
|
||||||
|
threshold: float
|
||||||
|
weight: float
|
||||||
|
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_triggered(self) -> bool:
|
||||||
|
"""Whether this signal exceeds its threshold."""
|
||||||
|
return self.value >= self.threshold
|
||||||
|
|
||||||
|
@property
|
||||||
|
def contribution(self) -> float:
|
||||||
|
"""Calculate this signal's contribution to stress score."""
|
||||||
|
if not self.is_triggered:
|
||||||
|
return 0.0
|
||||||
|
# Contribution is weighted ratio of value to threshold
|
||||||
|
return min(1.0, (self.value / max(self.threshold, 1.0))) * self.weight
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class StressSnapshot:
|
||||||
|
"""Complete stress assessment at a point in time."""
|
||||||
|
|
||||||
|
mode: StressMode
|
||||||
|
score: float
|
||||||
|
signals: list[StressSignal]
|
||||||
|
multipliers: dict[str, float]
|
||||||
|
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
|
||||||
|
|
||||||
|
def to_dict(self) -> dict[str, Any]:
|
||||||
|
"""Convert to dictionary for serialization."""
|
||||||
|
return {
|
||||||
|
"mode": self.mode.value,
|
||||||
|
"score": round(self.score, 3),
|
||||||
|
"signals": [
|
||||||
|
{
|
||||||
|
"name": s.name,
|
||||||
|
"value": s.value,
|
||||||
|
"threshold": s.threshold,
|
||||||
|
"triggered": s.is_triggered,
|
||||||
|
"contribution": round(s.contribution, 3),
|
||||||
|
}
|
||||||
|
for s in self.signals
|
||||||
|
],
|
||||||
|
"multipliers": self.multipliers,
|
||||||
|
"timestamp": self.timestamp,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class StressThresholds:
|
||||||
|
"""Thresholds for entering/exiting stress modes."""
|
||||||
|
|
||||||
|
elevated_min: float = 0.3
|
||||||
|
high_min: float = 0.6
|
||||||
|
|
||||||
|
def get_mode_for_score(self, score: float) -> StressMode:
|
||||||
|
"""Determine stress mode based on score."""
|
||||||
|
if score >= self.high_min:
|
||||||
|
return StressMode.HIGH
|
||||||
|
elif score >= self.elevated_min:
|
||||||
|
return StressMode.ELEVATED
|
||||||
|
return StressMode.CALM
|
||||||
|
|
||||||
|
|
||||||
|
# In-memory storage for stress state
|
||||||
|
_current_snapshot: StressSnapshot | None = None
|
||||||
|
_last_check_time: datetime | None = None
|
||||||
|
_config_cache: dict[str, Any] | None = None
|
||||||
|
_config_mtime: float = 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def _load_stress_config() -> dict[str, Any]:
|
||||||
|
"""Load stress mode configuration from YAML.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Configuration dictionary with default fallbacks
|
||||||
|
"""
|
||||||
|
global _config_cache, _config_mtime
|
||||||
|
|
||||||
|
# Check if config file has been modified
|
||||||
|
if STRESS_CONFIG_PATH.exists():
|
||||||
|
mtime = STRESS_CONFIG_PATH.stat().st_mtime
|
||||||
|
if mtime != _config_mtime or _config_cache is None:
|
||||||
|
try:
|
||||||
|
raw = STRESS_CONFIG_PATH.read_text()
|
||||||
|
_config_cache = yaml.safe_load(raw) or {}
|
||||||
|
_config_mtime = mtime
|
||||||
|
logger.debug("Loaded stress config from %s", STRESS_CONFIG_PATH)
|
||||||
|
except (OSError, yaml.YAMLError) as exc:
|
||||||
|
logger.warning("Failed to load stress config: %s", exc)
|
||||||
|
_config_cache = {}
|
||||||
|
|
||||||
|
if _config_cache is None:
|
||||||
|
_config_cache = {}
|
||||||
|
|
||||||
|
return _config_cache
|
||||||
|
|
||||||
|
|
||||||
|
def get_default_config() -> dict[str, Any]:
|
||||||
|
"""Get default stress configuration."""
|
||||||
|
return {
|
||||||
|
"thresholds": {
|
||||||
|
"elevated_min": 0.3,
|
||||||
|
"high_min": 0.6,
|
||||||
|
},
|
||||||
|
"signals": {
|
||||||
|
"flaky_test_rate": {
|
||||||
|
"threshold": 0.15, # 15% flaky test rate
|
||||||
|
"weight": 0.3,
|
||||||
|
"description": "Percentage of tests that are flaky",
|
||||||
|
},
|
||||||
|
"p1_backlog_growth": {
|
||||||
|
"threshold": 5, # 5 new P1 issues
|
||||||
|
"weight": 0.25,
|
||||||
|
"description": "Net growth in P1 priority issues",
|
||||||
|
},
|
||||||
|
"ci_failure_rate": {
|
||||||
|
"threshold": 0.2, # 20% CI failure rate
|
||||||
|
"weight": 0.25,
|
||||||
|
"description": "Percentage of CI runs failing",
|
||||||
|
},
|
||||||
|
"open_bug_count": {
|
||||||
|
"threshold": 20, # 20 open bugs
|
||||||
|
"weight": 0.2,
|
||||||
|
"description": "Total open issues labeled as bugs",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"multipliers": {
|
||||||
|
StressMode.CALM.value: {
|
||||||
|
"test_improve": 1.0,
|
||||||
|
"docs_update": 1.2, # Calm periods good for docs
|
||||||
|
"issue_count": 1.0,
|
||||||
|
"issue_reduce": 1.0,
|
||||||
|
"daily_run": 1.0,
|
||||||
|
"custom": 1.0,
|
||||||
|
"exploration": 1.3, # Encourage exploration
|
||||||
|
"refactor": 1.2, # Encourage refactoring
|
||||||
|
},
|
||||||
|
StressMode.ELEVATED.value: {
|
||||||
|
"test_improve": 1.2, # Start emphasizing tests
|
||||||
|
"docs_update": 1.0,
|
||||||
|
"issue_count": 1.1,
|
||||||
|
"issue_reduce": 1.1,
|
||||||
|
"daily_run": 1.0,
|
||||||
|
"custom": 1.0,
|
||||||
|
"exploration": 1.0,
|
||||||
|
"refactor": 0.9, # Discourage risky refactors
|
||||||
|
},
|
||||||
|
StressMode.HIGH.value: {
|
||||||
|
"test_improve": 1.5, # Strongly incentivize testing
|
||||||
|
"docs_update": 0.8, # Deprioritize docs
|
||||||
|
"issue_count": 1.3, # Reward closing issues
|
||||||
|
"issue_reduce": 1.4, # Strongly reward reducing backlog
|
||||||
|
"daily_run": 1.1,
|
||||||
|
"custom": 1.0,
|
||||||
|
"exploration": 0.7, # Discourage exploration
|
||||||
|
"refactor": 0.6, # Discourage refactors during crisis
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _get_config_value(key_path: str, default: Any = None) -> Any:
|
||||||
|
"""Get a value from config using dot notation path."""
|
||||||
|
config = _load_stress_config()
|
||||||
|
keys = key_path.split(".")
|
||||||
|
value = config
|
||||||
|
for key in keys:
|
||||||
|
if isinstance(value, dict):
|
||||||
|
value = value.get(key)
|
||||||
|
else:
|
||||||
|
return default
|
||||||
|
return value if value is not None else default
|
||||||
|
|
||||||
|
|
||||||
|
def _calculate_flaky_test_rate() -> float:
|
||||||
|
"""Calculate current flaky test rate from available data."""
|
||||||
|
try:
|
||||||
|
# Try to load from daily run metrics or test results
|
||||||
|
test_results_path = Path(settings.repo_root) / ".loop" / "test_results.jsonl"
|
||||||
|
if not test_results_path.exists():
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
# Count recent test runs and flaky results
|
||||||
|
now = datetime.now(UTC)
|
||||||
|
cutoff = now - timedelta(days=7)
|
||||||
|
|
||||||
|
total_runs = 0
|
||||||
|
flaky_runs = 0
|
||||||
|
|
||||||
|
if test_results_path.exists():
|
||||||
|
for line in test_results_path.read_text().strip().splitlines():
|
||||||
|
try:
|
||||||
|
entry = json.loads(line)
|
||||||
|
ts_str = entry.get("timestamp", "")
|
||||||
|
if not ts_str:
|
||||||
|
continue
|
||||||
|
ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
|
||||||
|
if ts >= cutoff:
|
||||||
|
total_runs += 1
|
||||||
|
if entry.get("is_flaky", False):
|
||||||
|
flaky_runs += 1
|
||||||
|
except (json.JSONDecodeError, ValueError):
|
||||||
|
continue
|
||||||
|
|
||||||
|
return flaky_runs / max(total_runs, 1)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.debug("Failed to calculate flaky test rate: %s", exc)
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def _calculate_p1_backlog_growth() -> float:
|
||||||
|
"""Calculate P1 issue backlog growth."""
|
||||||
|
try:
|
||||||
|
from dashboard.routes.daily_run import GiteaClient, _load_config
|
||||||
|
|
||||||
|
config = _load_config()
|
||||||
|
token = config.get("token")
|
||||||
|
client = GiteaClient(config, token)
|
||||||
|
|
||||||
|
if not client.is_available():
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
# Get current P1 issues
|
||||||
|
now = datetime.now(UTC)
|
||||||
|
cutoff_current = now - timedelta(days=7)
|
||||||
|
cutoff_previous = now - timedelta(days=14)
|
||||||
|
|
||||||
|
issues = client.get_paginated("issues", {"state": "all", "labels": "P1", "limit": 100})
|
||||||
|
|
||||||
|
current_count = 0
|
||||||
|
previous_count = 0
|
||||||
|
|
||||||
|
for issue in issues:
|
||||||
|
created_at = issue.get("created_at", "")
|
||||||
|
if not created_at:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
created = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
|
||||||
|
if created >= cutoff_current:
|
||||||
|
current_count += 1
|
||||||
|
elif created >= cutoff_previous:
|
||||||
|
previous_count += 1
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Return net growth (positive means growing backlog)
|
||||||
|
return max(0, current_count - previous_count)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.debug("Failed to calculate P1 backlog growth: %s", exc)
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def _calculate_ci_failure_rate() -> float:
|
||||||
|
"""Calculate CI failure rate from recent runs."""
|
||||||
|
try:
|
||||||
|
# Try to get CI metrics from Gitea or local files
|
||||||
|
ci_results_path = Path(settings.repo_root) / ".loop" / "ci_results.jsonl"
|
||||||
|
if not ci_results_path.exists():
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
now = datetime.now(UTC)
|
||||||
|
cutoff = now - timedelta(days=7)
|
||||||
|
|
||||||
|
total_runs = 0
|
||||||
|
failed_runs = 0
|
||||||
|
|
||||||
|
for line in ci_results_path.read_text().strip().splitlines():
|
||||||
|
try:
|
||||||
|
entry = json.loads(line)
|
||||||
|
ts_str = entry.get("timestamp", "")
|
||||||
|
if not ts_str:
|
||||||
|
continue
|
||||||
|
ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
|
||||||
|
if ts >= cutoff:
|
||||||
|
total_runs += 1
|
||||||
|
if entry.get("status") != "success":
|
||||||
|
failed_runs += 1
|
||||||
|
except (json.JSONDecodeError, ValueError):
|
||||||
|
continue
|
||||||
|
|
||||||
|
return failed_runs / max(total_runs, 1)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.debug("Failed to calculate CI failure rate: %s", exc)
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def _calculate_open_bug_count() -> float:
|
||||||
|
"""Calculate current open bug count."""
|
||||||
|
try:
|
||||||
|
from dashboard.routes.daily_run import GiteaClient, _load_config
|
||||||
|
|
||||||
|
config = _load_config()
|
||||||
|
token = config.get("token")
|
||||||
|
client = GiteaClient(config, token)
|
||||||
|
|
||||||
|
if not client.is_available():
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
issues = client.get_paginated("issues", {"state": "open", "labels": "bug", "limit": 100})
|
||||||
|
|
||||||
|
return float(len(issues))
|
||||||
|
except Exception as exc:
|
||||||
|
logger.debug("Failed to calculate open bug count: %s", exc)
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def _collect_stress_signals() -> list[StressSignal]:
|
||||||
|
"""Collect all stress signals from the system."""
|
||||||
|
config = _load_stress_config()
|
||||||
|
default_config = get_default_config()
|
||||||
|
signals_config = config.get("signals", default_config["signals"])
|
||||||
|
|
||||||
|
signals = []
|
||||||
|
|
||||||
|
# Define signal collectors
|
||||||
|
collectors = {
|
||||||
|
"flaky_test_rate": _calculate_flaky_test_rate,
|
||||||
|
"p1_backlog_growth": _calculate_p1_backlog_growth,
|
||||||
|
"ci_failure_rate": _calculate_ci_failure_rate,
|
||||||
|
"open_bug_count": _calculate_open_bug_count,
|
||||||
|
}
|
||||||
|
|
||||||
|
for signal_name, collector in collectors.items():
|
||||||
|
signal_cfg = signals_config.get(signal_name, {})
|
||||||
|
default_cfg = default_config["signals"].get(signal_name, {})
|
||||||
|
|
||||||
|
try:
|
||||||
|
value = collector()
|
||||||
|
threshold = signal_cfg.get("threshold", default_cfg.get("threshold", 1.0))
|
||||||
|
weight = signal_cfg.get("weight", default_cfg.get("weight", 0.25))
|
||||||
|
|
||||||
|
signals.append(
|
||||||
|
StressSignal(
|
||||||
|
name=signal_name,
|
||||||
|
value=value,
|
||||||
|
threshold=threshold,
|
||||||
|
weight=weight,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.debug("Failed to collect signal %s: %s", signal_name, exc)
|
||||||
|
|
||||||
|
return signals
|
||||||
|
|
||||||
|
|
||||||
|
def _calculate_stress_score(signals: list[StressSignal]) -> float:
|
||||||
|
"""Calculate overall stress score from signals.
|
||||||
|
|
||||||
|
Score is weighted sum of triggered signal contributions,
|
||||||
|
normalized to 0-1 range.
|
||||||
|
"""
|
||||||
|
if not signals:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
total_weight = sum(s.weight for s in signals)
|
||||||
|
if total_weight == 0:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
triggered_contribution = sum(s.contribution for s in signals)
|
||||||
|
return min(1.0, triggered_contribution / total_weight)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_multipliers_for_mode(mode: StressMode) -> dict[str, float]:
|
||||||
|
"""Get token multipliers for a specific stress mode."""
|
||||||
|
config = _load_stress_config()
|
||||||
|
default_config = get_default_config()
|
||||||
|
|
||||||
|
multipliers = config.get("multipliers", default_config["multipliers"])
|
||||||
|
mode_multipliers = multipliers.get(mode.value, {})
|
||||||
|
default_mode_multipliers = default_config["multipliers"].get(mode.value, {})
|
||||||
|
|
||||||
|
# Merge with defaults
|
||||||
|
result = default_mode_multipliers.copy()
|
||||||
|
result.update(mode_multipliers)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def detect_stress_mode(
|
||||||
|
force_refresh: bool = False,
|
||||||
|
min_check_interval_seconds: int = 60,
|
||||||
|
) -> StressSnapshot:
|
||||||
|
"""Detect current system stress mode.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
force_refresh: Force a new check even if recently checked
|
||||||
|
min_check_interval_seconds: Minimum seconds between checks
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
StressSnapshot with mode, score, signals, and multipliers
|
||||||
|
"""
|
||||||
|
global _current_snapshot, _last_check_time
|
||||||
|
|
||||||
|
now = datetime.now(UTC)
|
||||||
|
|
||||||
|
# Return cached snapshot if recent and not forced
|
||||||
|
if not force_refresh and _current_snapshot is not None and _last_check_time is not None:
|
||||||
|
elapsed = (now - _last_check_time).total_seconds()
|
||||||
|
if elapsed < min_check_interval_seconds:
|
||||||
|
return _current_snapshot
|
||||||
|
|
||||||
|
# Collect signals and calculate stress
|
||||||
|
signals = _collect_stress_signals()
|
||||||
|
score = _calculate_stress_score(signals)
|
||||||
|
|
||||||
|
# Determine mode from score
|
||||||
|
config = _load_stress_config()
|
||||||
|
default_config = get_default_config()
|
||||||
|
thresholds_cfg = config.get("thresholds", default_config["thresholds"])
|
||||||
|
thresholds = StressThresholds(
|
||||||
|
elevated_min=thresholds_cfg.get("elevated_min", 0.3),
|
||||||
|
high_min=thresholds_cfg.get("high_min", 0.6),
|
||||||
|
)
|
||||||
|
mode = thresholds.get_mode_for_score(score)
|
||||||
|
|
||||||
|
# Get multipliers for this mode
|
||||||
|
multipliers = _get_multipliers_for_mode(mode)
|
||||||
|
|
||||||
|
# Create snapshot
|
||||||
|
snapshot = StressSnapshot(
|
||||||
|
mode=mode,
|
||||||
|
score=score,
|
||||||
|
signals=signals,
|
||||||
|
multipliers=multipliers,
|
||||||
|
timestamp=now.isoformat(),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Cache result
|
||||||
|
_current_snapshot = snapshot
|
||||||
|
_last_check_time = now
|
||||||
|
|
||||||
|
# Log mode changes
|
||||||
|
if _current_snapshot is not None and _current_snapshot.mode != mode:
|
||||||
|
logger.info(
|
||||||
|
"Stress mode changed: %s -> %s (score: %.2f)",
|
||||||
|
_current_snapshot.mode.value if _current_snapshot else "none",
|
||||||
|
mode.value,
|
||||||
|
score,
|
||||||
|
)
|
||||||
|
|
||||||
|
return snapshot
|
||||||
|
|
||||||
|
|
||||||
|
def get_current_stress_mode() -> StressMode:
|
||||||
|
"""Get current stress mode (uses cached or fresh detection)."""
|
||||||
|
snapshot = detect_stress_mode()
|
||||||
|
return snapshot.mode
|
||||||
|
|
||||||
|
|
||||||
|
def get_multiplier(quest_type: str, mode: StressMode | None = None) -> float:
|
||||||
|
"""Get token multiplier for a quest type.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
quest_type: Type of quest (test_improve, issue_count, etc.)
|
||||||
|
mode: Specific mode to get multiplier for, or None for current
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Multiplier value (1.0 = normal, 1.5 = 50% bonus, etc.)
|
||||||
|
"""
|
||||||
|
if mode is None:
|
||||||
|
mode = get_current_stress_mode()
|
||||||
|
|
||||||
|
multipliers = _get_multipliers_for_mode(mode)
|
||||||
|
return multipliers.get(quest_type, 1.0)
|
||||||
|
|
||||||
|
|
||||||
|
def apply_multiplier(base_reward: int, quest_type: str) -> int:
|
||||||
|
"""Apply stress-based multiplier to a base reward.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
base_reward: Base token reward amount
|
||||||
|
quest_type: Type of quest for multiplier lookup
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Adjusted reward amount (always >= 1)
|
||||||
|
"""
|
||||||
|
multiplier = get_multiplier(quest_type)
|
||||||
|
adjusted = int(base_reward * multiplier)
|
||||||
|
return max(1, adjusted)
|
||||||
|
|
||||||
|
|
||||||
|
def get_stress_summary() -> dict[str, Any]:
|
||||||
|
"""Get a human-readable summary of current stress state."""
|
||||||
|
snapshot = detect_stress_mode()
|
||||||
|
|
||||||
|
# Generate explanation
|
||||||
|
explanations = {
|
||||||
|
StressMode.CALM: "System is calm. Good time for exploration and refactoring.",
|
||||||
|
StressMode.ELEVATED: "Elevated stress detected. Focus on stability and tests.",
|
||||||
|
StressMode.HIGH: "HIGH STRESS MODE. Prioritize bug fixes and test hardening.",
|
||||||
|
}
|
||||||
|
|
||||||
|
triggered_signals = [s for s in snapshot.signals if s.is_triggered]
|
||||||
|
|
||||||
|
return {
|
||||||
|
"mode": snapshot.mode.value,
|
||||||
|
"score": round(snapshot.score, 3),
|
||||||
|
"explanation": explanations.get(snapshot.mode, "Unknown mode"),
|
||||||
|
"active_signals": [
|
||||||
|
{
|
||||||
|
"name": s.name,
|
||||||
|
"value": round(s.value, 3),
|
||||||
|
"threshold": s.threshold,
|
||||||
|
}
|
||||||
|
for s in triggered_signals
|
||||||
|
],
|
||||||
|
"current_multipliers": snapshot.multipliers,
|
||||||
|
"last_updated": snapshot.timestamp,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def reset_stress_state() -> None:
|
||||||
|
"""Reset stress state cache (useful for testing)."""
|
||||||
|
global _current_snapshot, _last_check_time, _config_cache, _config_mtime
|
||||||
|
_current_snapshot = None
|
||||||
|
_last_check_time = None
|
||||||
|
_config_cache = None
|
||||||
|
_config_mtime = 0.0
|
||||||
294
tests/unit/test_stress_detector.py
Normal file
294
tests/unit/test_stress_detector.py
Normal file
@@ -0,0 +1,294 @@
|
|||||||
|
"""Unit tests for the stress detector module.
|
||||||
|
|
||||||
|
Tests stress signal calculation, mode detection, multipliers,
|
||||||
|
and integration with the quest system.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from timmy.stress_detector import (
|
||||||
|
StressMode,
|
||||||
|
StressSignal,
|
||||||
|
StressSnapshot,
|
||||||
|
StressThresholds,
|
||||||
|
_calculate_stress_score,
|
||||||
|
_get_multipliers_for_mode,
|
||||||
|
apply_multiplier,
|
||||||
|
get_default_config,
|
||||||
|
reset_stress_state,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def clean_stress_state():
|
||||||
|
"""Reset stress state between tests."""
|
||||||
|
reset_stress_state()
|
||||||
|
yield
|
||||||
|
reset_stress_state()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Stress Mode Tests ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestStressMode:
|
||||||
|
def test_stress_mode_values(self):
|
||||||
|
"""StressMode enum has expected values."""
|
||||||
|
assert StressMode.CALM.value == "calm"
|
||||||
|
assert StressMode.ELEVATED.value == "elevated"
|
||||||
|
assert StressMode.HIGH.value == "high"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Stress Signal Tests ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestStressSignal:
|
||||||
|
def test_signal_not_triggered(self):
|
||||||
|
"""Signal with value below threshold is not triggered."""
|
||||||
|
signal = StressSignal(
|
||||||
|
name="test_signal",
|
||||||
|
value=5.0,
|
||||||
|
threshold=10.0,
|
||||||
|
weight=0.5,
|
||||||
|
)
|
||||||
|
assert not signal.is_triggered
|
||||||
|
assert signal.contribution == 0.0
|
||||||
|
|
||||||
|
def test_signal_triggered(self):
|
||||||
|
"""Signal with value at threshold is triggered."""
|
||||||
|
signal = StressSignal(
|
||||||
|
name="test_signal",
|
||||||
|
value=10.0,
|
||||||
|
threshold=10.0,
|
||||||
|
weight=0.5,
|
||||||
|
)
|
||||||
|
assert signal.is_triggered
|
||||||
|
assert signal.contribution == 0.5 # weight * min(1, value/threshold)
|
||||||
|
|
||||||
|
def test_signal_contribution_capped(self):
|
||||||
|
"""Signal contribution is capped at weight when value >> threshold."""
|
||||||
|
signal = StressSignal(
|
||||||
|
name="test_signal",
|
||||||
|
value=100.0,
|
||||||
|
threshold=10.0,
|
||||||
|
weight=0.5,
|
||||||
|
)
|
||||||
|
assert signal.is_triggered
|
||||||
|
assert signal.contribution == 0.5 # Capped at weight
|
||||||
|
|
||||||
|
def test_signal_partial_contribution(self):
|
||||||
|
"""Signal contribution scales with value/threshold ratio."""
|
||||||
|
signal = StressSignal(
|
||||||
|
name="test_signal",
|
||||||
|
value=15.0,
|
||||||
|
threshold=10.0,
|
||||||
|
weight=0.5,
|
||||||
|
)
|
||||||
|
assert signal.is_triggered
|
||||||
|
# contribution = min(1, 15/10) * 0.5 = 0.5 (capped)
|
||||||
|
assert signal.contribution == 0.5
|
||||||
|
|
||||||
|
|
||||||
|
# ── Stress Thresholds Tests ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestStressThresholds:
|
||||||
|
def test_calm_mode(self):
|
||||||
|
"""Score below elevated_min returns CALM mode."""
|
||||||
|
thresholds = StressThresholds(elevated_min=0.3, high_min=0.6)
|
||||||
|
assert thresholds.get_mode_for_score(0.0) == StressMode.CALM
|
||||||
|
assert thresholds.get_mode_for_score(0.1) == StressMode.CALM
|
||||||
|
assert thresholds.get_mode_for_score(0.29) == StressMode.CALM
|
||||||
|
|
||||||
|
def test_elevated_mode(self):
|
||||||
|
"""Score between elevated_min and high_min returns ELEVATED mode."""
|
||||||
|
thresholds = StressThresholds(elevated_min=0.3, high_min=0.6)
|
||||||
|
assert thresholds.get_mode_for_score(0.3) == StressMode.ELEVATED
|
||||||
|
assert thresholds.get_mode_for_score(0.5) == StressMode.ELEVATED
|
||||||
|
assert thresholds.get_mode_for_score(0.59) == StressMode.ELEVATED
|
||||||
|
|
||||||
|
def test_high_mode(self):
|
||||||
|
"""Score at or above high_min returns HIGH mode."""
|
||||||
|
thresholds = StressThresholds(elevated_min=0.3, high_min=0.6)
|
||||||
|
assert thresholds.get_mode_for_score(0.6) == StressMode.HIGH
|
||||||
|
assert thresholds.get_mode_for_score(0.8) == StressMode.HIGH
|
||||||
|
assert thresholds.get_mode_for_score(1.0) == StressMode.HIGH
|
||||||
|
|
||||||
|
|
||||||
|
# ── Stress Score Calculation Tests ─────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestStressScoreCalculation:
|
||||||
|
def test_empty_signals(self):
|
||||||
|
"""Empty signal list returns zero stress score."""
|
||||||
|
score = _calculate_stress_score([])
|
||||||
|
assert score == 0.0
|
||||||
|
|
||||||
|
def test_no_triggered_signals(self):
|
||||||
|
"""No triggered signals means zero stress score."""
|
||||||
|
signals = [
|
||||||
|
StressSignal(name="s1", value=1.0, threshold=10.0, weight=0.5),
|
||||||
|
StressSignal(name="s2", value=2.0, threshold=10.0, weight=0.5),
|
||||||
|
]
|
||||||
|
score = _calculate_stress_score(signals)
|
||||||
|
assert score == 0.0
|
||||||
|
|
||||||
|
def test_single_triggered_signal(self):
|
||||||
|
"""Single triggered signal contributes its weight."""
|
||||||
|
signals = [
|
||||||
|
StressSignal(name="s1", value=10.0, threshold=10.0, weight=0.5),
|
||||||
|
]
|
||||||
|
score = _calculate_stress_score(signals)
|
||||||
|
# contribution = 0.5, total_weight = 0.5, score = 0.5/0.5 = 1.0
|
||||||
|
assert score == 1.0
|
||||||
|
|
||||||
|
def test_mixed_signals(self):
|
||||||
|
"""Mix of triggered and non-triggered signals."""
|
||||||
|
signals = [
|
||||||
|
StressSignal(name="s1", value=10.0, threshold=10.0, weight=0.3),
|
||||||
|
StressSignal(name="s2", value=1.0, threshold=10.0, weight=0.3),
|
||||||
|
StressSignal(name="s3", value=10.0, threshold=10.0, weight=0.4),
|
||||||
|
]
|
||||||
|
score = _calculate_stress_score(signals)
|
||||||
|
# triggered contributions: 0.3 + 0.4 = 0.7
|
||||||
|
# total_weight: 0.3 + 0.3 + 0.4 = 1.0
|
||||||
|
# score = 0.7 / 1.0 = 0.7
|
||||||
|
assert score == 0.7
|
||||||
|
|
||||||
|
def test_score_capped_at_one(self):
|
||||||
|
"""Stress score is capped at 1.0."""
|
||||||
|
signals = [
|
||||||
|
StressSignal(name="s1", value=100.0, threshold=10.0, weight=1.0),
|
||||||
|
StressSignal(name="s2", value=100.0, threshold=10.0, weight=1.0),
|
||||||
|
]
|
||||||
|
score = _calculate_stress_score(signals)
|
||||||
|
assert score == 1.0 # Capped
|
||||||
|
|
||||||
|
|
||||||
|
# ── Multiplier Tests ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestMultipliers:
|
||||||
|
def test_default_config_structure(self):
|
||||||
|
"""Default config has expected structure."""
|
||||||
|
config = get_default_config()
|
||||||
|
assert "thresholds" in config
|
||||||
|
assert "signals" in config
|
||||||
|
assert "multipliers" in config
|
||||||
|
|
||||||
|
def test_calm_mode_multipliers(self):
|
||||||
|
"""Calm mode has expected multipliers."""
|
||||||
|
multipliers = _get_multipliers_for_mode(StressMode.CALM)
|
||||||
|
assert multipliers["test_improve"] == 1.0
|
||||||
|
assert multipliers["docs_update"] == 1.2
|
||||||
|
assert multipliers["exploration"] == 1.3
|
||||||
|
assert multipliers["refactor"] == 1.2
|
||||||
|
|
||||||
|
def test_elevated_mode_multipliers(self):
|
||||||
|
"""Elevated mode has expected multipliers."""
|
||||||
|
multipliers = _get_multipliers_for_mode(StressMode.ELEVATED)
|
||||||
|
assert multipliers["test_improve"] == 1.2
|
||||||
|
assert multipliers["issue_reduce"] == 1.1
|
||||||
|
assert multipliers["refactor"] == 0.9
|
||||||
|
|
||||||
|
def test_high_mode_multipliers(self):
|
||||||
|
"""High stress mode has expected multipliers."""
|
||||||
|
multipliers = _get_multipliers_for_mode(StressMode.HIGH)
|
||||||
|
assert multipliers["test_improve"] == 1.5
|
||||||
|
assert multipliers["issue_reduce"] == 1.4
|
||||||
|
assert multipliers["exploration"] == 0.7
|
||||||
|
assert multipliers["refactor"] == 0.6
|
||||||
|
|
||||||
|
def test_multiplier_fallback_for_unknown_type(self):
|
||||||
|
"""Unknown quest types return default multiplier of 1.0."""
|
||||||
|
multipliers = _get_multipliers_for_mode(StressMode.CALM)
|
||||||
|
assert multipliers.get("unknown_type", 1.0) == 1.0
|
||||||
|
|
||||||
|
|
||||||
|
# ── Apply Multiplier Tests ─────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestApplyMultiplier:
|
||||||
|
def test_apply_multiplier_calm(self):
|
||||||
|
"""Multiplier applies correctly in calm mode."""
|
||||||
|
# This test uses get_multiplier which reads from current stress mode
|
||||||
|
# Since we can't easily mock the stress mode, we test the apply_multiplier logic
|
||||||
|
base = 100
|
||||||
|
# In calm mode with test_improve = 1.0
|
||||||
|
result = apply_multiplier(base, "unknown_type")
|
||||||
|
assert result >= 1 # At least 1 token
|
||||||
|
|
||||||
|
def test_apply_multiplier_minimum_one(self):
|
||||||
|
"""Applied reward is at least 1 token."""
|
||||||
|
# Even with very low multiplier, result should be >= 1
|
||||||
|
result = apply_multiplier(1, "any_type")
|
||||||
|
assert result >= 1
|
||||||
|
|
||||||
|
|
||||||
|
# ── Stress Snapshot Tests ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestStressSnapshot:
|
||||||
|
def test_snapshot_to_dict(self):
|
||||||
|
"""Snapshot can be converted to dictionary."""
|
||||||
|
signals = [
|
||||||
|
StressSignal(name="test", value=10.0, threshold=5.0, weight=0.5),
|
||||||
|
]
|
||||||
|
snapshot = StressSnapshot(
|
||||||
|
mode=StressMode.ELEVATED,
|
||||||
|
score=0.5,
|
||||||
|
signals=signals,
|
||||||
|
multipliers={"test_improve": 1.2},
|
||||||
|
)
|
||||||
|
|
||||||
|
data = snapshot.to_dict()
|
||||||
|
assert data["mode"] == "elevated"
|
||||||
|
assert data["score"] == 0.5
|
||||||
|
assert len(data["signals"]) == 1
|
||||||
|
assert data["multipliers"]["test_improve"] == 1.2
|
||||||
|
|
||||||
|
|
||||||
|
# ── Integration Tests ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestStressDetectorIntegration:
|
||||||
|
def test_reset_stress_state(self):
|
||||||
|
"""Reset clears internal state."""
|
||||||
|
# Just verify reset doesn't error
|
||||||
|
reset_stress_state()
|
||||||
|
|
||||||
|
def test_default_config_contains_all_signals(self):
|
||||||
|
"""Default config defines all expected signals."""
|
||||||
|
config = get_default_config()
|
||||||
|
signals = config["signals"]
|
||||||
|
|
||||||
|
expected_signals = [
|
||||||
|
"flaky_test_rate",
|
||||||
|
"p1_backlog_growth",
|
||||||
|
"ci_failure_rate",
|
||||||
|
"open_bug_count",
|
||||||
|
]
|
||||||
|
|
||||||
|
for signal in expected_signals:
|
||||||
|
assert signal in signals
|
||||||
|
assert "threshold" in signals[signal]
|
||||||
|
assert "weight" in signals[signal]
|
||||||
|
|
||||||
|
def test_default_config_contains_all_modes(self):
|
||||||
|
"""Default config defines all stress modes."""
|
||||||
|
config = get_default_config()
|
||||||
|
multipliers = config["multipliers"]
|
||||||
|
|
||||||
|
assert "calm" in multipliers
|
||||||
|
assert "elevated" in multipliers
|
||||||
|
assert "high" in multipliers
|
||||||
|
|
||||||
|
def test_multiplier_weights_sum_approximately_one(self):
|
||||||
|
"""Signal weights should approximately sum to 1.0."""
|
||||||
|
config = get_default_config()
|
||||||
|
signals = config["signals"]
|
||||||
|
|
||||||
|
total_weight = sum(s["weight"] for s in signals.values())
|
||||||
|
# Allow some flexibility but should be close to 1.0
|
||||||
|
assert 0.9 <= total_weight <= 1.1
|
||||||
Reference in New Issue
Block a user