Compare commits
1 Commits
feature/is
...
kimi/issue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
919a011cae |
98
config/stress_modes.yaml
Normal file
98
config/stress_modes.yaml
Normal file
@@ -0,0 +1,98 @@
|
||||
# ── System Stress Modes Configuration ────────────────────────────────────────
|
||||
#
|
||||
# This configuration defines how token rewards adapt based on system stress.
|
||||
# When the system detects elevated stress (flaky tests, growing backlog,
|
||||
# CI failures), quest rewards are adjusted to incentivize agents to focus
|
||||
# on the most critical areas.
|
||||
#
|
||||
# ── How It Works ─────────────────────────────────────────────────────────────
|
||||
#
|
||||
# 1. SIGNALS: System metrics are monitored continuously
|
||||
# 2. SCORE: Weighted contributions from triggered signals create a stress score
|
||||
# 3. MODE: Score determines the stress mode (calm, elevated, high)
|
||||
# 4. MULTIPLIERS: Token rewards are multiplied based on the current mode
|
||||
#
|
||||
# ── Stress Thresholds ────────────────────────────────────────────────────────
|
||||
|
||||
thresholds:
|
||||
# Minimum score to enter elevated mode (0.0 - 1.0)
|
||||
elevated_min: 0.3
|
||||
|
||||
# Minimum score to enter high stress mode (0.0 - 1.0)
|
||||
high_min: 0.6
|
||||
|
||||
# ── Stress Signals ───────────────────────────────────────────────────────────
|
||||
#
|
||||
# Each signal has:
|
||||
# - threshold: Value at which signal is considered "triggered"
|
||||
# - weight: Contribution to overall stress score (should sum to ~1.0)
|
||||
|
||||
signals:
|
||||
flaky_test_rate:
|
||||
threshold: 0.15 # 15% of tests showing flakiness
|
||||
weight: 0.30
|
||||
description: "Percentage of test runs that are flaky"
|
||||
|
||||
p1_backlog_growth:
|
||||
threshold: 5 # 5 new P1 issues in lookback period
|
||||
weight: 0.25
|
||||
description: "Net growth in P1 priority issues over 7 days"
|
||||
|
||||
ci_failure_rate:
|
||||
threshold: 0.20 # 20% of CI runs failing
|
||||
weight: 0.25
|
||||
description: "Percentage of CI runs failing in lookback period"
|
||||
|
||||
open_bug_count:
|
||||
threshold: 20 # 20 open bugs
|
||||
weight: 0.20
|
||||
description: "Total open issues labeled as 'bug'"
|
||||
|
||||
# ── Token Multipliers ────────────────────────────────────────────────────────
|
||||
#
|
||||
# Multipliers are applied to quest rewards based on current stress mode.
|
||||
# Values > 1.0 increase rewards, < 1.0 decrease rewards.
|
||||
#
|
||||
# Quest types:
|
||||
# - test_improve: Test coverage/quality improvements
|
||||
# - docs_update: Documentation updates
|
||||
# - issue_count: Closing specific issue types
|
||||
# - issue_reduce: Reducing overall issue backlog
|
||||
# - daily_run: Daily Run session completion
|
||||
# - custom: Special/manual quests
|
||||
# - exploration: Exploratory work
|
||||
# - refactor: Code refactoring
|
||||
|
||||
multipliers:
|
||||
calm:
|
||||
# Calm periods: incentivize maintenance and exploration
|
||||
test_improve: 1.0
|
||||
docs_update: 1.2
|
||||
issue_count: 1.0
|
||||
issue_reduce: 1.0
|
||||
daily_run: 1.0
|
||||
custom: 1.0
|
||||
exploration: 1.3
|
||||
refactor: 1.2
|
||||
|
||||
elevated:
|
||||
# Elevated stress: start emphasizing stability
|
||||
test_improve: 1.2
|
||||
docs_update: 1.0
|
||||
issue_count: 1.1
|
||||
issue_reduce: 1.1
|
||||
daily_run: 1.0
|
||||
custom: 1.0
|
||||
exploration: 1.0
|
||||
refactor: 0.9 # Discourage risky changes
|
||||
|
||||
high:
|
||||
# High stress: crisis mode, focus on stabilization
|
||||
test_improve: 1.5 # Strongly incentivize testing
|
||||
docs_update: 0.8 # Deprioritize docs
|
||||
issue_count: 1.3 # Reward closing issues
|
||||
issue_reduce: 1.4 # Strongly reward reducing backlog
|
||||
daily_run: 1.1
|
||||
custom: 1.0
|
||||
exploration: 0.7 # Discourage exploration
|
||||
refactor: 0.6 # Discourage refactors during crisis
|
||||
@@ -187,6 +187,76 @@ async def reload_quest_config_api() -> JSONResponse:
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Stress Mode Endpoints
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@router.get("/api/stress")
|
||||
async def get_stress_status_api() -> JSONResponse:
|
||||
"""Get current stress mode status and multipliers.
|
||||
|
||||
Returns:
|
||||
Current stress mode, score, active signals, and multipliers
|
||||
"""
|
||||
try:
|
||||
from timmy.stress_detector import (
|
||||
detect_stress_mode,
|
||||
get_stress_summary,
|
||||
)
|
||||
|
||||
snapshot = detect_stress_mode()
|
||||
summary = get_stress_summary()
|
||||
|
||||
return JSONResponse(
|
||||
{
|
||||
"status": "ok",
|
||||
"stress": summary,
|
||||
"raw": snapshot.to_dict(),
|
||||
}
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to get stress status: %s", exc)
|
||||
return JSONResponse(
|
||||
{
|
||||
"status": "error",
|
||||
"error": str(exc),
|
||||
},
|
||||
status_code=500,
|
||||
)
|
||||
|
||||
|
||||
@router.post("/api/stress/refresh")
|
||||
async def refresh_stress_detection_api() -> JSONResponse:
|
||||
"""Force a fresh stress detection check.
|
||||
|
||||
Normally stress is cached for 60 seconds. This endpoint
|
||||
bypasses the cache for immediate results.
|
||||
"""
|
||||
try:
|
||||
from timmy.stress_detector import detect_stress_mode, get_stress_summary
|
||||
|
||||
snapshot = detect_stress_mode(force_refresh=True)
|
||||
summary = get_stress_summary()
|
||||
|
||||
return JSONResponse(
|
||||
{
|
||||
"status": "ok",
|
||||
"stress": summary,
|
||||
"raw": snapshot.to_dict(),
|
||||
}
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to refresh stress detection: %s", exc)
|
||||
return JSONResponse(
|
||||
{
|
||||
"status": "error",
|
||||
"error": str(exc),
|
||||
},
|
||||
status_code=500,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dashboard UI Endpoints
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -269,6 +269,22 @@ def _is_on_cooldown(progress: QuestProgress, quest: QuestDefinition) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def _apply_stress_multiplier(base_reward: int, quest_type: QuestType) -> tuple[int, float]:
|
||||
"""Apply stress-based multiplier to quest reward.
|
||||
|
||||
Returns:
|
||||
Tuple of (adjusted_reward, multiplier_used)
|
||||
"""
|
||||
try:
|
||||
from timmy.stress_detector import apply_multiplier
|
||||
|
||||
multiplier = apply_multiplier(base_reward, quest_type.value)
|
||||
return multiplier, multiplier / max(base_reward, 1)
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to apply stress multiplier: %s", exc)
|
||||
return base_reward, 1.0
|
||||
|
||||
|
||||
def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
|
||||
"""Claim the token reward for a completed quest.
|
||||
|
||||
@@ -292,13 +308,18 @@ def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
|
||||
return None
|
||||
|
||||
try:
|
||||
# Apply stress-based multiplier
|
||||
adjusted_reward, multiplier = _apply_stress_multiplier(
|
||||
quest.reward_tokens, quest.quest_type
|
||||
)
|
||||
|
||||
# Award tokens via ledger
|
||||
from lightning.ledger import create_invoice_entry, mark_settled
|
||||
|
||||
# Create a mock invoice for the reward
|
||||
invoice_entry = create_invoice_entry(
|
||||
payment_hash=f"quest_{quest_id}_{agent_id}_{int(time.time())}",
|
||||
amount_sats=quest.reward_tokens,
|
||||
amount_sats=adjusted_reward,
|
||||
memo=f"Quest reward: {quest.name}",
|
||||
source="quest_reward",
|
||||
agent_id=agent_id,
|
||||
@@ -320,12 +341,21 @@ def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
|
||||
progress.completed_at = ""
|
||||
progress.claimed_at = ""
|
||||
|
||||
notification = quest.notification_message.format(tokens=quest.reward_tokens)
|
||||
# Build notification with multiplier info
|
||||
notification = quest.notification_message.format(tokens=adjusted_reward)
|
||||
if multiplier != 1.0:
|
||||
pct = int((multiplier - 1.0) * 100)
|
||||
if pct > 0:
|
||||
notification += f" (+{pct}% stress bonus)"
|
||||
else:
|
||||
notification += f" ({pct}% stress adjustment)"
|
||||
|
||||
return {
|
||||
"quest_id": quest_id,
|
||||
"agent_id": agent_id,
|
||||
"tokens_awarded": quest.reward_tokens,
|
||||
"tokens_awarded": adjusted_reward,
|
||||
"base_reward": quest.reward_tokens,
|
||||
"multiplier": round(multiplier, 2),
|
||||
"notification": notification,
|
||||
"completion_count": progress.completion_count,
|
||||
}
|
||||
@@ -467,6 +497,14 @@ def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
|
||||
total_rewards = 0
|
||||
completed_count = 0
|
||||
|
||||
# Get current stress mode for adjusted rewards display
|
||||
try:
|
||||
from timmy.stress_detector import get_current_stress_mode, get_multiplier
|
||||
|
||||
current_mode = get_current_stress_mode()
|
||||
except Exception:
|
||||
current_mode = None
|
||||
|
||||
for quest_id, quest in definitions.items():
|
||||
progress = get_quest_progress(quest_id, agent_id)
|
||||
if not progress:
|
||||
@@ -474,11 +512,23 @@ def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
|
||||
|
||||
is_on_cooldown = _is_on_cooldown(progress, quest) if quest.repeatable else False
|
||||
|
||||
# Calculate adjusted reward with stress multiplier
|
||||
adjusted_reward = quest.reward_tokens
|
||||
multiplier = 1.0
|
||||
if current_mode:
|
||||
try:
|
||||
multiplier = get_multiplier(quest.quest_type.value, current_mode)
|
||||
adjusted_reward = int(quest.reward_tokens * multiplier)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
quest_info = {
|
||||
"quest_id": quest_id,
|
||||
"name": quest.name,
|
||||
"description": quest.description,
|
||||
"reward_tokens": quest.reward_tokens,
|
||||
"adjusted_reward": adjusted_reward,
|
||||
"multiplier": round(multiplier, 2),
|
||||
"type": quest.quest_type.value,
|
||||
"enabled": quest.enabled,
|
||||
"repeatable": quest.repeatable,
|
||||
@@ -509,6 +559,7 @@ def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
|
||||
"total_tokens_earned": total_rewards,
|
||||
"total_quests_completed": completed_count,
|
||||
"active_quests_count": len([q for q in quests_status if q["enabled"]]),
|
||||
"stress_mode": current_mode.value if current_mode else None,
|
||||
}
|
||||
|
||||
|
||||
|
||||
565
src/timmy/stress_detector.py
Normal file
565
src/timmy/stress_detector.py
Normal file
@@ -0,0 +1,565 @@
|
||||
"""System stress detection for adaptive token rewards.
|
||||
|
||||
Monitors system signals like flakiness, backlog growth, and CI failures
|
||||
to determine the current stress mode. Token rewards are then adjusted
|
||||
based on the stress mode to incentivize agents to focus on critical areas.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from enum import StrEnum
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Path to stress mode configuration
|
||||
STRESS_CONFIG_PATH = Path(settings.repo_root) / "config" / "stress_modes.yaml"
|
||||
|
||||
|
||||
class StressMode(StrEnum):
|
||||
"""System stress modes.
|
||||
|
||||
- CALM: Normal operations, incentivize exploration and refactoring
|
||||
- ELEVATED: Some stress signals detected, balance incentives
|
||||
- HIGH: Critical stress, strongly incentivize bug fixes and stabilization
|
||||
"""
|
||||
|
||||
CALM = "calm"
|
||||
ELEVATED = "elevated"
|
||||
HIGH = "high"
|
||||
|
||||
|
||||
@dataclass
|
||||
class StressSignal:
|
||||
"""A single stress signal reading."""
|
||||
|
||||
name: str
|
||||
value: float
|
||||
threshold: float
|
||||
weight: float
|
||||
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
|
||||
|
||||
@property
|
||||
def is_triggered(self) -> bool:
|
||||
"""Whether this signal exceeds its threshold."""
|
||||
return self.value >= self.threshold
|
||||
|
||||
@property
|
||||
def contribution(self) -> float:
|
||||
"""Calculate this signal's contribution to stress score."""
|
||||
if not self.is_triggered:
|
||||
return 0.0
|
||||
# Contribution is weighted ratio of value to threshold
|
||||
return min(1.0, (self.value / max(self.threshold, 1.0))) * self.weight
|
||||
|
||||
|
||||
@dataclass
|
||||
class StressSnapshot:
|
||||
"""Complete stress assessment at a point in time."""
|
||||
|
||||
mode: StressMode
|
||||
score: float
|
||||
signals: list[StressSignal]
|
||||
multipliers: dict[str, float]
|
||||
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Convert to dictionary for serialization."""
|
||||
return {
|
||||
"mode": self.mode.value,
|
||||
"score": round(self.score, 3),
|
||||
"signals": [
|
||||
{
|
||||
"name": s.name,
|
||||
"value": s.value,
|
||||
"threshold": s.threshold,
|
||||
"triggered": s.is_triggered,
|
||||
"contribution": round(s.contribution, 3),
|
||||
}
|
||||
for s in self.signals
|
||||
],
|
||||
"multipliers": self.multipliers,
|
||||
"timestamp": self.timestamp,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class StressThresholds:
|
||||
"""Thresholds for entering/exiting stress modes."""
|
||||
|
||||
elevated_min: float = 0.3
|
||||
high_min: float = 0.6
|
||||
|
||||
def get_mode_for_score(self, score: float) -> StressMode:
|
||||
"""Determine stress mode based on score."""
|
||||
if score >= self.high_min:
|
||||
return StressMode.HIGH
|
||||
elif score >= self.elevated_min:
|
||||
return StressMode.ELEVATED
|
||||
return StressMode.CALM
|
||||
|
||||
|
||||
# In-memory storage for stress state
|
||||
_current_snapshot: StressSnapshot | None = None
|
||||
_last_check_time: datetime | None = None
|
||||
_config_cache: dict[str, Any] | None = None
|
||||
_config_mtime: float = 0.0
|
||||
|
||||
|
||||
def _load_stress_config() -> dict[str, Any]:
|
||||
"""Load stress mode configuration from YAML.
|
||||
|
||||
Returns:
|
||||
Configuration dictionary with default fallbacks
|
||||
"""
|
||||
global _config_cache, _config_mtime
|
||||
|
||||
# Check if config file has been modified
|
||||
if STRESS_CONFIG_PATH.exists():
|
||||
mtime = STRESS_CONFIG_PATH.stat().st_mtime
|
||||
if mtime != _config_mtime or _config_cache is None:
|
||||
try:
|
||||
raw = STRESS_CONFIG_PATH.read_text()
|
||||
_config_cache = yaml.safe_load(raw) or {}
|
||||
_config_mtime = mtime
|
||||
logger.debug("Loaded stress config from %s", STRESS_CONFIG_PATH)
|
||||
except (OSError, yaml.YAMLError) as exc:
|
||||
logger.warning("Failed to load stress config: %s", exc)
|
||||
_config_cache = {}
|
||||
|
||||
if _config_cache is None:
|
||||
_config_cache = {}
|
||||
|
||||
return _config_cache
|
||||
|
||||
|
||||
def get_default_config() -> dict[str, Any]:
|
||||
"""Get default stress configuration."""
|
||||
return {
|
||||
"thresholds": {
|
||||
"elevated_min": 0.3,
|
||||
"high_min": 0.6,
|
||||
},
|
||||
"signals": {
|
||||
"flaky_test_rate": {
|
||||
"threshold": 0.15, # 15% flaky test rate
|
||||
"weight": 0.3,
|
||||
"description": "Percentage of tests that are flaky",
|
||||
},
|
||||
"p1_backlog_growth": {
|
||||
"threshold": 5, # 5 new P1 issues
|
||||
"weight": 0.25,
|
||||
"description": "Net growth in P1 priority issues",
|
||||
},
|
||||
"ci_failure_rate": {
|
||||
"threshold": 0.2, # 20% CI failure rate
|
||||
"weight": 0.25,
|
||||
"description": "Percentage of CI runs failing",
|
||||
},
|
||||
"open_bug_count": {
|
||||
"threshold": 20, # 20 open bugs
|
||||
"weight": 0.2,
|
||||
"description": "Total open issues labeled as bugs",
|
||||
},
|
||||
},
|
||||
"multipliers": {
|
||||
StressMode.CALM.value: {
|
||||
"test_improve": 1.0,
|
||||
"docs_update": 1.2, # Calm periods good for docs
|
||||
"issue_count": 1.0,
|
||||
"issue_reduce": 1.0,
|
||||
"daily_run": 1.0,
|
||||
"custom": 1.0,
|
||||
"exploration": 1.3, # Encourage exploration
|
||||
"refactor": 1.2, # Encourage refactoring
|
||||
},
|
||||
StressMode.ELEVATED.value: {
|
||||
"test_improve": 1.2, # Start emphasizing tests
|
||||
"docs_update": 1.0,
|
||||
"issue_count": 1.1,
|
||||
"issue_reduce": 1.1,
|
||||
"daily_run": 1.0,
|
||||
"custom": 1.0,
|
||||
"exploration": 1.0,
|
||||
"refactor": 0.9, # Discourage risky refactors
|
||||
},
|
||||
StressMode.HIGH.value: {
|
||||
"test_improve": 1.5, # Strongly incentivize testing
|
||||
"docs_update": 0.8, # Deprioritize docs
|
||||
"issue_count": 1.3, # Reward closing issues
|
||||
"issue_reduce": 1.4, # Strongly reward reducing backlog
|
||||
"daily_run": 1.1,
|
||||
"custom": 1.0,
|
||||
"exploration": 0.7, # Discourage exploration
|
||||
"refactor": 0.6, # Discourage refactors during crisis
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _get_config_value(key_path: str, default: Any = None) -> Any:
|
||||
"""Get a value from config using dot notation path."""
|
||||
config = _load_stress_config()
|
||||
keys = key_path.split(".")
|
||||
value = config
|
||||
for key in keys:
|
||||
if isinstance(value, dict):
|
||||
value = value.get(key)
|
||||
else:
|
||||
return default
|
||||
return value if value is not None else default
|
||||
|
||||
|
||||
def _calculate_flaky_test_rate() -> float:
|
||||
"""Calculate current flaky test rate from available data."""
|
||||
try:
|
||||
# Try to load from daily run metrics or test results
|
||||
test_results_path = Path(settings.repo_root) / ".loop" / "test_results.jsonl"
|
||||
if not test_results_path.exists():
|
||||
return 0.0
|
||||
|
||||
# Count recent test runs and flaky results
|
||||
now = datetime.now(UTC)
|
||||
cutoff = now - timedelta(days=7)
|
||||
|
||||
total_runs = 0
|
||||
flaky_runs = 0
|
||||
|
||||
if test_results_path.exists():
|
||||
for line in test_results_path.read_text().strip().splitlines():
|
||||
try:
|
||||
entry = json.loads(line)
|
||||
ts_str = entry.get("timestamp", "")
|
||||
if not ts_str:
|
||||
continue
|
||||
ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
|
||||
if ts >= cutoff:
|
||||
total_runs += 1
|
||||
if entry.get("is_flaky", False):
|
||||
flaky_runs += 1
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
continue
|
||||
|
||||
return flaky_runs / max(total_runs, 1)
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to calculate flaky test rate: %s", exc)
|
||||
return 0.0
|
||||
|
||||
|
||||
def _calculate_p1_backlog_growth() -> float:
|
||||
"""Calculate P1 issue backlog growth."""
|
||||
try:
|
||||
from dashboard.routes.daily_run import GiteaClient, _load_config
|
||||
|
||||
config = _load_config()
|
||||
token = config.get("token")
|
||||
client = GiteaClient(config, token)
|
||||
|
||||
if not client.is_available():
|
||||
return 0.0
|
||||
|
||||
# Get current P1 issues
|
||||
now = datetime.now(UTC)
|
||||
cutoff_current = now - timedelta(days=7)
|
||||
cutoff_previous = now - timedelta(days=14)
|
||||
|
||||
issues = client.get_paginated("issues", {"state": "all", "labels": "P1", "limit": 100})
|
||||
|
||||
current_count = 0
|
||||
previous_count = 0
|
||||
|
||||
for issue in issues:
|
||||
created_at = issue.get("created_at", "")
|
||||
if not created_at:
|
||||
continue
|
||||
try:
|
||||
created = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
|
||||
if created >= cutoff_current:
|
||||
current_count += 1
|
||||
elif created >= cutoff_previous:
|
||||
previous_count += 1
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
|
||||
# Return net growth (positive means growing backlog)
|
||||
return max(0, current_count - previous_count)
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to calculate P1 backlog growth: %s", exc)
|
||||
return 0.0
|
||||
|
||||
|
||||
def _calculate_ci_failure_rate() -> float:
|
||||
"""Calculate CI failure rate from recent runs."""
|
||||
try:
|
||||
# Try to get CI metrics from Gitea or local files
|
||||
ci_results_path = Path(settings.repo_root) / ".loop" / "ci_results.jsonl"
|
||||
if not ci_results_path.exists():
|
||||
return 0.0
|
||||
|
||||
now = datetime.now(UTC)
|
||||
cutoff = now - timedelta(days=7)
|
||||
|
||||
total_runs = 0
|
||||
failed_runs = 0
|
||||
|
||||
for line in ci_results_path.read_text().strip().splitlines():
|
||||
try:
|
||||
entry = json.loads(line)
|
||||
ts_str = entry.get("timestamp", "")
|
||||
if not ts_str:
|
||||
continue
|
||||
ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
|
||||
if ts >= cutoff:
|
||||
total_runs += 1
|
||||
if entry.get("status") != "success":
|
||||
failed_runs += 1
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
continue
|
||||
|
||||
return failed_runs / max(total_runs, 1)
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to calculate CI failure rate: %s", exc)
|
||||
return 0.0
|
||||
|
||||
|
||||
def _calculate_open_bug_count() -> float:
|
||||
"""Calculate current open bug count."""
|
||||
try:
|
||||
from dashboard.routes.daily_run import GiteaClient, _load_config
|
||||
|
||||
config = _load_config()
|
||||
token = config.get("token")
|
||||
client = GiteaClient(config, token)
|
||||
|
||||
if not client.is_available():
|
||||
return 0.0
|
||||
|
||||
issues = client.get_paginated("issues", {"state": "open", "labels": "bug", "limit": 100})
|
||||
|
||||
return float(len(issues))
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to calculate open bug count: %s", exc)
|
||||
return 0.0
|
||||
|
||||
|
||||
def _collect_stress_signals() -> list[StressSignal]:
|
||||
"""Collect all stress signals from the system."""
|
||||
config = _load_stress_config()
|
||||
default_config = get_default_config()
|
||||
signals_config = config.get("signals", default_config["signals"])
|
||||
|
||||
signals = []
|
||||
|
||||
# Define signal collectors
|
||||
collectors = {
|
||||
"flaky_test_rate": _calculate_flaky_test_rate,
|
||||
"p1_backlog_growth": _calculate_p1_backlog_growth,
|
||||
"ci_failure_rate": _calculate_ci_failure_rate,
|
||||
"open_bug_count": _calculate_open_bug_count,
|
||||
}
|
||||
|
||||
for signal_name, collector in collectors.items():
|
||||
signal_cfg = signals_config.get(signal_name, {})
|
||||
default_cfg = default_config["signals"].get(signal_name, {})
|
||||
|
||||
try:
|
||||
value = collector()
|
||||
threshold = signal_cfg.get("threshold", default_cfg.get("threshold", 1.0))
|
||||
weight = signal_cfg.get("weight", default_cfg.get("weight", 0.25))
|
||||
|
||||
signals.append(
|
||||
StressSignal(
|
||||
name=signal_name,
|
||||
value=value,
|
||||
threshold=threshold,
|
||||
weight=weight,
|
||||
)
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to collect signal %s: %s", signal_name, exc)
|
||||
|
||||
return signals
|
||||
|
||||
|
||||
def _calculate_stress_score(signals: list[StressSignal]) -> float:
|
||||
"""Calculate overall stress score from signals.
|
||||
|
||||
Score is weighted sum of triggered signal contributions,
|
||||
normalized to 0-1 range.
|
||||
"""
|
||||
if not signals:
|
||||
return 0.0
|
||||
|
||||
total_weight = sum(s.weight for s in signals)
|
||||
if total_weight == 0:
|
||||
return 0.0
|
||||
|
||||
triggered_contribution = sum(s.contribution for s in signals)
|
||||
return min(1.0, triggered_contribution / total_weight)
|
||||
|
||||
|
||||
def _get_multipliers_for_mode(mode: StressMode) -> dict[str, float]:
|
||||
"""Get token multipliers for a specific stress mode."""
|
||||
config = _load_stress_config()
|
||||
default_config = get_default_config()
|
||||
|
||||
multipliers = config.get("multipliers", default_config["multipliers"])
|
||||
mode_multipliers = multipliers.get(mode.value, {})
|
||||
default_mode_multipliers = default_config["multipliers"].get(mode.value, {})
|
||||
|
||||
# Merge with defaults
|
||||
result = default_mode_multipliers.copy()
|
||||
result.update(mode_multipliers)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def detect_stress_mode(
|
||||
force_refresh: bool = False,
|
||||
min_check_interval_seconds: int = 60,
|
||||
) -> StressSnapshot:
|
||||
"""Detect current system stress mode.
|
||||
|
||||
Args:
|
||||
force_refresh: Force a new check even if recently checked
|
||||
min_check_interval_seconds: Minimum seconds between checks
|
||||
|
||||
Returns:
|
||||
StressSnapshot with mode, score, signals, and multipliers
|
||||
"""
|
||||
global _current_snapshot, _last_check_time
|
||||
|
||||
now = datetime.now(UTC)
|
||||
|
||||
# Return cached snapshot if recent and not forced
|
||||
if not force_refresh and _current_snapshot is not None and _last_check_time is not None:
|
||||
elapsed = (now - _last_check_time).total_seconds()
|
||||
if elapsed < min_check_interval_seconds:
|
||||
return _current_snapshot
|
||||
|
||||
# Collect signals and calculate stress
|
||||
signals = _collect_stress_signals()
|
||||
score = _calculate_stress_score(signals)
|
||||
|
||||
# Determine mode from score
|
||||
config = _load_stress_config()
|
||||
default_config = get_default_config()
|
||||
thresholds_cfg = config.get("thresholds", default_config["thresholds"])
|
||||
thresholds = StressThresholds(
|
||||
elevated_min=thresholds_cfg.get("elevated_min", 0.3),
|
||||
high_min=thresholds_cfg.get("high_min", 0.6),
|
||||
)
|
||||
mode = thresholds.get_mode_for_score(score)
|
||||
|
||||
# Get multipliers for this mode
|
||||
multipliers = _get_multipliers_for_mode(mode)
|
||||
|
||||
# Create snapshot
|
||||
snapshot = StressSnapshot(
|
||||
mode=mode,
|
||||
score=score,
|
||||
signals=signals,
|
||||
multipliers=multipliers,
|
||||
timestamp=now.isoformat(),
|
||||
)
|
||||
|
||||
# Cache result
|
||||
_current_snapshot = snapshot
|
||||
_last_check_time = now
|
||||
|
||||
# Log mode changes
|
||||
if _current_snapshot is not None and _current_snapshot.mode != mode:
|
||||
logger.info(
|
||||
"Stress mode changed: %s -> %s (score: %.2f)",
|
||||
_current_snapshot.mode.value if _current_snapshot else "none",
|
||||
mode.value,
|
||||
score,
|
||||
)
|
||||
|
||||
return snapshot
|
||||
|
||||
|
||||
def get_current_stress_mode() -> StressMode:
|
||||
"""Get current stress mode (uses cached or fresh detection)."""
|
||||
snapshot = detect_stress_mode()
|
||||
return snapshot.mode
|
||||
|
||||
|
||||
def get_multiplier(quest_type: str, mode: StressMode | None = None) -> float:
|
||||
"""Get token multiplier for a quest type.
|
||||
|
||||
Args:
|
||||
quest_type: Type of quest (test_improve, issue_count, etc.)
|
||||
mode: Specific mode to get multiplier for, or None for current
|
||||
|
||||
Returns:
|
||||
Multiplier value (1.0 = normal, 1.5 = 50% bonus, etc.)
|
||||
"""
|
||||
if mode is None:
|
||||
mode = get_current_stress_mode()
|
||||
|
||||
multipliers = _get_multipliers_for_mode(mode)
|
||||
return multipliers.get(quest_type, 1.0)
|
||||
|
||||
|
||||
def apply_multiplier(base_reward: int, quest_type: str) -> int:
|
||||
"""Apply stress-based multiplier to a base reward.
|
||||
|
||||
Args:
|
||||
base_reward: Base token reward amount
|
||||
quest_type: Type of quest for multiplier lookup
|
||||
|
||||
Returns:
|
||||
Adjusted reward amount (always >= 1)
|
||||
"""
|
||||
multiplier = get_multiplier(quest_type)
|
||||
adjusted = int(base_reward * multiplier)
|
||||
return max(1, adjusted)
|
||||
|
||||
|
||||
def get_stress_summary() -> dict[str, Any]:
|
||||
"""Get a human-readable summary of current stress state."""
|
||||
snapshot = detect_stress_mode()
|
||||
|
||||
# Generate explanation
|
||||
explanations = {
|
||||
StressMode.CALM: "System is calm. Good time for exploration and refactoring.",
|
||||
StressMode.ELEVATED: "Elevated stress detected. Focus on stability and tests.",
|
||||
StressMode.HIGH: "HIGH STRESS MODE. Prioritize bug fixes and test hardening.",
|
||||
}
|
||||
|
||||
triggered_signals = [s for s in snapshot.signals if s.is_triggered]
|
||||
|
||||
return {
|
||||
"mode": snapshot.mode.value,
|
||||
"score": round(snapshot.score, 3),
|
||||
"explanation": explanations.get(snapshot.mode, "Unknown mode"),
|
||||
"active_signals": [
|
||||
{
|
||||
"name": s.name,
|
||||
"value": round(s.value, 3),
|
||||
"threshold": s.threshold,
|
||||
}
|
||||
for s in triggered_signals
|
||||
],
|
||||
"current_multipliers": snapshot.multipliers,
|
||||
"last_updated": snapshot.timestamp,
|
||||
}
|
||||
|
||||
|
||||
def reset_stress_state() -> None:
|
||||
"""Reset stress state cache (useful for testing)."""
|
||||
global _current_snapshot, _last_check_time, _config_cache, _config_mtime
|
||||
_current_snapshot = None
|
||||
_last_check_time = None
|
||||
_config_cache = None
|
||||
_config_mtime = 0.0
|
||||
294
tests/unit/test_stress_detector.py
Normal file
294
tests/unit/test_stress_detector.py
Normal file
@@ -0,0 +1,294 @@
|
||||
"""Unit tests for the stress detector module.
|
||||
|
||||
Tests stress signal calculation, mode detection, multipliers,
|
||||
and integration with the quest system.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.stress_detector import (
|
||||
StressMode,
|
||||
StressSignal,
|
||||
StressSnapshot,
|
||||
StressThresholds,
|
||||
_calculate_stress_score,
|
||||
_get_multipliers_for_mode,
|
||||
apply_multiplier,
|
||||
get_default_config,
|
||||
reset_stress_state,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def clean_stress_state():
|
||||
"""Reset stress state between tests."""
|
||||
reset_stress_state()
|
||||
yield
|
||||
reset_stress_state()
|
||||
|
||||
|
||||
# ── Stress Mode Tests ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStressMode:
|
||||
def test_stress_mode_values(self):
|
||||
"""StressMode enum has expected values."""
|
||||
assert StressMode.CALM.value == "calm"
|
||||
assert StressMode.ELEVATED.value == "elevated"
|
||||
assert StressMode.HIGH.value == "high"
|
||||
|
||||
|
||||
# ── Stress Signal Tests ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStressSignal:
|
||||
def test_signal_not_triggered(self):
|
||||
"""Signal with value below threshold is not triggered."""
|
||||
signal = StressSignal(
|
||||
name="test_signal",
|
||||
value=5.0,
|
||||
threshold=10.0,
|
||||
weight=0.5,
|
||||
)
|
||||
assert not signal.is_triggered
|
||||
assert signal.contribution == 0.0
|
||||
|
||||
def test_signal_triggered(self):
|
||||
"""Signal with value at threshold is triggered."""
|
||||
signal = StressSignal(
|
||||
name="test_signal",
|
||||
value=10.0,
|
||||
threshold=10.0,
|
||||
weight=0.5,
|
||||
)
|
||||
assert signal.is_triggered
|
||||
assert signal.contribution == 0.5 # weight * min(1, value/threshold)
|
||||
|
||||
def test_signal_contribution_capped(self):
|
||||
"""Signal contribution is capped at weight when value >> threshold."""
|
||||
signal = StressSignal(
|
||||
name="test_signal",
|
||||
value=100.0,
|
||||
threshold=10.0,
|
||||
weight=0.5,
|
||||
)
|
||||
assert signal.is_triggered
|
||||
assert signal.contribution == 0.5 # Capped at weight
|
||||
|
||||
def test_signal_partial_contribution(self):
|
||||
"""Signal contribution scales with value/threshold ratio."""
|
||||
signal = StressSignal(
|
||||
name="test_signal",
|
||||
value=15.0,
|
||||
threshold=10.0,
|
||||
weight=0.5,
|
||||
)
|
||||
assert signal.is_triggered
|
||||
# contribution = min(1, 15/10) * 0.5 = 0.5 (capped)
|
||||
assert signal.contribution == 0.5
|
||||
|
||||
|
||||
# ── Stress Thresholds Tests ────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStressThresholds:
|
||||
def test_calm_mode(self):
|
||||
"""Score below elevated_min returns CALM mode."""
|
||||
thresholds = StressThresholds(elevated_min=0.3, high_min=0.6)
|
||||
assert thresholds.get_mode_for_score(0.0) == StressMode.CALM
|
||||
assert thresholds.get_mode_for_score(0.1) == StressMode.CALM
|
||||
assert thresholds.get_mode_for_score(0.29) == StressMode.CALM
|
||||
|
||||
def test_elevated_mode(self):
|
||||
"""Score between elevated_min and high_min returns ELEVATED mode."""
|
||||
thresholds = StressThresholds(elevated_min=0.3, high_min=0.6)
|
||||
assert thresholds.get_mode_for_score(0.3) == StressMode.ELEVATED
|
||||
assert thresholds.get_mode_for_score(0.5) == StressMode.ELEVATED
|
||||
assert thresholds.get_mode_for_score(0.59) == StressMode.ELEVATED
|
||||
|
||||
def test_high_mode(self):
|
||||
"""Score at or above high_min returns HIGH mode."""
|
||||
thresholds = StressThresholds(elevated_min=0.3, high_min=0.6)
|
||||
assert thresholds.get_mode_for_score(0.6) == StressMode.HIGH
|
||||
assert thresholds.get_mode_for_score(0.8) == StressMode.HIGH
|
||||
assert thresholds.get_mode_for_score(1.0) == StressMode.HIGH
|
||||
|
||||
|
||||
# ── Stress Score Calculation Tests ─────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStressScoreCalculation:
|
||||
def test_empty_signals(self):
|
||||
"""Empty signal list returns zero stress score."""
|
||||
score = _calculate_stress_score([])
|
||||
assert score == 0.0
|
||||
|
||||
def test_no_triggered_signals(self):
|
||||
"""No triggered signals means zero stress score."""
|
||||
signals = [
|
||||
StressSignal(name="s1", value=1.0, threshold=10.0, weight=0.5),
|
||||
StressSignal(name="s2", value=2.0, threshold=10.0, weight=0.5),
|
||||
]
|
||||
score = _calculate_stress_score(signals)
|
||||
assert score == 0.0
|
||||
|
||||
def test_single_triggered_signal(self):
|
||||
"""Single triggered signal contributes its weight."""
|
||||
signals = [
|
||||
StressSignal(name="s1", value=10.0, threshold=10.0, weight=0.5),
|
||||
]
|
||||
score = _calculate_stress_score(signals)
|
||||
# contribution = 0.5, total_weight = 0.5, score = 0.5/0.5 = 1.0
|
||||
assert score == 1.0
|
||||
|
||||
def test_mixed_signals(self):
|
||||
"""Mix of triggered and non-triggered signals."""
|
||||
signals = [
|
||||
StressSignal(name="s1", value=10.0, threshold=10.0, weight=0.3),
|
||||
StressSignal(name="s2", value=1.0, threshold=10.0, weight=0.3),
|
||||
StressSignal(name="s3", value=10.0, threshold=10.0, weight=0.4),
|
||||
]
|
||||
score = _calculate_stress_score(signals)
|
||||
# triggered contributions: 0.3 + 0.4 = 0.7
|
||||
# total_weight: 0.3 + 0.3 + 0.4 = 1.0
|
||||
# score = 0.7 / 1.0 = 0.7
|
||||
assert score == 0.7
|
||||
|
||||
def test_score_capped_at_one(self):
|
||||
"""Stress score is capped at 1.0."""
|
||||
signals = [
|
||||
StressSignal(name="s1", value=100.0, threshold=10.0, weight=1.0),
|
||||
StressSignal(name="s2", value=100.0, threshold=10.0, weight=1.0),
|
||||
]
|
||||
score = _calculate_stress_score(signals)
|
||||
assert score == 1.0 # Capped
|
||||
|
||||
|
||||
# ── Multiplier Tests ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestMultipliers:
|
||||
def test_default_config_structure(self):
|
||||
"""Default config has expected structure."""
|
||||
config = get_default_config()
|
||||
assert "thresholds" in config
|
||||
assert "signals" in config
|
||||
assert "multipliers" in config
|
||||
|
||||
def test_calm_mode_multipliers(self):
|
||||
"""Calm mode has expected multipliers."""
|
||||
multipliers = _get_multipliers_for_mode(StressMode.CALM)
|
||||
assert multipliers["test_improve"] == 1.0
|
||||
assert multipliers["docs_update"] == 1.2
|
||||
assert multipliers["exploration"] == 1.3
|
||||
assert multipliers["refactor"] == 1.2
|
||||
|
||||
def test_elevated_mode_multipliers(self):
|
||||
"""Elevated mode has expected multipliers."""
|
||||
multipliers = _get_multipliers_for_mode(StressMode.ELEVATED)
|
||||
assert multipliers["test_improve"] == 1.2
|
||||
assert multipliers["issue_reduce"] == 1.1
|
||||
assert multipliers["refactor"] == 0.9
|
||||
|
||||
def test_high_mode_multipliers(self):
|
||||
"""High stress mode has expected multipliers."""
|
||||
multipliers = _get_multipliers_for_mode(StressMode.HIGH)
|
||||
assert multipliers["test_improve"] == 1.5
|
||||
assert multipliers["issue_reduce"] == 1.4
|
||||
assert multipliers["exploration"] == 0.7
|
||||
assert multipliers["refactor"] == 0.6
|
||||
|
||||
def test_multiplier_fallback_for_unknown_type(self):
|
||||
"""Unknown quest types return default multiplier of 1.0."""
|
||||
multipliers = _get_multipliers_for_mode(StressMode.CALM)
|
||||
assert multipliers.get("unknown_type", 1.0) == 1.0
|
||||
|
||||
|
||||
# ── Apply Multiplier Tests ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestApplyMultiplier:
|
||||
def test_apply_multiplier_calm(self):
|
||||
"""Multiplier applies correctly in calm mode."""
|
||||
# This test uses get_multiplier which reads from current stress mode
|
||||
# Since we can't easily mock the stress mode, we test the apply_multiplier logic
|
||||
base = 100
|
||||
# In calm mode with test_improve = 1.0
|
||||
result = apply_multiplier(base, "unknown_type")
|
||||
assert result >= 1 # At least 1 token
|
||||
|
||||
def test_apply_multiplier_minimum_one(self):
|
||||
"""Applied reward is at least 1 token."""
|
||||
# Even with very low multiplier, result should be >= 1
|
||||
result = apply_multiplier(1, "any_type")
|
||||
assert result >= 1
|
||||
|
||||
|
||||
# ── Stress Snapshot Tests ──────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStressSnapshot:
|
||||
def test_snapshot_to_dict(self):
|
||||
"""Snapshot can be converted to dictionary."""
|
||||
signals = [
|
||||
StressSignal(name="test", value=10.0, threshold=5.0, weight=0.5),
|
||||
]
|
||||
snapshot = StressSnapshot(
|
||||
mode=StressMode.ELEVATED,
|
||||
score=0.5,
|
||||
signals=signals,
|
||||
multipliers={"test_improve": 1.2},
|
||||
)
|
||||
|
||||
data = snapshot.to_dict()
|
||||
assert data["mode"] == "elevated"
|
||||
assert data["score"] == 0.5
|
||||
assert len(data["signals"]) == 1
|
||||
assert data["multipliers"]["test_improve"] == 1.2
|
||||
|
||||
|
||||
# ── Integration Tests ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStressDetectorIntegration:
|
||||
def test_reset_stress_state(self):
|
||||
"""Reset clears internal state."""
|
||||
# Just verify reset doesn't error
|
||||
reset_stress_state()
|
||||
|
||||
def test_default_config_contains_all_signals(self):
|
||||
"""Default config defines all expected signals."""
|
||||
config = get_default_config()
|
||||
signals = config["signals"]
|
||||
|
||||
expected_signals = [
|
||||
"flaky_test_rate",
|
||||
"p1_backlog_growth",
|
||||
"ci_failure_rate",
|
||||
"open_bug_count",
|
||||
]
|
||||
|
||||
for signal in expected_signals:
|
||||
assert signal in signals
|
||||
assert "threshold" in signals[signal]
|
||||
assert "weight" in signals[signal]
|
||||
|
||||
def test_default_config_contains_all_modes(self):
|
||||
"""Default config defines all stress modes."""
|
||||
config = get_default_config()
|
||||
multipliers = config["multipliers"]
|
||||
|
||||
assert "calm" in multipliers
|
||||
assert "elevated" in multipliers
|
||||
assert "high" in multipliers
|
||||
|
||||
def test_multiplier_weights_sum_approximately_one(self):
|
||||
"""Signal weights should approximately sum to 1.0."""
|
||||
config = get_default_config()
|
||||
signals = config["signals"]
|
||||
|
||||
total_weight = sum(s["weight"] for s in signals.values())
|
||||
# Allow some flexibility but should be close to 1.0
|
||||
assert 0.9 <= total_weight <= 1.1
|
||||
Reference in New Issue
Block a user