forked from Rockachopa/Timmy-time-dashboard
Compare commits
1 Commits
main
...
kimi/issue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
919a011cae |
98
config/stress_modes.yaml
Normal file
98
config/stress_modes.yaml
Normal file
@@ -0,0 +1,98 @@
|
||||
# ── System Stress Modes Configuration ────────────────────────────────────────
#
# This configuration defines how token rewards adapt based on system stress.
# When the system detects elevated stress (flaky tests, growing backlog,
# CI failures), quest rewards are adjusted to incentivize agents to focus
# on the most critical areas.
#
# ── How It Works ─────────────────────────────────────────────────────────────
#
# 1. SIGNALS: System metrics are monitored continuously
# 2. SCORE: Weighted contributions from triggered signals create a stress score
# 3. MODE: Score determines the stress mode (calm, elevated, high)
# 4. MULTIPLIERS: Token rewards are multiplied based on the current mode
#
# ── Stress Thresholds ────────────────────────────────────────────────────────

thresholds:
  # Minimum score to enter elevated mode (0.0 - 1.0)
  elevated_min: 0.3

  # Minimum score to enter high stress mode (0.0 - 1.0)
  high_min: 0.6

# ── Stress Signals ───────────────────────────────────────────────────────────
#
# Each signal has:
#   - threshold: Value at which signal is considered "triggered"
#   - weight: Contribution to overall stress score (weights should sum to ~1.0)

signals:
  flaky_test_rate:
    threshold: 0.15  # 15% of tests showing flakiness
    weight: 0.30
    description: "Percentage of test runs that are flaky"

  p1_backlog_growth:
    threshold: 5  # 5 new P1 issues in lookback period
    weight: 0.25
    description: "Net growth in P1 priority issues over 7 days"

  ci_failure_rate:
    threshold: 0.20  # 20% of CI runs failing
    weight: 0.25
    description: "Percentage of CI runs failing in lookback period"

  open_bug_count:
    threshold: 20  # 20 open bugs
    weight: 0.20
    description: "Total open issues labeled as 'bug'"

# ── Token Multipliers ────────────────────────────────────────────────────────
#
# Multipliers are applied to quest rewards based on current stress mode.
# Values > 1.0 increase rewards, < 1.0 decrease rewards.
#
# Quest types:
#   - test_improve: Test coverage/quality improvements
#   - docs_update: Documentation updates
#   - issue_count: Closing specific issue types
#   - issue_reduce: Reducing overall issue backlog
#   - daily_run: Daily Run session completion
#   - custom: Special/manual quests
#   - exploration: Exploratory work
#   - refactor: Code refactoring

multipliers:
  calm:
    # Calm periods: incentivize maintenance and exploration
    test_improve: 1.0
    docs_update: 1.2
    issue_count: 1.0
    issue_reduce: 1.0
    daily_run: 1.0
    custom: 1.0
    exploration: 1.3
    refactor: 1.2

  elevated:
    # Elevated stress: start emphasizing stability
    test_improve: 1.2
    docs_update: 1.0
    issue_count: 1.1
    issue_reduce: 1.1
    daily_run: 1.0
    custom: 1.0
    exploration: 1.0
    refactor: 0.9  # Discourage risky changes

  high:
    # High stress: crisis mode, focus on stabilization
    test_improve: 1.5  # Strongly incentivize testing
    docs_update: 0.8  # Deprioritize docs
    issue_count: 1.3  # Reward closing issues
    issue_reduce: 1.4  # Strongly reward reducing backlog
    daily_run: 1.1
    custom: 1.0
    exploration: 0.7  # Discourage exploration
    refactor: 0.6  # Discourage refactors during crisis
|
||||
@@ -330,13 +330,6 @@ class Settings(BaseSettings):
|
||||
autoresearch_max_iterations: int = 100
|
||||
autoresearch_metric: str = "val_bpb" # metric to optimise (lower = better)
|
||||
|
||||
# ── Weekly Narrative Summary ───────────────────────────────────────
|
||||
# Generates a human-readable weekly summary of development activity.
|
||||
# Disabling this will stop the weekly narrative generation.
|
||||
weekly_narrative_enabled: bool = True
|
||||
weekly_narrative_lookback_days: int = 7
|
||||
weekly_narrative_output_dir: str = ".loop"
|
||||
|
||||
# ── Local Hands (Shell + Git) ──────────────────────────────────────
|
||||
# Enable local shell/git execution hands.
|
||||
hands_shell_enabled: bool = True
|
||||
|
||||
@@ -187,6 +187,76 @@ async def reload_quest_config_api() -> JSONResponse:
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Stress Mode Endpoints
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@router.get("/api/stress")
async def get_stress_status_api() -> JSONResponse:
    """Get current stress mode status and multipliers.

    Returns:
        Current stress mode, score, active signals, and multipliers
    """
    try:
        # Imported lazily so the dashboard still loads if the detector
        # module is unavailable.
        from timmy.stress_detector import (
            detect_stress_mode,
            get_stress_summary,
        )

        snapshot = detect_stress_mode()

        payload = {
            "status": "ok",
            "stress": get_stress_summary(),
            "raw": snapshot.to_dict(),
        }
        return JSONResponse(payload)
    except Exception as exc:
        logger.warning("Failed to get stress status: %s", exc)
        error_payload = {
            "status": "error",
            "error": str(exc),
        }
        return JSONResponse(error_payload, status_code=500)
|
||||
|
||||
|
||||
@router.post("/api/stress/refresh")
async def refresh_stress_detection_api() -> JSONResponse:
    """Force a fresh stress detection check.

    Normally stress is cached for 60 seconds. This endpoint
    bypasses the cache for immediate results.
    """
    try:
        from timmy.stress_detector import detect_stress_mode, get_stress_summary

        # force_refresh skips the 60-second cache in the detector.
        snapshot = detect_stress_mode(force_refresh=True)

        payload = {
            "status": "ok",
            "stress": get_stress_summary(),
            "raw": snapshot.to_dict(),
        }
        return JSONResponse(payload)
    except Exception as exc:
        logger.warning("Failed to refresh stress detection: %s", exc)
        error_payload = {
            "status": "error",
            "error": str(exc),
        }
        return JSONResponse(error_payload, status_code=500)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dashboard UI Endpoints
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -269,6 +269,22 @@ def _is_on_cooldown(progress: QuestProgress, quest: QuestDefinition) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def _apply_stress_multiplier(base_reward: int, quest_type: QuestType) -> tuple[int, float]:
    """Apply stress-based multiplier to quest reward.

    Args:
        base_reward: The quest's configured token reward.
        quest_type: Quest type used to look up the stress multiplier.

    Returns:
        Tuple of (adjusted_reward, multiplier_used). Falls back to
        (base_reward, 1.0) if the stress detector is unavailable.
    """
    try:
        from timmy.stress_detector import apply_multiplier, get_multiplier

        # BUG FIX: the previous code stored the *adjusted reward* in a
        # variable named `multiplier` and reported the truncated ratio
        # adjusted/base as the multiplier (e.g. base=10, mult=1.25 ->
        # adjusted=12, reported 1.2). Look up the actual multiplier and
        # keep the adjusted amount consistent with apply_multiplier().
        multiplier = get_multiplier(quest_type.value)
        adjusted = apply_multiplier(base_reward, quest_type.value)
        return adjusted, multiplier
    except Exception as exc:
        logger.debug("Failed to apply stress multiplier: %s", exc)
        return base_reward, 1.0
|
||||
|
||||
|
||||
def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
|
||||
"""Claim the token reward for a completed quest.
|
||||
|
||||
@@ -292,13 +308,18 @@ def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
|
||||
return None
|
||||
|
||||
try:
|
||||
# Apply stress-based multiplier
|
||||
adjusted_reward, multiplier = _apply_stress_multiplier(
|
||||
quest.reward_tokens, quest.quest_type
|
||||
)
|
||||
|
||||
# Award tokens via ledger
|
||||
from lightning.ledger import create_invoice_entry, mark_settled
|
||||
|
||||
# Create a mock invoice for the reward
|
||||
invoice_entry = create_invoice_entry(
|
||||
payment_hash=f"quest_{quest_id}_{agent_id}_{int(time.time())}",
|
||||
amount_sats=quest.reward_tokens,
|
||||
amount_sats=adjusted_reward,
|
||||
memo=f"Quest reward: {quest.name}",
|
||||
source="quest_reward",
|
||||
agent_id=agent_id,
|
||||
@@ -320,12 +341,21 @@ def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
|
||||
progress.completed_at = ""
|
||||
progress.claimed_at = ""
|
||||
|
||||
notification = quest.notification_message.format(tokens=quest.reward_tokens)
|
||||
# Build notification with multiplier info
|
||||
notification = quest.notification_message.format(tokens=adjusted_reward)
|
||||
if multiplier != 1.0:
|
||||
pct = int((multiplier - 1.0) * 100)
|
||||
if pct > 0:
|
||||
notification += f" (+{pct}% stress bonus)"
|
||||
else:
|
||||
notification += f" ({pct}% stress adjustment)"
|
||||
|
||||
return {
|
||||
"quest_id": quest_id,
|
||||
"agent_id": agent_id,
|
||||
"tokens_awarded": quest.reward_tokens,
|
||||
"tokens_awarded": adjusted_reward,
|
||||
"base_reward": quest.reward_tokens,
|
||||
"multiplier": round(multiplier, 2),
|
||||
"notification": notification,
|
||||
"completion_count": progress.completion_count,
|
||||
}
|
||||
@@ -467,6 +497,14 @@ def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
|
||||
total_rewards = 0
|
||||
completed_count = 0
|
||||
|
||||
# Get current stress mode for adjusted rewards display
|
||||
try:
|
||||
from timmy.stress_detector import get_current_stress_mode, get_multiplier
|
||||
|
||||
current_mode = get_current_stress_mode()
|
||||
except Exception:
|
||||
current_mode = None
|
||||
|
||||
for quest_id, quest in definitions.items():
|
||||
progress = get_quest_progress(quest_id, agent_id)
|
||||
if not progress:
|
||||
@@ -474,11 +512,23 @@ def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
|
||||
|
||||
is_on_cooldown = _is_on_cooldown(progress, quest) if quest.repeatable else False
|
||||
|
||||
# Calculate adjusted reward with stress multiplier
|
||||
adjusted_reward = quest.reward_tokens
|
||||
multiplier = 1.0
|
||||
if current_mode:
|
||||
try:
|
||||
multiplier = get_multiplier(quest.quest_type.value, current_mode)
|
||||
adjusted_reward = int(quest.reward_tokens * multiplier)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
quest_info = {
|
||||
"quest_id": quest_id,
|
||||
"name": quest.name,
|
||||
"description": quest.description,
|
||||
"reward_tokens": quest.reward_tokens,
|
||||
"adjusted_reward": adjusted_reward,
|
||||
"multiplier": round(multiplier, 2),
|
||||
"type": quest.quest_type.value,
|
||||
"enabled": quest.enabled,
|
||||
"repeatable": quest.repeatable,
|
||||
@@ -509,6 +559,7 @@ def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
|
||||
"total_tokens_earned": total_rewards,
|
||||
"total_quests_completed": completed_count,
|
||||
"active_quests_count": len([q for q in quests_status if q["enabled"]]),
|
||||
"stress_mode": current_mode.value if current_mode else None,
|
||||
}
|
||||
|
||||
|
||||
|
||||
565
src/timmy/stress_detector.py
Normal file
565
src/timmy/stress_detector.py
Normal file
@@ -0,0 +1,565 @@
|
||||
"""System stress detection for adaptive token rewards.
|
||||
|
||||
Monitors system signals like flakiness, backlog growth, and CI failures
|
||||
to determine the current stress mode. Token rewards are then adjusted
|
||||
based on the stress mode to incentivize agents to focus on critical areas.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from enum import StrEnum
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)

# Path to the stress mode configuration file, resolved relative to the
# repository root so loading works regardless of current working directory.
STRESS_CONFIG_PATH = Path(settings.repo_root) / "config" / "stress_modes.yaml"
|
||||
|
||||
|
||||
class StressMode(StrEnum):
    """System stress modes, ordered from least to most severe.

    - CALM: Normal operations, incentivize exploration and refactoring
    - ELEVATED: Some stress signals detected, balance incentives
    - HIGH: Critical stress, strongly incentivize bug fixes and stabilization
    """

    # StrEnum members compare equal to their string values, so these
    # double as the serialized form used in config files and API payloads.
    CALM = "calm"
    ELEVATED = "elevated"
    HIGH = "high"
|
||||
|
||||
|
||||
@dataclass
|
||||
class StressSignal:
|
||||
"""A single stress signal reading."""
|
||||
|
||||
name: str
|
||||
value: float
|
||||
threshold: float
|
||||
weight: float
|
||||
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
|
||||
|
||||
@property
|
||||
def is_triggered(self) -> bool:
|
||||
"""Whether this signal exceeds its threshold."""
|
||||
return self.value >= self.threshold
|
||||
|
||||
@property
|
||||
def contribution(self) -> float:
|
||||
"""Calculate this signal's contribution to stress score."""
|
||||
if not self.is_triggered:
|
||||
return 0.0
|
||||
# Contribution is weighted ratio of value to threshold
|
||||
return min(1.0, (self.value / max(self.threshold, 1.0))) * self.weight
|
||||
|
||||
|
||||
@dataclass
|
||||
class StressSnapshot:
|
||||
"""Complete stress assessment at a point in time."""
|
||||
|
||||
mode: StressMode
|
||||
score: float
|
||||
signals: list[StressSignal]
|
||||
multipliers: dict[str, float]
|
||||
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Convert to dictionary for serialization."""
|
||||
return {
|
||||
"mode": self.mode.value,
|
||||
"score": round(self.score, 3),
|
||||
"signals": [
|
||||
{
|
||||
"name": s.name,
|
||||
"value": s.value,
|
||||
"threshold": s.threshold,
|
||||
"triggered": s.is_triggered,
|
||||
"contribution": round(s.contribution, 3),
|
||||
}
|
||||
for s in self.signals
|
||||
],
|
||||
"multipliers": self.multipliers,
|
||||
"timestamp": self.timestamp,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
class StressThresholds:
    """Score cut-offs for entering each stress mode."""

    elevated_min: float = 0.3
    high_min: float = 0.6

    def get_mode_for_score(self, score: float) -> StressMode:
        """Map a stress score (0-1) onto a StressMode.

        Checks the most severe band first; scores below elevated_min
        fall through to CALM.
        """
        if score >= self.high_min:
            return StressMode.HIGH
        if score >= self.elevated_min:
            return StressMode.ELEVATED
        return StressMode.CALM
|
||||
|
||||
|
||||
# In-memory storage for stress state. NOTE(review): this module-level cache
# is not guarded by a lock — concurrent callers may race on refresh; worst
# case appears to be duplicate detection work. Confirm single-threaded use.
_current_snapshot: StressSnapshot | None = None  # last computed snapshot
_last_check_time: datetime | None = None  # when the snapshot was computed
_config_cache: dict[str, Any] | None = None  # parsed YAML config
_config_mtime: float = 0.0  # mtime of the cached config file
|
||||
|
||||
|
||||
def _load_stress_config() -> dict[str, Any]:
    """Load stress mode configuration from YAML.

    The parsed config is cached in module globals and only re-read when the
    file's mtime changes. A missing or unreadable file yields an empty dict,
    so callers fall back to built-in defaults.

    Returns:
        Configuration dictionary with default fallbacks
    """
    global _config_cache, _config_mtime

    # Check if config file has been modified since the last cached read
    if STRESS_CONFIG_PATH.exists():
        mtime = STRESS_CONFIG_PATH.stat().st_mtime
        if mtime != _config_mtime or _config_cache is None:
            try:
                raw = STRESS_CONFIG_PATH.read_text()
                # safe_load returns None for an empty file -> normalize to {}
                _config_cache = yaml.safe_load(raw) or {}
                _config_mtime = mtime
                logger.debug("Loaded stress config from %s", STRESS_CONFIG_PATH)
            except (OSError, yaml.YAMLError) as exc:
                # Keep running on a broken config; defaults take over.
                logger.warning("Failed to load stress config: %s", exc)
                _config_cache = {}

    # File missing and nothing cached yet: behave as an empty config
    if _config_cache is None:
        _config_cache = {}

    return _config_cache
|
||||
|
||||
|
||||
def get_default_config() -> dict[str, Any]:
    """Get default stress configuration.

    Mirrors the shape of config/stress_modes.yaml: thresholds, signals
    (threshold/weight/description per signal), and per-mode multipliers.
    Used as the fallback whenever the YAML file is missing keys.
    """
    return {
        "thresholds": {
            "elevated_min": 0.3,
            "high_min": 0.6,
        },
        "signals": {
            "flaky_test_rate": {
                "threshold": 0.15,  # 15% flaky test rate
                "weight": 0.3,
                "description": "Percentage of tests that are flaky",
            },
            "p1_backlog_growth": {
                "threshold": 5,  # 5 new P1 issues
                "weight": 0.25,
                "description": "Net growth in P1 priority issues",
            },
            "ci_failure_rate": {
                "threshold": 0.2,  # 20% CI failure rate
                "weight": 0.25,
                "description": "Percentage of CI runs failing",
            },
            "open_bug_count": {
                "threshold": 20,  # 20 open bugs
                "weight": 0.2,
                "description": "Total open issues labeled as bugs",
            },
        },
        "multipliers": {
            StressMode.CALM.value: {
                "test_improve": 1.0,
                "docs_update": 1.2,  # Calm periods good for docs
                "issue_count": 1.0,
                "issue_reduce": 1.0,
                "daily_run": 1.0,
                "custom": 1.0,
                "exploration": 1.3,  # Encourage exploration
                "refactor": 1.2,  # Encourage refactoring
            },
            StressMode.ELEVATED.value: {
                "test_improve": 1.2,  # Start emphasizing tests
                "docs_update": 1.0,
                "issue_count": 1.1,
                "issue_reduce": 1.1,
                "daily_run": 1.0,
                "custom": 1.0,
                "exploration": 1.0,
                "refactor": 0.9,  # Discourage risky refactors
            },
            StressMode.HIGH.value: {
                "test_improve": 1.5,  # Strongly incentivize testing
                "docs_update": 0.8,  # Deprioritize docs
                "issue_count": 1.3,  # Reward closing issues
                "issue_reduce": 1.4,  # Strongly reward reducing backlog
                "daily_run": 1.1,
                "custom": 1.0,
                "exploration": 0.7,  # Discourage exploration
                "refactor": 0.6,  # Discourage refactors during crisis
            },
        },
    }
|
||||
|
||||
|
||||
def _get_config_value(key_path: str, default: Any = None) -> Any:
    """Look up a config value via a dot-separated key path.

    Returns `default` when any path segment is missing or when the
    traversal hits a non-dict node before the path is exhausted.
    """
    node: Any = _load_stress_config()
    for part in key_path.split("."):
        if not isinstance(node, dict):
            return default
        node = node.get(part)
    return default if node is None else node
|
||||
|
||||
|
||||
def _calculate_flaky_test_rate() -> float:
    """Calculate the flaky-test rate over the last 7 days.

    Reads `.loop/test_results.jsonl` (one JSON object per line with a
    "timestamp" and optional "is_flaky" flag) and returns
    flaky_runs / total_runs for entries within the window.

    Returns:
        Flaky rate in [0, 1]; 0.0 when no data exists or on any error.
    """
    try:
        test_results_path = Path(settings.repo_root) / ".loop" / "test_results.jsonl"
        if not test_results_path.exists():
            return 0.0

        # FIX: removed the redundant second exists() check the original
        # performed after already returning early above.
        cutoff = datetime.now(UTC) - timedelta(days=7)

        total_runs = 0
        flaky_runs = 0

        for line in test_results_path.read_text().strip().splitlines():
            try:
                entry = json.loads(line)
                ts_str = entry.get("timestamp", "")
                if not ts_str:
                    continue
                ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
                if ts >= cutoff:
                    total_runs += 1
                    if entry.get("is_flaky", False):
                        flaky_runs += 1
            except (json.JSONDecodeError, ValueError):
                # Skip malformed lines rather than failing the whole read.
                continue

        # max(..., 1) avoids division by zero when no recent runs exist.
        return flaky_runs / max(total_runs, 1)
    except Exception as exc:
        logger.debug("Failed to calculate flaky test rate: %s", exc)
        return 0.0
|
||||
|
||||
|
||||
def _calculate_p1_backlog_growth() -> float:
    """Calculate P1 issue backlog growth.

    Compares how many P1-labeled issues were *created* in the last 7 days
    versus the 7 days before that. NOTE(review): closed issues are not
    subtracted, so this measures creation-rate growth rather than true net
    backlog growth — confirm this matches the intended signal.

    Returns:
        max(0, created_last_7d - created_prior_7d); 0.0 on any error.
    """
    try:
        from dashboard.routes.daily_run import GiteaClient, _load_config

        config = _load_config()
        token = config.get("token")
        client = GiteaClient(config, token)

        if not client.is_available():
            return 0.0

        # Two adjacent 7-day windows: [now-7d, now] and [now-14d, now-7d)
        now = datetime.now(UTC)
        cutoff_current = now - timedelta(days=7)
        cutoff_previous = now - timedelta(days=14)

        issues = client.get_paginated("issues", {"state": "all", "labels": "P1", "limit": 100})

        current_count = 0
        previous_count = 0

        for issue in issues:
            created_at = issue.get("created_at", "")
            if not created_at:
                continue
            try:
                created = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
                if created >= cutoff_current:
                    current_count += 1
                elif created >= cutoff_previous:
                    previous_count += 1
            except (ValueError, TypeError):
                # Skip issues with unparseable timestamps.
                continue

        # Return net growth (positive means growing backlog)
        return max(0, current_count - previous_count)
    except Exception as exc:
        logger.debug("Failed to calculate P1 backlog growth: %s", exc)
        return 0.0
|
||||
|
||||
|
||||
def _calculate_ci_failure_rate() -> float:
    """Calculate the CI failure rate over the last 7 days.

    Reads `.loop/ci_results.jsonl`; any entry whose "status" is not
    "success" counts as a failure.
    """
    try:
        results_file = Path(settings.repo_root) / ".loop" / "ci_results.jsonl"
        if not results_file.exists():
            return 0.0

        cutoff = datetime.now(UTC) - timedelta(days=7)

        runs = 0
        failures = 0

        for raw_line in results_file.read_text().strip().splitlines():
            try:
                record = json.loads(raw_line)
                stamp = record.get("timestamp", "")
                if not stamp:
                    continue
                when = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
                if when >= cutoff:
                    runs += 1
                    if record.get("status") != "success":
                        failures += 1
            except (json.JSONDecodeError, ValueError):
                # Malformed lines are skipped, not fatal.
                continue

        # Denominator floored at 1 so an empty window yields 0.0.
        return failures / max(runs, 1)
    except Exception as exc:
        logger.debug("Failed to calculate CI failure rate: %s", exc)
        return 0.0
|
||||
|
||||
|
||||
def _calculate_open_bug_count() -> float:
    """Count open issues labeled 'bug' via the Gitea API.

    Returns 0.0 when the Gitea client is unavailable or on any error.
    """
    try:
        from dashboard.routes.daily_run import GiteaClient, _load_config

        gitea_config = _load_config()
        token = gitea_config.get("token")
        client = GiteaClient(gitea_config, token)

        if not client.is_available():
            return 0.0

        open_bugs = client.get_paginated("issues", {"state": "open", "labels": "bug", "limit": 100})
        return float(len(open_bugs))
    except Exception as exc:
        logger.debug("Failed to calculate open bug count: %s", exc)
        return 0.0
|
||||
|
||||
|
||||
def _collect_stress_signals() -> list[StressSignal]:
    """Collect one StressSignal reading per known signal collector.

    Per-signal threshold/weight come from the YAML config, falling back
    to built-in defaults. A collector that raises is skipped (logged at
    debug level) rather than aborting the whole collection.
    """
    defaults = get_default_config()["signals"]
    configured = _load_stress_config().get("signals", defaults)

    # Map each signal name to the function that measures it.
    collectors = {
        "flaky_test_rate": _calculate_flaky_test_rate,
        "p1_backlog_growth": _calculate_p1_backlog_growth,
        "ci_failure_rate": _calculate_ci_failure_rate,
        "open_bug_count": _calculate_open_bug_count,
    }

    readings: list[StressSignal] = []
    for name, measure in collectors.items():
        cfg = configured.get(name, {})
        fallback = defaults.get(name, {})
        try:
            reading = StressSignal(
                name=name,
                value=measure(),
                threshold=cfg.get("threshold", fallback.get("threshold", 1.0)),
                weight=cfg.get("weight", fallback.get("weight", 0.25)),
            )
            readings.append(reading)
        except Exception as exc:
            logger.debug("Failed to collect signal %s: %s", name, exc)

    return readings
|
||||
|
||||
|
||||
def _calculate_stress_score(signals: list[StressSignal]) -> float:
|
||||
"""Calculate overall stress score from signals.
|
||||
|
||||
Score is weighted sum of triggered signal contributions,
|
||||
normalized to 0-1 range.
|
||||
"""
|
||||
if not signals:
|
||||
return 0.0
|
||||
|
||||
total_weight = sum(s.weight for s in signals)
|
||||
if total_weight == 0:
|
||||
return 0.0
|
||||
|
||||
triggered_contribution = sum(s.contribution for s in signals)
|
||||
return min(1.0, triggered_contribution / total_weight)
|
||||
|
||||
|
||||
def _get_multipliers_for_mode(mode: StressMode) -> dict[str, float]:
    """Get token multipliers for a specific stress mode.

    Config values are overlaid on top of the built-in defaults so a
    partially specified config still yields a complete multiplier table.
    """
    defaults = get_default_config()["multipliers"]
    configured = _load_stress_config().get("multipliers", defaults)

    merged = dict(defaults.get(mode.value, {}))
    merged.update(configured.get(mode.value, {}))
    return merged
|
||||
|
||||
|
||||
def detect_stress_mode(
    force_refresh: bool = False,
    min_check_interval_seconds: int = 60,
) -> StressSnapshot:
    """Detect current system stress mode.

    Results are cached in module globals; a fresh detection only runs when
    the cached snapshot is older than ``min_check_interval_seconds`` or
    ``force_refresh`` is set.

    Args:
        force_refresh: Force a new check even if recently checked
        min_check_interval_seconds: Minimum seconds between checks

    Returns:
        StressSnapshot with mode, score, signals, and multipliers
    """
    global _current_snapshot, _last_check_time

    now = datetime.now(UTC)

    # Return cached snapshot if recent and not forced
    if not force_refresh and _current_snapshot is not None and _last_check_time is not None:
        elapsed = (now - _last_check_time).total_seconds()
        if elapsed < min_check_interval_seconds:
            return _current_snapshot

    # Collect signals and calculate stress
    signals = _collect_stress_signals()
    score = _calculate_stress_score(signals)

    # Determine mode from score
    config = _load_stress_config()
    default_config = get_default_config()
    thresholds_cfg = config.get("thresholds", default_config["thresholds"])
    thresholds = StressThresholds(
        elevated_min=thresholds_cfg.get("elevated_min", 0.3),
        high_min=thresholds_cfg.get("high_min", 0.6),
    )
    mode = thresholds.get_mode_for_score(score)

    # Get multipliers for this mode
    multipliers = _get_multipliers_for_mode(mode)

    snapshot = StressSnapshot(
        mode=mode,
        score=score,
        signals=signals,
        multipliers=multipliers,
        timestamp=now.isoformat(),
    )

    # BUG FIX: capture the previous mode *before* replacing the cached
    # snapshot. The old code overwrote _current_snapshot first and then
    # compared the (new) cached mode against the new mode, so the
    # transition was never detected and never logged.
    previous_mode = _current_snapshot.mode if _current_snapshot is not None else None

    # Cache result
    _current_snapshot = snapshot
    _last_check_time = now

    # Log mode changes
    if previous_mode is not None and previous_mode != mode:
        logger.info(
            "Stress mode changed: %s -> %s (score: %.2f)",
            previous_mode.value,
            mode.value,
            score,
        )

    return snapshot
|
||||
|
||||
|
||||
def get_current_stress_mode() -> StressMode:
    """Return the stress mode from the latest (possibly cached) detection."""
    return detect_stress_mode().mode
|
||||
|
||||
|
||||
def get_multiplier(quest_type: str, mode: StressMode | None = None) -> float:
    """Get token multiplier for a quest type.

    Args:
        quest_type: Type of quest (test_improve, issue_count, etc.)
        mode: Specific mode to get multiplier for, or None for current

    Returns:
        Multiplier value (1.0 = normal, 1.5 = 50% bonus, etc.)
    """
    effective_mode = mode if mode is not None else get_current_stress_mode()
    # Unknown quest types are left unscaled.
    return _get_multipliers_for_mode(effective_mode).get(quest_type, 1.0)
|
||||
|
||||
|
||||
def apply_multiplier(base_reward: int, quest_type: str) -> int:
    """Apply the current stress multiplier to a base reward.

    Args:
        base_reward: Base token reward amount
        quest_type: Type of quest for multiplier lookup

    Returns:
        Adjusted reward amount, truncated to int and floored at 1
    """
    scaled = base_reward * get_multiplier(quest_type)
    return max(1, int(scaled))
|
||||
|
||||
|
||||
def get_stress_summary() -> dict[str, Any]:
    """Get a human-readable summary of current stress state."""
    snapshot = detect_stress_mode()

    # One-line operator guidance per mode.
    explanations = {
        StressMode.CALM: "System is calm. Good time for exploration and refactoring.",
        StressMode.ELEVATED: "Elevated stress detected. Focus on stability and tests.",
        StressMode.HIGH: "HIGH STRESS MODE. Prioritize bug fixes and test hardening.",
    }

    # Only triggered signals are surfaced to the caller.
    active = [
        {
            "name": sig.name,
            "value": round(sig.value, 3),
            "threshold": sig.threshold,
        }
        for sig in snapshot.signals
        if sig.is_triggered
    ]

    return {
        "mode": snapshot.mode.value,
        "score": round(snapshot.score, 3),
        "explanation": explanations.get(snapshot.mode, "Unknown mode"),
        "active_signals": active,
        "current_multipliers": snapshot.multipliers,
        "last_updated": snapshot.timestamp,
    }
|
||||
|
||||
|
||||
def reset_stress_state() -> None:
    """Clear all cached stress state (snapshot and config caches).

    Primarily for tests: the next detection/config access starts fresh.
    """
    global _current_snapshot, _last_check_time, _config_cache, _config_mtime
    _config_mtime = 0.0
    _config_cache = None
    _last_check_time = None
    _current_snapshot = None
|
||||
@@ -1,343 +0,0 @@
|
||||
"""Tests for weekly_narrative.py script."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
# Add timmy_automations to path for imports
|
||||
sys.path.insert(
|
||||
0, str(Path(__file__).resolve().parent.parent.parent / "timmy_automations" / "daily_run")
|
||||
)
|
||||
|
||||
import weekly_narrative as wn
|
||||
|
||||
|
||||
class TestParseTimestamp:
    """Tests for weekly_narrative.parse_ts timestamp parsing."""

    def test_parse_iso_with_z(self):
        """Parse ISO timestamp with Z suffix."""
        result = wn.parse_ts("2026-03-21T12:00:00Z")
        assert result is not None
        assert result.year == 2026
        assert result.month == 3
        assert result.day == 21

    def test_parse_iso_with_offset(self):
        """Parse ISO timestamp with explicit timezone offset."""
        result = wn.parse_ts("2026-03-21T12:00:00+00:00")
        assert result is not None
        assert result.year == 2026

    def test_parse_empty_string(self):
        """Empty string returns None rather than raising."""
        result = wn.parse_ts("")
        assert result is None

    def test_parse_invalid_string(self):
        """Invalid string returns None rather than raising."""
        result = wn.parse_ts("not-a-timestamp")
        assert result is None
|
||||
|
||||
|
||||
class TestCollectCyclesData:
    """Tests for weekly_narrative.collect_cycles_data aggregation."""

    def test_no_cycles_file(self, tmp_path):
        """Handle missing cycles file gracefully (all counters zero)."""
        with patch.object(wn, "REPO_ROOT", tmp_path):
            since = datetime.now(UTC) - timedelta(days=7)
            result = wn.collect_cycles_data(since)
            assert result["total"] == 0
            assert result["successes"] == 0
            assert result["failures"] == 0

    def test_collect_recent_cycles(self, tmp_path):
        """Collect only cycles within the lookback period."""
        retro_dir = tmp_path / ".loop" / "retro"
        retro_dir.mkdir(parents=True)

        now = datetime.now(UTC)
        # Two cycles inside the 7-day window, one 10 days old (excluded).
        cycles = [
            {"timestamp": now.isoformat(), "success": True, "cycle": 1},
            {"timestamp": now.isoformat(), "success": False, "cycle": 2},
            {"timestamp": (now - timedelta(days=10)).isoformat(), "success": True, "cycle": 3},
        ]

        with open(retro_dir / "cycles.jsonl", "w") as f:
            for c in cycles:
                f.write(json.dumps(c) + "\n")

        with patch.object(wn, "REPO_ROOT", tmp_path):
            since = now - timedelta(days=7)
            result = wn.collect_cycles_data(since)
            assert result["total"] == 2  # Only recent 2
            assert result["successes"] == 1
            assert result["failures"] == 1
|
||||
|
||||
|
||||
class TestExtractThemes:
    """Exercise label-based theme extraction."""

    def test_extract_layer_labels(self):
        """layer:* labels become distinct layer themes."""
        issues = [
            {"labels": [{"name": "layer:triage"}, {"name": "bug"}]},
            {"labels": [{"name": "layer:tests"}, {"name": "bug"}]},
            {"labels": [{"name": "layer:triage"}, {"name": "feature"}]},
        ]
        themes = wn.extract_themes(issues)
        names = {entry["name"] for entry in themes["layers"]}
        assert len(themes["layers"]) == 2
        assert "triage" in names
        assert "tests" in names

    def test_extract_type_labels(self):
        """bug/feature labels become type themes."""
        issues = [
            {"labels": [{"name": "bug"}]},
            {"labels": [{"name": "feature"}]},
            {"labels": [{"name": "bug"}]},
        ]
        themes = wn.extract_themes(issues)
        type_names = {entry["name"] for entry in themes["types"]}
        assert "bug" in type_names
        assert "feature" in type_names

    def test_empty_issues(self):
        """An empty issue list yields empty theme buckets."""
        themes = wn.extract_themes([])
        assert themes["layers"] == []
        assert themes["types"] == []
        assert themes["top_labels"] == []
|
||||
|
||||
|
||||
class TestExtractAgentContributions:
    """Exercise per-agent contribution tallies."""

    def test_extract_assignees(self):
        """Distinct assignees are tallied from issues."""
        issues = [
            {"assignee": {"login": "kimi"}},
            {"assignee": {"login": "hermes"}},
            {"assignee": {"login": "kimi"}},
        ]
        contrib = wn.extract_agent_contributions(issues, [], [])
        logins = {entry["login"] for entry in contrib["active_assignees"]}
        assert len(contrib["active_assignees"]) == 2
        assert "kimi" in logins
        assert "hermes" in logins

    def test_extract_pr_authors(self):
        """Distinct PR authors are tallied."""
        prs = [
            {"user": {"login": "kimi"}},
            {"user": {"login": "claude"}},
            {"user": {"login": "kimi"}},
        ]
        contrib = wn.extract_agent_contributions([], prs, [])
        assert len(contrib["pr_authors"]) == 2

    def test_kimi_mentions_in_cycles(self):
        """Kimi mentions are counted across both cycle notes and reasons."""
        cycles = [
            {"notes": "Kimi did great work", "reason": ""},
            {"notes": "", "reason": "Kimi timeout"},
            {"notes": "All good", "reason": ""},
        ]
        contrib = wn.extract_agent_contributions([], [], cycles)
        assert contrib["kimi_mentioned_cycles"] == 2
|
||||
|
||||
|
||||
class TestAnalyzeTestShifts:
    """Exercise weekly test-pattern analysis."""

    def test_no_cycles(self):
        """With no cycle data the result carries an explanatory note."""
        assert "note" in wn.analyze_test_shifts([])

    def test_test_metrics(self):
        """Pass/added counts are summed across cycles."""
        cycles = [
            {"tests_passed": 100, "tests_added": 5},
            {"tests_passed": 150, "tests_added": 3},
        ]
        shifts = wn.analyze_test_shifts(cycles)
        assert shifts["total_tests_passed"] == 250
        assert shifts["total_tests_added"] == 8
|
||||
|
||||
|
||||
class TestGenerateVibeSummary:
    """Exercise overall-vibe classification."""

    def test_productive_vibe(self):
        """High success rate plus closed issues reads as productive."""
        vibe = wn.generate_vibe_summary(
            {"success_rate": 0.95, "successes": 10, "failures": 1},
            {"closed_count": 5},
            {}, {"layers": []}, {}, {}, {},
        )
        assert vibe["overall"] == "productive"
        assert "strong week" in vibe["description"].lower()

    def test_struggling_vibe(self):
        """Failures outnumbering successes reads as struggling."""
        vibe = wn.generate_vibe_summary(
            {"success_rate": 0.3, "successes": 3, "failures": 7},
            {"closed_count": 0},
            {}, {"layers": []}, {}, {}, {},
        )
        assert vibe["overall"] == "struggling"

    def test_quiet_vibe(self):
        """No activity at all reads as quiet."""
        vibe = wn.generate_vibe_summary(
            {"success_rate": 0.0, "successes": 0, "failures": 0},
            {"closed_count": 0},
            {}, {"layers": []}, {}, {}, {},
        )
        assert vibe["overall"] == "quiet"
|
||||
|
||||
|
||||
class TestGenerateMarkdownSummary:
    """Exercise markdown rendering of the weekly narrative."""

    def test_includes_header(self):
        """The rendered markdown carries the summary header and the vibe."""
        narrative = {
            "period": {"start": "2026-03-14T00:00:00", "end": "2026-03-21T00:00:00"},
            "vibe": {"overall": "productive", "description": "Good week"},
            "activity": {
                "cycles": {"total": 10, "successes": 9, "failures": 1},
                "issues": {"closed": 5, "opened": 3},
                "pull_requests": {"merged": 4, "opened": 2},
            },
        }
        rendered = wn.generate_markdown_summary(narrative)
        assert "# Weekly Narrative Summary" in rendered
        assert "productive" in rendered.lower()
        assert "10 total" in rendered or "10" in rendered

    def test_includes_focus_areas(self):
        """Focus areas appear when the vibe lists them."""
        narrative = {
            "period": {"start": "2026-03-14", "end": "2026-03-21"},
            "vibe": {
                "overall": "productive",
                "description": "Good week",
                "focus_areas": ["triage (5 items)", "tests (3 items)"],
            },
            "activity": {
                "cycles": {"total": 0, "successes": 0, "failures": 0},
                "issues": {"closed": 0, "opened": 0},
                "pull_requests": {"merged": 0, "opened": 0},
            },
        }
        rendered = wn.generate_markdown_summary(narrative)
        assert "Focus Areas" in rendered
        assert "triage" in rendered
|
||||
|
||||
|
||||
class TestConfigLoading:
    """Exercise manifest/environment config resolution."""

    def test_default_config(self, tmp_path):
        """Defaults apply when the manifest is absent."""
        with patch.object(wn, "CONFIG_PATH", tmp_path / "nonexistent.json"):
            cfg = wn.load_automation_config()
        assert cfg["lookback_days"] == 7
        assert cfg["enabled"] is True

    def test_environment_override(self, tmp_path):
        """TIMMY_WEEKLY_NARRATIVE_ENABLED=false disables the automation."""
        env = {"TIMMY_WEEKLY_NARRATIVE_ENABLED": "false"}
        with patch.dict("os.environ", env):
            with patch.object(wn, "CONFIG_PATH", tmp_path / "nonexistent.json"):
                cfg = wn.load_automation_config()
        assert cfg["enabled"] is False
|
||||
|
||||
|
||||
class TestMain:
    """Exercise the CLI entry point."""

    def test_disabled_exits_cleanly(self, tmp_path):
        """Without --force a disabled config is a clean no-op."""
        with patch.object(wn, "REPO_ROOT", tmp_path):
            with patch.object(wn, "load_automation_config", return_value={"enabled": False}):
                with patch("sys.argv", ["weekly_narrative"]):
                    assert wn.main() == 0

    def test_force_runs_when_disabled(self, tmp_path):
        """--force overrides enabled=False and tolerates Gitea being down."""
        # Minimal on-disk layout the script expects.
        (tmp_path / ".loop" / "retro").mkdir(parents=True)

        fake_config = {
            "enabled": False,
            "lookback_days": 7,
            "gitea_api": "http://localhost:3000/api/v1",
            "repo_slug": "test/repo",
            "token_file": "~/.hermes/gitea_token",
        }
        offline_client = MagicMock()
        offline_client.is_available.return_value = False

        with patch.object(wn, "REPO_ROOT", tmp_path):
            with patch.object(wn, "load_automation_config", return_value=fake_config):
                with patch.object(wn, "GiteaClient", return_value=offline_client):
                    with patch("sys.argv", ["weekly_narrative", "--force"]):
                        # Completes cleanly even though the Gitea API is unreachable.
                        assert wn.main() == 0
|
||||
|
||||
|
||||
class TestGiteaClient:
    """Exercise the thin Gitea API wrapper."""

    def test_is_available_when_unavailable(self):
        """An unreachable server makes is_available return False, not raise."""
        client = wn.GiteaClient(
            {"gitea_api": "http://localhost:99999", "repo_slug": "test/repo"}, None
        )
        assert client.is_available() is False

    def test_headers_with_token(self):
        """The Authorization header is present when a token is supplied."""
        client = wn.GiteaClient(
            {"gitea_api": "http://localhost:3000", "repo_slug": "test/repo"}, "test-token"
        )
        assert client._headers()["Authorization"] == "token test-token"

    def test_headers_without_token(self):
        """No Authorization header is emitted without a token."""
        client = wn.GiteaClient(
            {"gitea_api": "http://localhost:3000", "repo_slug": "test/repo"}, None
        )
        assert "Authorization" not in client._headers()
|
||||
294
tests/unit/test_stress_detector.py
Normal file
294
tests/unit/test_stress_detector.py
Normal file
@@ -0,0 +1,294 @@
|
||||
"""Unit tests for the stress detector module.
|
||||
|
||||
Tests stress signal calculation, mode detection, multipliers,
|
||||
and integration with the quest system.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.stress_detector import (
|
||||
StressMode,
|
||||
StressSignal,
|
||||
StressSnapshot,
|
||||
StressThresholds,
|
||||
_calculate_stress_score,
|
||||
_get_multipliers_for_mode,
|
||||
apply_multiplier,
|
||||
get_default_config,
|
||||
reset_stress_state,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def clean_stress_state():
    """Guarantee a pristine stress-detector state before and after every test."""
    reset_stress_state()
    yield
    reset_stress_state()
|
||||
|
||||
|
||||
# ── Stress Mode Tests ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStressMode:
    def test_stress_mode_values(self):
        """Each StressMode member maps to its lowercase string value."""
        expected = {
            StressMode.CALM: "calm",
            StressMode.ELEVATED: "elevated",
            StressMode.HIGH: "high",
        }
        for mode, value in expected.items():
            assert mode.value == value
|
||||
|
||||
|
||||
# ── Stress Signal Tests ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStressSignal:
    @staticmethod
    def _make(value):
        """Build the canonical test signal (threshold 10, weight 0.5)."""
        return StressSignal(name="test_signal", value=value, threshold=10.0, weight=0.5)

    def test_signal_not_triggered(self):
        """Below-threshold values neither trigger nor contribute."""
        sig = self._make(5.0)
        assert not sig.is_triggered
        assert sig.contribution == 0.0

    def test_signal_triggered(self):
        """A value exactly at threshold triggers with full weight."""
        sig = self._make(10.0)
        assert sig.is_triggered
        assert sig.contribution == 0.5  # weight * min(1, value/threshold)

    def test_signal_contribution_capped(self):
        """Far-above-threshold values still contribute at most the weight."""
        sig = self._make(100.0)
        assert sig.is_triggered
        assert sig.contribution == 0.5  # capped at weight

    def test_signal_partial_contribution(self):
        """The value/threshold ratio is clamped to 1 before weighting."""
        sig = self._make(15.0)
        assert sig.is_triggered
        assert sig.contribution == 0.5  # min(1, 15/10) * 0.5
|
||||
|
||||
|
||||
# ── Stress Thresholds Tests ────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStressThresholds:
    def test_calm_mode(self):
        """Scores strictly below elevated_min map to CALM."""
        th = StressThresholds(elevated_min=0.3, high_min=0.6)
        for score in (0.0, 0.1, 0.29):
            assert th.get_mode_for_score(score) == StressMode.CALM

    def test_elevated_mode(self):
        """Scores in [elevated_min, high_min) map to ELEVATED."""
        th = StressThresholds(elevated_min=0.3, high_min=0.6)
        for score in (0.3, 0.5, 0.59):
            assert th.get_mode_for_score(score) == StressMode.ELEVATED

    def test_high_mode(self):
        """Scores at or above high_min map to HIGH."""
        th = StressThresholds(elevated_min=0.3, high_min=0.6)
        for score in (0.6, 0.8, 1.0):
            assert th.get_mode_for_score(score) == StressMode.HIGH
|
||||
|
||||
|
||||
# ── Stress Score Calculation Tests ─────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStressScoreCalculation:
    def test_empty_signals(self):
        """No signals at all means zero stress."""
        assert _calculate_stress_score([]) == 0.0

    def test_no_triggered_signals(self):
        """Untriggered signals contribute nothing."""
        quiet = [
            StressSignal(name="s1", value=1.0, threshold=10.0, weight=0.5),
            StressSignal(name="s2", value=2.0, threshold=10.0, weight=0.5),
        ]
        assert _calculate_stress_score(quiet) == 0.0

    def test_single_triggered_signal(self):
        """One fully triggered signal normalizes to a score of 1.0."""
        lone = [StressSignal(name="s1", value=10.0, threshold=10.0, weight=0.5)]
        # contribution = 0.5, total_weight = 0.5, score = 0.5/0.5 = 1.0
        assert _calculate_stress_score(lone) == 1.0

    def test_mixed_signals(self):
        """Triggered weights are normalized by the total weight."""
        mixed = [
            StressSignal(name="s1", value=10.0, threshold=10.0, weight=0.3),
            StressSignal(name="s2", value=1.0, threshold=10.0, weight=0.3),
            StressSignal(name="s3", value=10.0, threshold=10.0, weight=0.4),
        ]
        # triggered: 0.3 + 0.4 = 0.7; total weight: 1.0; score = 0.7
        assert _calculate_stress_score(mixed) == 0.7

    def test_score_capped_at_one(self):
        """The normalized score never exceeds 1.0."""
        loud = [
            StressSignal(name="s1", value=100.0, threshold=10.0, weight=1.0),
            StressSignal(name="s2", value=100.0, threshold=10.0, weight=1.0),
        ]
        assert _calculate_stress_score(loud) == 1.0
|
||||
|
||||
|
||||
# ── Multiplier Tests ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestMultipliers:
    def test_default_config_structure(self):
        """The default config exposes thresholds, signals, and multipliers."""
        config = get_default_config()
        for section in ("thresholds", "signals", "multipliers"):
            assert section in config

    def test_calm_mode_multipliers(self):
        """Calm mode favors docs, exploration, and refactoring."""
        m = _get_multipliers_for_mode(StressMode.CALM)
        assert m["test_improve"] == 1.0
        assert m["docs_update"] == 1.2
        assert m["exploration"] == 1.3
        assert m["refactor"] == 1.2

    def test_elevated_mode_multipliers(self):
        """Elevated mode boosts tests and issue reduction, trims refactors."""
        m = _get_multipliers_for_mode(StressMode.ELEVATED)
        assert m["test_improve"] == 1.2
        assert m["issue_reduce"] == 1.1
        assert m["refactor"] == 0.9

    def test_high_mode_multipliers(self):
        """High stress strongly favors stabilization work."""
        m = _get_multipliers_for_mode(StressMode.HIGH)
        assert m["test_improve"] == 1.5
        assert m["issue_reduce"] == 1.4
        assert m["exploration"] == 0.7
        assert m["refactor"] == 0.6

    def test_multiplier_fallback_for_unknown_type(self):
        """Unknown quest types default to a 1.0 multiplier."""
        m = _get_multipliers_for_mode(StressMode.CALM)
        assert m.get("unknown_type", 1.0) == 1.0
|
||||
|
||||
|
||||
# ── Apply Multiplier Tests ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestApplyMultiplier:
    def test_apply_multiplier_calm(self):
        """apply_multiplier yields a usable reward for an unknown quest type."""
        # The active stress mode is whatever the detector currently reports,
        # so only the floor (>= 1 token) is asserted here.
        assert apply_multiplier(100, "unknown_type") >= 1

    def test_apply_multiplier_minimum_one(self):
        """Rewards never drop below a single token, even for tiny bases."""
        assert apply_multiplier(1, "any_type") >= 1
|
||||
|
||||
|
||||
# ── Stress Snapshot Tests ──────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStressSnapshot:
    def test_snapshot_to_dict(self):
        """to_dict serializes mode, score, signals, and multipliers."""
        snapshot = StressSnapshot(
            mode=StressMode.ELEVATED,
            score=0.5,
            signals=[StressSignal(name="test", value=10.0, threshold=5.0, weight=0.5)],
            multipliers={"test_improve": 1.2},
        )
        payload = snapshot.to_dict()
        assert payload["mode"] == "elevated"
        assert payload["score"] == 0.5
        assert len(payload["signals"]) == 1
        assert payload["multipliers"]["test_improve"] == 1.2
|
||||
|
||||
|
||||
# ── Integration Tests ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStressDetectorIntegration:
    def test_reset_stress_state(self):
        """Resetting the detector state is safe to call at any time."""
        reset_stress_state()

    def test_default_config_contains_all_signals(self):
        """Every expected signal is defined with a threshold and a weight."""
        signals = get_default_config()["signals"]
        expected = ("flaky_test_rate", "p1_backlog_growth", "ci_failure_rate", "open_bug_count")
        for name in expected:
            assert name in signals
            assert "threshold" in signals[name]
            assert "weight" in signals[name]

    def test_default_config_contains_all_modes(self):
        """Multipliers are defined for every stress mode."""
        multipliers = get_default_config()["multipliers"]
        for mode in ("calm", "elevated", "high"):
            assert mode in multipliers

    def test_multiplier_weights_sum_approximately_one(self):
        """Signal weights sum to roughly 1.0 so scores normalize sensibly."""
        signals = get_default_config()["signals"]
        total = sum(entry["weight"] for entry in signals.values())
        assert 0.9 <= total <= 1.1
|
||||
@@ -228,27 +228,6 @@
|
||||
"max_items": 5
|
||||
},
|
||||
"outputs": []
|
||||
},
|
||||
{
|
||||
"id": "weekly_narrative",
|
||||
"name": "Weekly Narrative Summary",
|
||||
"description": "Generates a human-readable weekly summary of work themes, agent contributions, and token economy shifts",
|
||||
"script": "timmy_automations/daily_run/weekly_narrative.py",
|
||||
"category": "daily_run",
|
||||
"enabled": true,
|
||||
"trigger": "scheduled",
|
||||
"schedule": "weekly",
|
||||
"executable": "python3",
|
||||
"config": {
|
||||
"lookback_days": 7,
|
||||
"output_file": ".loop/weekly_narrative.json",
|
||||
"gitea_api": "http://localhost:3000/api/v1",
|
||||
"repo_slug": "rockachopa/Timmy-time-dashboard"
|
||||
},
|
||||
"outputs": [
|
||||
".loop/weekly_narrative.json",
|
||||
".loop/weekly_narrative.md"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -17,10 +17,6 @@
|
||||
"manual": {
|
||||
"description": "Run on-demand only",
|
||||
"automations": ["agent_workspace", "kimi_bootstrap", "kimi_resume", "backfill_retro"]
|
||||
},
|
||||
"weekly": {
|
||||
"description": "Run once per week (Sundays)",
|
||||
"automations": ["weekly_narrative"]
|
||||
}
|
||||
},
|
||||
"triggers": {
|
||||
|
||||
@@ -1,745 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Weekly narrative summary generator — human-readable loop analysis.
|
||||
|
||||
Analyzes the past week's activity across the development loop to produce
|
||||
a narrative summary of:
|
||||
- What changed (themes, areas of focus)
|
||||
- How agents and Timmy contributed
|
||||
- Any shifts in tests, triage, or token economy
|
||||
|
||||
The output is designed to be skimmable — a quick read that gives context
|
||||
on the week's progress without drowning in metrics.
|
||||
|
||||
Run: python3 timmy_automations/daily_run/weekly_narrative.py [--json]
|
||||
Env: See timmy_automations/config/automations.json for configuration
|
||||
|
||||
Refs: #719
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from collections import Counter
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib.error import HTTPError, URLError
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
# ── Configuration ─────────────────────────────────────────────────────────
|
||||
|
||||
# Repository root: three levels up from this script (timmy_automations/daily_run/).
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
# Shared automations manifest; this script's section is read from it at startup.
CONFIG_PATH = Path(__file__).parent.parent / "config" / "automations.json"

# Fallback settings used when the manifest is missing or lacks a
# "weekly_narrative" entry. Keys may be overridden by TIMMY_* env vars.
DEFAULT_CONFIG = {
    "gitea_api": "http://localhost:3000/api/v1",
    "repo_slug": "rockachopa/Timmy-time-dashboard",
    "token_file": "~/.hermes/gitea_token",
    "lookback_days": 7,
    "output_file": ".loop/weekly_narrative.json",
    "enabled": True,
}
|
||||
|
||||
|
||||
# ── Data Loading ───────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def load_automation_config() -> dict:
    """Resolve the weekly_narrative configuration.

    Precedence (lowest to highest): DEFAULT_CONFIG, the "weekly_narrative"
    entry in the automations manifest, then TIMMY_* environment variables.
    Manifest read/parse errors are reported to stderr and otherwise ignored.
    """
    config = DEFAULT_CONFIG.copy()
    if CONFIG_PATH.exists():
        try:
            manifest = json.loads(CONFIG_PATH.read_text())
        except (json.JSONDecodeError, OSError) as exc:
            print(f"[weekly_narrative] Warning: Could not load config: {exc}", file=sys.stderr)
        else:
            for entry in manifest.get("automations", []):
                if entry.get("id") == "weekly_narrative":
                    config.update(entry.get("config", {}))
                    config["enabled"] = entry.get("enabled", True)
                    break

    # Environment variable overrides (empty values are ignored).
    if api := os.environ.get("TIMMY_GITEA_API"):
        config["gitea_api"] = api
    if slug := os.environ.get("TIMMY_REPO_SLUG"):
        config["repo_slug"] = slug
    if token := os.environ.get("TIMMY_GITEA_TOKEN"):
        config["token"] = token
    if flag := os.environ.get("TIMMY_WEEKLY_NARRATIVE_ENABLED"):
        config["enabled"] = flag.lower() == "true"

    return config
|
||||
|
||||
|
||||
def get_token(config: dict) -> str | None:
|
||||
"""Get Gitea token from environment or file."""
|
||||
if "token" in config:
|
||||
return config["token"]
|
||||
|
||||
token_file = Path(config["token_file"]).expanduser()
|
||||
if token_file.exists():
|
||||
return token_file.read_text().strip()
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def load_jsonl(path: Path) -> list[dict]:
    """Parse a JSONL file into a list of records, dropping malformed lines.

    Returns an empty list when the file does not exist.
    """
    if not path.exists():
        return []
    records: list[dict] = []
    for raw in path.read_text().strip().splitlines():
        try:
            records.append(json.loads(raw))
        except (json.JSONDecodeError, ValueError):
            pass  # skip lines that are not valid JSON
    return records
|
||||
|
||||
|
||||
def parse_ts(ts_str: str) -> datetime | None:
|
||||
"""Parse an ISO timestamp, tolerating missing tz."""
|
||||
if not ts_str:
|
||||
return None
|
||||
try:
|
||||
dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=UTC)
|
||||
return dt
|
||||
except (ValueError, TypeError):
|
||||
return None
|
||||
|
||||
|
||||
# ── Gitea API Client ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
"""Simple Gitea API client with graceful degradation."""
|
||||
|
||||
def __init__(self, config: dict, token: str | None):
|
||||
self.api_base = config["gitea_api"].rstrip("/")
|
||||
self.repo_slug = config["repo_slug"]
|
||||
self.token = token
|
||||
self._available: bool | None = None
|
||||
|
||||
def _headers(self) -> dict:
|
||||
headers = {"Accept": "application/json"}
|
||||
if self.token:
|
||||
headers["Authorization"] = f"token {self.token}"
|
||||
return headers
|
||||
|
||||
def _api_url(self, path: str) -> str:
|
||||
return f"{self.api_base}/repos/{self.repo_slug}/{path}"
|
||||
|
||||
def is_available(self) -> bool:
|
||||
"""Check if Gitea API is reachable."""
|
||||
if self._available is not None:
|
||||
return self._available
|
||||
|
||||
try:
|
||||
req = Request(
|
||||
f"{self.api_base}/version",
|
||||
headers=self._headers(),
|
||||
method="GET",
|
||||
)
|
||||
with urlopen(req, timeout=5) as resp:
|
||||
self._available = resp.status == 200
|
||||
return self._available
|
||||
except (HTTPError, URLError, TimeoutError):
|
||||
self._available = False
|
||||
return False
|
||||
|
||||
def get_paginated(self, path: str, params: dict | None = None) -> list:
|
||||
"""Fetch all pages of a paginated endpoint."""
|
||||
all_items = []
|
||||
page = 1
|
||||
limit = 50
|
||||
|
||||
while True:
|
||||
url = self._api_url(path)
|
||||
query_parts = [f"limit={limit}", f"page={page}"]
|
||||
if params:
|
||||
for key, val in params.items():
|
||||
query_parts.append(f"{key}={val}")
|
||||
url = f"{url}?{'&'.join(query_parts)}"
|
||||
|
||||
req = Request(url, headers=self._headers(), method="GET")
|
||||
with urlopen(req, timeout=15) as resp:
|
||||
batch = json.loads(resp.read())
|
||||
|
||||
if not batch:
|
||||
break
|
||||
|
||||
all_items.extend(batch)
|
||||
if len(batch) < limit:
|
||||
break
|
||||
page += 1
|
||||
|
||||
return all_items
|
||||
|
||||
|
||||
# ── Data Collection ────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def collect_cycles_data(since: datetime) -> dict:
    """Summarize retro cycle entries recorded at or after *since*.

    Reads .loop/retro/cycles.jsonl under REPO_ROOT and returns counts plus a
    rounded success rate; all-zero counts when the file is missing.
    """
    cycles_file = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
    if not cycles_file.exists():
        return {"cycles": [], "total": 0, "successes": 0, "failures": 0}

    recent = []
    for entry in load_jsonl(cycles_file):
        ts = parse_ts(entry.get("timestamp", ""))
        if ts is not None and ts >= since:
            recent.append(entry)

    ok = sum(1 for entry in recent if entry.get("success"))
    bad = len(recent) - ok

    return {
        "cycles": recent,
        "total": len(recent),
        "successes": ok,
        "failures": bad,
        "success_rate": round(ok / len(recent), 2) if recent else 0,
    }
|
||||
|
||||
|
||||
def collect_issues_data(client: GiteaClient, since: datetime) -> dict:
    """Bucket Gitea issues into touched / closed / opened since *since*.

    Returns an "error" payload with empty buckets when the API is unreachable
    or the listing call fails.
    """
    if not client.is_available():
        return {"error": "Gitea unavailable", "issues": [], "closed": [], "opened": []}

    try:
        issues = client.get_paginated("issues", {"state": "all", "sort": "updated", "limit": 100})
    except (HTTPError, URLError) as exc:
        return {"error": str(exc), "issues": [], "closed": [], "opened": []}

    touched: list = []
    closed: list = []
    opened: list = []

    for issue in issues:
        updated = parse_ts(issue.get("updated_at", ""))
        created = parse_ts(issue.get("created_at", ""))

        if updated and updated >= since:
            touched.append(issue)

        # NOTE(review): an issue both opened and closed inside the window is
        # counted only under "closed" (elif) — presumably intentional; confirm.
        if issue.get("state") == "closed":
            closed_dt = parse_ts(issue.get("closed_at", ""))
            if closed_dt and closed_dt >= since:
                closed.append(issue)
        elif created and created >= since:
            opened.append(issue)

    return {
        "issues": touched,
        "closed": closed,
        "opened": opened,
        "touched_count": len(touched),
        "closed_count": len(closed),
        "opened_count": len(opened),
    }
|
||||
|
||||
|
||||
def collect_prs_data(client: GiteaClient, since: datetime) -> dict:
    """Bucket Gitea pull requests into touched / merged / opened since *since*.

    Mirrors collect_issues_data: returns an "error" payload with empty buckets
    on API unavailability or listing failure.
    """
    if not client.is_available():
        return {"error": "Gitea unavailable", "prs": [], "merged": [], "opened": []}

    try:
        prs = client.get_paginated("pulls", {"state": "all", "sort": "updated", "limit": 100})
    except (HTTPError, URLError) as exc:
        return {"error": str(exc), "prs": [], "merged": [], "opened": []}

    touched: list = []
    merged: list = []
    opened: list = []

    for pr in prs:
        updated = parse_ts(pr.get("updated_at", ""))
        created = parse_ts(pr.get("created_at", ""))
        merged_at = pr.get("merged_at", "")
        merged_dt = parse_ts(merged_at) if merged_at else None

        if updated and updated >= since:
            touched.append(pr)

        # A PR merged inside the window counts only as merged, not opened.
        if pr.get("merged") and merged_dt and merged_dt >= since:
            merged.append(pr)
        elif created and created >= since:
            opened.append(pr)

    return {
        "prs": touched,
        "merged": merged,
        "opened": opened,
        "touched_count": len(touched),
        "merged_count": len(merged),
        "opened_count": len(opened),
    }
|
||||
|
||||
|
||||
def collect_triage_data(since: datetime) -> dict:
    """Load triage and introspection data.

    Reads the triage JSONL log and the latest insights JSON from
    ``.loop/retro/`` under the repo root and keeps only triage entries
    whose timestamp is at or after *since*.  Missing or unparsable
    files degrade gracefully to empty results.
    """
    triage_file = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
    insights_file = REPO_ROOT / ".loop" / "retro" / "insights.json"

    triage_entries = load_jsonl(triage_file)
    # Parse each timestamp once via the walrus operator (the previous
    # comprehension called parse_ts twice per entry).
    recent_triage = [
        e for e in triage_entries
        if (ts := parse_ts(e.get("timestamp", ""))) and ts >= since
    ]

    insights = {}
    if insights_file.exists():
        try:
            insights = json.loads(insights_file.read_text())
        except (json.JSONDecodeError, OSError):
            # Corrupt or unreadable insights are non-fatal; report none.
            pass

    return {
        "triage_runs": len(recent_triage),
        "triage_entries": recent_triage,
        "latest_insights": insights,
    }
|
||||
|
||||
|
||||
def collect_token_data(since: datetime) -> dict:
    """Load token economy data from the lightning ledger.

    The ledger lives in memory, so this currently returns placeholder
    values; the live dashboard is the source of truth for real metrics.
    The *since* argument is accepted for interface symmetry with the
    other collectors and is not yet used.
    """
    placeholder = {
        "note": "Token economy data is ephemeral — check dashboard for live metrics",
        "balance_sats": 0,  # Placeholder until the ledger persists data
        "transactions_week": 0,
    }
    return placeholder
|
||||
|
||||
|
||||
# ── Analysis Functions ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def extract_themes(issues: list[dict]) -> dict:
    """Extract themes from issue labels.

    Tallies label occurrences across *issues* and buckets them into
    general themes, ``layer:*`` labels, and a fixed set of work types.

    Returns a dict with:
      - ``top_labels``: up to 10 most common labels, excluding the
        ``layer:`` and ``size:`` prefixed ones
      - ``layers``: counts per layer (prefix stripped)
      - ``types``: counts for bug/feature/refactor/docs/test/chore

    Note: the return annotation was corrected from ``list[dict]`` to
    ``dict`` — the function has always returned a dict.
    """
    work_types = ("bug", "feature", "refactor", "docs", "test", "chore")

    label_counts = Counter()
    layer_counts = Counter()
    type_counts = Counter()

    for issue in issues:
        for label in issue.get("labels", []):
            name = label.get("name", "")
            label_counts[name] += 1

            if name.startswith("layer:"):
                layer_counts[name.replace("layer:", "")] += 1
            if name in work_types:
                type_counts[name] += 1

    # Top themes (labels excluding layer/size prefixes)
    themes = [
        {"name": name, "count": count}
        for name, count in label_counts.most_common(10)
        if not name.startswith(("layer:", "size:"))
    ]

    # Layers
    layers = [
        {"name": name, "count": count}
        for name, count in layer_counts.most_common()
    ]

    # Types
    types = [
        {"name": name, "count": count}
        for name, count in type_counts.most_common()
    ]

    return {
        "top_labels": themes,
        "layers": layers,
        "types": types,
    }
|
||||
|
||||
|
||||
def extract_agent_contributions(issues: list[dict], prs: list[dict], cycles: list[dict]) -> dict:
    """Extract agent contribution patterns.

    Counts issue assignees, PR authors, and how many cycles mention
    "kimi" (case-insensitive) in their notes or reason fields.
    """
    assignee_counts = Counter()
    author_counts = Counter()

    # Tally issues by assignee login; only non-empty dict assignees count.
    for item in issues:
        who = item.get("assignee")
        if isinstance(who, dict) and who:
            assignee_counts[who.get("login", "unknown")] += 1

    # Tally PRs by author login, same shape rules as assignees.
    for item in prs:
        author = item.get("user")
        if isinstance(author, dict) and author:
            author_counts[author.get("login", "unknown")] += 1

    # Cycles whose notes or reason mention Kimi.  The space separator
    # keeps a match from spanning the field boundary.
    kimi_mentions = 0
    for cycle in cycles:
        haystack = cycle.get("notes", "").lower() + " " + cycle.get("reason", "").lower()
        if "kimi" in haystack:
            kimi_mentions += 1

    return {
        "active_assignees": [
            {"login": login, "issues_count": count}
            for login, count in assignee_counts.most_common()
        ],
        "pr_authors": [
            {"login": login, "prs_count": count}
            for login, count in author_counts.most_common()
        ],
        "kimi_mentioned_cycles": kimi_mentions,
    }
|
||||
|
||||
|
||||
def analyze_test_shifts(cycles: list[dict]) -> dict:
    """Analyze shifts in test patterns across development cycles."""
    if not cycles:
        return {"note": "No cycle data available"}

    passed_total = 0
    added_total = 0
    focused = 0
    for cycle in cycles:
        passed_total += cycle.get("tests_passed", 0)
        added_total += cycle.get("tests_added", 0)
        # A cycle is test-focused when typed "test" or its notes mention
        # testing.
        if cycle.get("type") == "test" or "test" in cycle.get("notes", "").lower():
            focused += 1

    return {
        "total_tests_passed": passed_total,
        "total_tests_added": added_total,
        # cycles is guaranteed non-empty here, so the division is safe.
        "avg_tests_per_cycle": round(passed_total / len(cycles), 1),
        "test_focused_cycles": focused,
    }
|
||||
|
||||
|
||||
def analyze_triage_shifts(triage_data: dict) -> dict:
    """Analyze shifts in triage patterns from introspection data."""
    insights = triage_data.get("latest_insights", {})
    recommendations = insights.get("recommendations", [])

    # Only the severe findings feed the headline count.
    high_count = sum(1 for rec in recommendations if rec.get("severity") == "high")

    return {
        "triage_runs": triage_data.get("triage_runs", 0),
        "insights_generated": insights.get("generated_at") is not None,
        "high_priority_recommendations": high_count,
        "recent_recommendations": recommendations[:3] if recommendations else [],
    }
|
||||
|
||||
|
||||
def generate_vibe_summary(
    cycles_data: dict,
    issues_data: dict,
    prs_data: dict,
    themes: dict,
    agent_contrib: dict,
    test_shifts: dict,
    triage_shifts: dict,
) -> dict:
    """Generate the human-readable 'vibe' summary."""
    success_rate = cycles_data.get("success_rate", 0)
    failures = cycles_data.get("failures", 0)
    closed_count = issues_data.get("closed_count", 0)
    merged_count = prs_data.get("merged_count", 0)

    # Classify the week; earlier branches take priority.
    if success_rate >= 0.9 and closed_count > 0:
        vibe, vibe_description = (
            "productive",
            "A strong week with solid delivery and healthy success rates.",
        )
    elif success_rate >= 0.7:
        vibe, vibe_description = (
            "steady",
            "Steady progress with some bumps. Things are moving forward.",
        )
    elif failures > cycles_data.get("successes", 0):
        vibe, vibe_description = (
            "struggling",
            "A challenging week with more failures than successes. Time to regroup.",
        )
    else:
        vibe, vibe_description = (
            "quiet",
            "A lighter week with limited activity.",
        )

    # Up to three focus areas drawn from the busiest layers.
    focus_areas = [
        f"{layer['name']} ({layer['count']} items)"
        for layer in themes.get("layers", [])[:3]
    ]

    # One-line summary of the most active assignee, if any.
    active_assignees = agent_contrib.get("active_assignees", [])
    if active_assignees:
        leader = active_assignees[0]
        agent_summary = f"{leader['login']} led with {leader['issues_count']} assigned issues."
    else:
        agent_summary = ""

    # Collect notable events; fall back to a generic note when none fire.
    notable = []
    if merged_count > 5:
        notable.append(f"{merged_count} PRs merged — high integration velocity")
    if triage_shifts.get("high_priority_recommendations", 0) > 0:
        notable.append("High-priority recommendations from loop introspection")
    if test_shifts.get("test_focused_cycles", 0) > 3:
        notable.append("Strong test coverage focus")
    if not notable:
        notable.append("Regular development flow")

    return {
        "overall": vibe,
        "description": vibe_description,
        "focus_areas": focus_areas,
        "agent_summary": agent_summary,
        "notable_events": notable,
    }
|
||||
|
||||
|
||||
# ── Narrative Generation ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
def generate_narrative(
    cycles_data: dict,
    issues_data: dict,
    prs_data: dict,
    triage_data: dict,
    themes: dict,
    agent_contrib: dict,
    test_shifts: dict,
    triage_shifts: dict,
    token_data: dict,
    since: datetime,
    until: datetime,
) -> dict:
    """Generate the complete weekly narrative.

    Combines the collected activity data and derived analyses into a
    single JSON-serializable report covering the [since, until] window.
    """
    vibe = generate_vibe_summary(
        cycles_data, issues_data, prs_data, themes, agent_contrib, test_shifts, triage_shifts
    )

    return {
        "generated_at": datetime.now(UTC).isoformat(),
        "period": {
            "start": since.isoformat(),
            "end": until.isoformat(),
            # Derive the span from the actual window instead of the
            # previously hard-coded 7 — main() allows a --days override.
            "days": (until - since).days,
        },
        "vibe": vibe,
        "activity": {
            "cycles": {
                "total": cycles_data.get("total", 0),
                "successes": cycles_data.get("successes", 0),
                "failures": cycles_data.get("failures", 0),
                "success_rate": cycles_data.get("success_rate", 0),
            },
            "issues": {
                "touched": issues_data.get("touched_count", 0),
                "closed": issues_data.get("closed_count", 0),
                "opened": issues_data.get("opened_count", 0),
            },
            "pull_requests": {
                "touched": prs_data.get("touched_count", 0),
                "merged": prs_data.get("merged_count", 0),
                "opened": prs_data.get("opened_count", 0),
            },
        },
        "themes": themes,
        "agents": agent_contrib,
        "test_health": test_shifts,
        "triage_health": triage_shifts,
        "token_economy": token_data,
    }
|
||||
|
||||
|
||||
def generate_markdown_summary(narrative: dict) -> str:
    """Generate a human-readable markdown summary."""
    vibe = narrative.get("vibe", {})
    activity = narrative.get("activity", {})
    cycles = activity.get("cycles", {})
    issues = activity.get("issues", {})
    prs = activity.get("pull_requests", {})

    out = []
    emit = out.append

    # Header and headline stats.
    emit("# Weekly Narrative Summary")
    emit("")
    emit(f"**Period:** {narrative['period']['start'][:10]} to {narrative['period']['end'][:10]}")
    emit(f"**Vibe:** {vibe.get('overall', 'unknown').title()}")
    emit("")
    emit(f"{vibe.get('description', '')}")
    emit("")
    emit("## Activity Highlights")
    emit("")
    emit(f"- **Development Cycles:** {cycles.get('total', 0)} total ({cycles.get('successes', 0)} success, {cycles.get('failures', 0)} failure)")
    emit(f"- **Issues:** {issues.get('closed', 0)} closed, {issues.get('opened', 0)} opened")
    emit(f"- **Pull Requests:** {prs.get('merged', 0)} merged, {prs.get('opened', 0)} opened")
    emit("")

    # Focus areas (optional section).
    focus = vibe.get("focus_areas", [])
    if focus:
        emit("## Focus Areas")
        emit("")
        for area in focus:
            emit(f"- {area}")
        emit("")

    # Agent contributions (optional section).
    agent_summary = vibe.get("agent_summary", "")
    if agent_summary:
        emit("## Agent Activity")
        emit("")
        emit(agent_summary)
        emit("")

    # Notable events (optional section).
    notable = vibe.get("notable_events", [])
    if notable:
        emit("## Notable Events")
        emit("")
        for event in notable:
            emit(f"- {event}")
        emit("")

    # Triage health — shown only when high-priority findings exist.
    triage = narrative.get("triage_health", {})
    if triage.get("high_priority_recommendations", 0) > 0:
        emit("## Triage Notes")
        emit("")
        emit(f"⚠️ {triage['high_priority_recommendations']} high-priority recommendation(s) from loop introspection.")
        emit("")
        for rec in triage.get("recent_recommendations", [])[:2]:
            emit(f"- **{rec.get('category', 'general')}:** {rec.get('finding', '')}")
        emit("")

    return "\n".join(out)
|
||||
|
||||
|
||||
# ── Main ───────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
    """Parse command-line options for the weekly narrative generator."""
    parser = argparse.ArgumentParser(
        description="Generate weekly narrative summary of work and vibes",
    )
    parser.add_argument("--json", "-j", action="store_true",
                        help="Output as JSON instead of markdown")
    parser.add_argument("--output", "-o", type=str, default=None,
                        help="Output file path (default from config)")
    parser.add_argument("--days", type=int, default=None,
                        help="Override lookback days (default 7)")
    parser.add_argument("--force", action="store_true",
                        help="Run even if disabled in config")
    return parser.parse_args()
|
||||
|
||||
|
||||
def main() -> int:
    """Entry point: collect, analyze, and publish the weekly narrative.

    Collects cycle/issue/PR/triage/token data for the lookback window,
    derives themes and health metrics, writes the narrative as JSON and
    markdown under the repo root, and prints one of the two formats.
    Returns 0 (also when skipped because the feature is disabled).
    """
    args = parse_args()
    config = load_automation_config()

    # Check if enabled — config can disable the narrative; --force bypasses it.
    if not config.get("enabled", True) and not args.force:
        print("[weekly_narrative] Skipped — weekly narrative is disabled in config")
        print("[weekly_narrative] Use --force to run anyway")
        return 0

    # Determine lookback period: CLI --days wins over config, default 7.
    days = args.days if args.days is not None else config.get("lookback_days", 7)
    until = datetime.now(UTC)
    since = until - timedelta(days=days)

    print(f"[weekly_narrative] Generating narrative for the past {days} days...")

    # Setup Gitea client
    token = get_token(config)
    client = GiteaClient(config, token)

    # Collectors degrade gracefully when the API is down, so only warn here.
    if not client.is_available():
        print("[weekly_narrative] Warning: Gitea API unavailable — will use local data only")

    # Collect data
    cycles_data = collect_cycles_data(since)
    issues_data = collect_issues_data(client, since)
    prs_data = collect_prs_data(client, since)
    triage_data = collect_triage_data(since)
    token_data = collect_token_data(since)

    # Analyze
    themes = extract_themes(issues_data.get("issues", []))
    agent_contrib = extract_agent_contributions(
        issues_data.get("issues", []),
        prs_data.get("prs", []),
        cycles_data.get("cycles", []),
    )
    test_shifts = analyze_test_shifts(cycles_data.get("cycles", []))
    triage_shifts = analyze_triage_shifts(triage_data)

    # Generate narrative
    narrative = generate_narrative(
        cycles_data,
        issues_data,
        prs_data,
        triage_data,
        themes,
        agent_contrib,
        test_shifts,
        triage_shifts,
        token_data,
        since,
        until,
    )

    # Determine output path: CLI --output wins over config default.
    output_path = args.output or config.get("output_file", ".loop/weekly_narrative.json")
    output_file = REPO_ROOT / output_path
    output_file.parent.mkdir(parents=True, exist_ok=True)

    # Write JSON output
    output_file.write_text(json.dumps(narrative, indent=2) + "\n")

    # Write markdown summary alongside JSON (same stem, .md suffix).
    md_output_file = output_file.with_suffix(".md")
    md_output_file.write_text(generate_markdown_summary(narrative))

    # Print output in the requested format.
    if args.json:
        print(json.dumps(narrative, indent=2))
    else:
        print()
        print(generate_markdown_summary(narrative))

    print(f"\n[weekly_narrative] Written to: {output_file}")
    print(f"[weekly_narrative] Markdown summary: {md_output_file}")

    return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    sys.exit(main())
|
||||
Reference in New Issue
Block a user