Compare commits

..

1 Commits

Author SHA1 Message Date
kimi
919a011cae feat: adapt token rewards based on system stress signals (#714)
Implements adaptive token rewards that respond to system stress:

- StressDetector module (timmy/stress_detector.py):
  - Monitors 4 stress signals: flaky test rate, P1 backlog growth,
    CI failure rate, open bug count
  - Calculates weighted stress score (0-1) and determines mode:
    calm (<0.3), elevated (0.3-0.6), high (>0.6)
  - Applies quest-specific multipliers based on current mode

- Configuration (config/stress_modes.yaml):
  - Thresholds for mode transitions
  - Signal weights and thresholds
  - Multipliers per mode (e.g., test_improve: 1.5x in high stress)

- Quest system integration:
  - Rewards now include stress bonus/penalty in notification
  - Quest status API includes adjusted_reward and multiplier
  - Agent can see current stress mode and why rewards changed

- API endpoints:
  - GET /quests/api/stress - current stress mode and signals
  - POST /quests/api/stress/refresh - force refresh stress detection

Fixes #714
2026-03-21 17:26:40 -04:00
10 changed files with 1081 additions and 1123 deletions

98
config/stress_modes.yaml Normal file
View File

@@ -0,0 +1,98 @@
# ── System Stress Modes Configuration ────────────────────────────────────────
#
# This configuration defines how token rewards adapt based on system stress.
# When the system detects elevated stress (flaky tests, growing backlog,
# CI failures), quest rewards are adjusted to incentivize agents to focus
# on the most critical areas.
#
# ── How It Works ─────────────────────────────────────────────────────────────
#
# 1. SIGNALS: System metrics are monitored continuously
# 2. SCORE: Weighted contributions from triggered signals create a stress score
# 3. MODE: Score determines the stress mode (calm, elevated, high)
# 4. MULTIPLIERS: Token rewards are multiplied based on the current mode
#
# ── Stress Thresholds ────────────────────────────────────────────────────────
thresholds:
# Minimum score to enter elevated mode (0.0 - 1.0)
elevated_min: 0.3
# Minimum score to enter high stress mode (0.0 - 1.0)
high_min: 0.6
# ── Stress Signals ───────────────────────────────────────────────────────────
#
# Each signal has:
# - threshold: Value at which signal is considered "triggered"
# - weight: Contribution to overall stress score (should sum to ~1.0)
signals:
flaky_test_rate:
threshold: 0.15 # 15% of tests showing flakiness
weight: 0.30
description: "Percentage of test runs that are flaky"
p1_backlog_growth:
threshold: 5 # 5 new P1 issues in lookback period
weight: 0.25
description: "Net growth in P1 priority issues over 7 days"
ci_failure_rate:
threshold: 0.20 # 20% of CI runs failing
weight: 0.25
description: "Percentage of CI runs failing in lookback period"
open_bug_count:
threshold: 20 # 20 open bugs
weight: 0.20
description: "Total open issues labeled as 'bug'"
# ── Token Multipliers ────────────────────────────────────────────────────────
#
# Multipliers are applied to quest rewards based on current stress mode.
# Values > 1.0 increase rewards, < 1.0 decrease rewards.
#
# Quest types:
# - test_improve: Test coverage/quality improvements
# - docs_update: Documentation updates
# - issue_count: Closing specific issue types
# - issue_reduce: Reducing overall issue backlog
# - daily_run: Daily Run session completion
# - custom: Special/manual quests
# - exploration: Exploratory work
# - refactor: Code refactoring
multipliers:
calm:
# Calm periods: incentivize maintenance and exploration
test_improve: 1.0
docs_update: 1.2
issue_count: 1.0
issue_reduce: 1.0
daily_run: 1.0
custom: 1.0
exploration: 1.3
refactor: 1.2
elevated:
# Elevated stress: start emphasizing stability
test_improve: 1.2
docs_update: 1.0
issue_count: 1.1
issue_reduce: 1.1
daily_run: 1.0
custom: 1.0
exploration: 1.0
refactor: 0.9 # Discourage risky changes
high:
# High stress: crisis mode, focus on stabilization
test_improve: 1.5 # Strongly incentivize testing
docs_update: 0.8 # Deprioritize docs
issue_count: 1.3 # Reward closing issues
issue_reduce: 1.4 # Strongly reward reducing backlog
daily_run: 1.1
custom: 1.0
exploration: 0.7 # Discourage exploration
refactor: 0.6 # Discourage refactors during crisis

View File

@@ -330,13 +330,6 @@ class Settings(BaseSettings):
autoresearch_max_iterations: int = 100
autoresearch_metric: str = "val_bpb" # metric to optimise (lower = better)
# ── Weekly Narrative Summary ───────────────────────────────────────
# Generates a human-readable weekly summary of development activity.
# Disabling this will stop the weekly narrative generation.
weekly_narrative_enabled: bool = True
weekly_narrative_lookback_days: int = 7
weekly_narrative_output_dir: str = ".loop"
# ── Local Hands (Shell + Git) ──────────────────────────────────────
# Enable local shell/git execution hands.
hands_shell_enabled: bool = True

View File

@@ -187,6 +187,76 @@ async def reload_quest_config_api() -> JSONResponse:
)
# ---------------------------------------------------------------------------
# Stress Mode Endpoints
# ---------------------------------------------------------------------------
@router.get("/api/stress")
async def get_stress_status_api() -> JSONResponse:
    """Get current stress mode status and multipliers.

    Returns:
        Current stress mode, score, active signals, and multipliers
    """
    try:
        # Imported lazily so the dashboard still loads if the stress
        # detector module is unavailable.
        from timmy.stress_detector import detect_stress_mode, get_stress_summary

        snapshot = detect_stress_mode()
        summary = get_stress_summary()
        payload = {
            "status": "ok",
            "stress": summary,
            "raw": snapshot.to_dict(),
        }
        return JSONResponse(payload)
    except Exception as exc:
        logger.warning("Failed to get stress status: %s", exc)
        return JSONResponse(
            {"status": "error", "error": str(exc)},
            status_code=500,
        )
@router.post("/api/stress/refresh")
async def refresh_stress_detection_api() -> JSONResponse:
    """Force a fresh stress detection check.

    Normally stress is cached for 60 seconds. This endpoint
    bypasses the cache for immediate results.
    """
    try:
        # Lazy import keeps the route importable even without the detector.
        from timmy.stress_detector import detect_stress_mode, get_stress_summary

        # force_refresh=True bypasses the 60-second snapshot cache.
        snapshot = detect_stress_mode(force_refresh=True)
        summary = get_stress_summary()
        body = {
            "status": "ok",
            "stress": summary,
            "raw": snapshot.to_dict(),
        }
        return JSONResponse(body)
    except Exception as exc:
        logger.warning("Failed to refresh stress detection: %s", exc)
        return JSONResponse(
            {"status": "error", "error": str(exc)},
            status_code=500,
        )
# ---------------------------------------------------------------------------
# Dashboard UI Endpoints
# ---------------------------------------------------------------------------

View File

@@ -269,6 +269,22 @@ def _is_on_cooldown(progress: QuestProgress, quest: QuestDefinition) -> bool:
return False
def _apply_stress_multiplier(base_reward: int, quest_type: QuestType) -> tuple[int, float]:
    """Apply stress-based multiplier to quest reward.

    Returns:
        Tuple of (adjusted_reward, multiplier_used); falls back to
        (base_reward, 1.0) when the stress detector is unavailable.
    """
    try:
        from timmy.stress_detector import apply_multiplier, get_multiplier

        # BUG FIX: apply_multiplier() returns the ADJUSTED REWARD (an int),
        # not the multiplier. The old code stored it in a variable named
        # `multiplier` and back-derived the multiplier as adjusted/base,
        # which is wrong after integer rounding (e.g. base 3 at 1.5x gives
        # adjusted 4 and a reported multiplier of 1.33). Report the
        # configured multiplier directly instead.
        adjusted = apply_multiplier(base_reward, quest_type.value)
        multiplier = get_multiplier(quest_type.value)
        return adjusted, multiplier
    except Exception as exc:
        # Best-effort: reward adjustment must never block claiming.
        logger.debug("Failed to apply stress multiplier: %s", exc)
        return base_reward, 1.0
def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
"""Claim the token reward for a completed quest.
@@ -292,13 +308,18 @@ def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
return None
try:
# Apply stress-based multiplier
adjusted_reward, multiplier = _apply_stress_multiplier(
quest.reward_tokens, quest.quest_type
)
# Award tokens via ledger
from lightning.ledger import create_invoice_entry, mark_settled
# Create a mock invoice for the reward
invoice_entry = create_invoice_entry(
payment_hash=f"quest_{quest_id}_{agent_id}_{int(time.time())}",
amount_sats=quest.reward_tokens,
amount_sats=adjusted_reward,
memo=f"Quest reward: {quest.name}",
source="quest_reward",
agent_id=agent_id,
@@ -320,12 +341,21 @@ def claim_quest_reward(quest_id: str, agent_id: str) -> dict[str, Any] | None:
progress.completed_at = ""
progress.claimed_at = ""
notification = quest.notification_message.format(tokens=quest.reward_tokens)
# Build notification with multiplier info
notification = quest.notification_message.format(tokens=adjusted_reward)
if multiplier != 1.0:
pct = int((multiplier - 1.0) * 100)
if pct > 0:
notification += f" (+{pct}% stress bonus)"
else:
notification += f" ({pct}% stress adjustment)"
return {
"quest_id": quest_id,
"agent_id": agent_id,
"tokens_awarded": quest.reward_tokens,
"tokens_awarded": adjusted_reward,
"base_reward": quest.reward_tokens,
"multiplier": round(multiplier, 2),
"notification": notification,
"completion_count": progress.completion_count,
}
@@ -467,6 +497,14 @@ def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
total_rewards = 0
completed_count = 0
# Get current stress mode for adjusted rewards display
try:
from timmy.stress_detector import get_current_stress_mode, get_multiplier
current_mode = get_current_stress_mode()
except Exception:
current_mode = None
for quest_id, quest in definitions.items():
progress = get_quest_progress(quest_id, agent_id)
if not progress:
@@ -474,11 +512,23 @@ def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
is_on_cooldown = _is_on_cooldown(progress, quest) if quest.repeatable else False
# Calculate adjusted reward with stress multiplier
adjusted_reward = quest.reward_tokens
multiplier = 1.0
if current_mode:
try:
multiplier = get_multiplier(quest.quest_type.value, current_mode)
adjusted_reward = int(quest.reward_tokens * multiplier)
except Exception:
pass
quest_info = {
"quest_id": quest_id,
"name": quest.name,
"description": quest.description,
"reward_tokens": quest.reward_tokens,
"adjusted_reward": adjusted_reward,
"multiplier": round(multiplier, 2),
"type": quest.quest_type.value,
"enabled": quest.enabled,
"repeatable": quest.repeatable,
@@ -509,6 +559,7 @@ def get_agent_quests_status(agent_id: str) -> dict[str, Any]:
"total_tokens_earned": total_rewards,
"total_quests_completed": completed_count,
"active_quests_count": len([q for q in quests_status if q["enabled"]]),
"stress_mode": current_mode.value if current_mode else None,
}

View File

@@ -0,0 +1,565 @@
"""System stress detection for adaptive token rewards.
Monitors system signals like flakiness, backlog growth, and CI failures
to determine the current stress mode. Token rewards are then adjusted
based on the stress mode to incentivize agents to focus on critical areas.
"""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from enum import StrEnum
from pathlib import Path
from typing import Any
import yaml
from config import settings
logger = logging.getLogger(__name__)
# Path to stress mode configuration
STRESS_CONFIG_PATH = Path(settings.repo_root) / "config" / "stress_modes.yaml"
class StressMode(StrEnum):
"""System stress modes.
- CALM: Normal operations, incentivize exploration and refactoring
- ELEVATED: Some stress signals detected, balance incentives
- HIGH: Critical stress, strongly incentivize bug fixes and stabilization
"""
CALM = "calm"
ELEVATED = "elevated"
HIGH = "high"
@dataclass
class StressSignal:
"""A single stress signal reading."""
name: str
value: float
threshold: float
weight: float
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
@property
def is_triggered(self) -> bool:
"""Whether this signal exceeds its threshold."""
return self.value >= self.threshold
@property
def contribution(self) -> float:
"""Calculate this signal's contribution to stress score."""
if not self.is_triggered:
return 0.0
# Contribution is weighted ratio of value to threshold
return min(1.0, (self.value / max(self.threshold, 1.0))) * self.weight
@dataclass
class StressSnapshot:
    """Complete stress assessment at a point in time."""

    mode: StressMode
    score: float
    signals: list[StressSignal]
    multipliers: dict[str, float]
    timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())

    def to_dict(self) -> dict[str, Any]:
        """Serialize the snapshot into plain JSON-friendly types."""
        signal_rows = []
        for sig in self.signals:
            signal_rows.append(
                {
                    "name": sig.name,
                    "value": sig.value,
                    "threshold": sig.threshold,
                    "triggered": sig.is_triggered,
                    "contribution": round(sig.contribution, 3),
                }
            )
        return {
            "mode": self.mode.value,
            "score": round(self.score, 3),
            "signals": signal_rows,
            "multipliers": self.multipliers,
            "timestamp": self.timestamp,
        }
@dataclass
class StressThresholds:
    """Score cut-offs governing transitions between stress modes."""

    # Minimum score (0-1) for the elevated and high modes respectively.
    elevated_min: float = 0.3
    high_min: float = 0.6

    def get_mode_for_score(self, score: float) -> StressMode:
        """Map a stress score onto a StressMode using the cut-offs."""
        if score < self.elevated_min:
            return StressMode.CALM
        if score < self.high_min:
            return StressMode.ELEVATED
        return StressMode.HIGH
# In-memory storage for stress state
_current_snapshot: StressSnapshot | None = None
_last_check_time: datetime | None = None
_config_cache: dict[str, Any] | None = None
_config_mtime: float = 0.0
def _load_stress_config() -> dict[str, Any]:
    """Load the stress-mode YAML configuration, cached by file mtime.

    The file is re-read only when its modification time changes, so
    edits to config/stress_modes.yaml take effect without a restart.

    Returns:
        Parsed configuration dict ({} when missing or unreadable).
    """
    global _config_cache, _config_mtime

    if STRESS_CONFIG_PATH.exists():
        mtime = STRESS_CONFIG_PATH.stat().st_mtime
        stale = _config_cache is None or mtime != _config_mtime
        if stale:
            try:
                text = STRESS_CONFIG_PATH.read_text()
                _config_cache = yaml.safe_load(text) or {}
                _config_mtime = mtime
                logger.debug("Loaded stress config from %s", STRESS_CONFIG_PATH)
            except (OSError, yaml.YAMLError) as exc:
                logger.warning("Failed to load stress config: %s", exc)
                _config_cache = {}
    # Guard against a missing file on first call.
    if _config_cache is None:
        _config_cache = {}
    return _config_cache
def get_default_config() -> dict[str, Any]:
    """Return the built-in fallback stress configuration.

    Used whenever config/stress_modes.yaml is missing or lacks a section.
    Mirrors the shipped YAML: thresholds, signal definitions, and per-mode
    token multipliers.
    """
    thresholds = {
        "elevated_min": 0.3,
        "high_min": 0.6,
    }
    signals = {
        "flaky_test_rate": {
            "threshold": 0.15,  # 15% flaky test rate
            "weight": 0.3,
            "description": "Percentage of tests that are flaky",
        },
        "p1_backlog_growth": {
            "threshold": 5,  # 5 new P1 issues
            "weight": 0.25,
            "description": "Net growth in P1 priority issues",
        },
        "ci_failure_rate": {
            "threshold": 0.2,  # 20% CI failure rate
            "weight": 0.25,
            "description": "Percentage of CI runs failing",
        },
        "open_bug_count": {
            "threshold": 20,  # 20 open bugs
            "weight": 0.2,
            "description": "Total open issues labeled as bugs",
        },
    }
    multipliers = {
        # Calm: reward docs, exploration, and refactoring.
        StressMode.CALM.value: {
            "test_improve": 1.0,
            "docs_update": 1.2,
            "issue_count": 1.0,
            "issue_reduce": 1.0,
            "daily_run": 1.0,
            "custom": 1.0,
            "exploration": 1.3,
            "refactor": 1.2,
        },
        # Elevated: start emphasizing tests, discourage risky refactors.
        StressMode.ELEVATED.value: {
            "test_improve": 1.2,
            "docs_update": 1.0,
            "issue_count": 1.1,
            "issue_reduce": 1.1,
            "daily_run": 1.0,
            "custom": 1.0,
            "exploration": 1.0,
            "refactor": 0.9,
        },
        # High: crisis mode — strongly favor testing and backlog reduction.
        StressMode.HIGH.value: {
            "test_improve": 1.5,
            "docs_update": 0.8,
            "issue_count": 1.3,
            "issue_reduce": 1.4,
            "daily_run": 1.1,
            "custom": 1.0,
            "exploration": 0.7,
            "refactor": 0.6,
        },
    }
    return {
        "thresholds": thresholds,
        "signals": signals,
        "multipliers": multipliers,
    }
def _get_config_value(key_path: str, default: Any = None) -> Any:
    """Look up a config value by dot-separated path (e.g. "thresholds.high_min").

    Returns *default* when any path segment is missing or the value is None.
    """
    node: Any = _load_stress_config()
    for part in key_path.split("."):
        if not isinstance(node, dict):
            return default
        node = node.get(part)
    return default if node is None else node
def _calculate_flaky_test_rate() -> float:
    """Calculate the flaky-test rate over the last 7 days.

    Reads .loop/test_results.jsonl (one JSON object per line with a
    "timestamp" and optional "is_flaky" flag) and returns the fraction
    of recent runs marked flaky. Returns 0.0 when the file is missing,
    no runs fall in the window, or anything goes wrong.
    """
    try:
        test_results_path = Path(settings.repo_root) / ".loop" / "test_results.jsonl"
        if not test_results_path.exists():
            return 0.0
        # FIX: removed a redundant second `exists()` check that duplicated
        # the early return above.
        cutoff = datetime.now(UTC) - timedelta(days=7)
        total_runs = 0
        flaky_runs = 0
        for line in test_results_path.read_text().strip().splitlines():
            try:
                entry = json.loads(line)
                ts_str = entry.get("timestamp", "")
                if not ts_str:
                    continue
                ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
                if ts >= cutoff:
                    total_runs += 1
                    if entry.get("is_flaky", False):
                        flaky_runs += 1
            except (json.JSONDecodeError, ValueError):
                # Skip malformed lines rather than failing the whole scan.
                continue
        # max() avoids ZeroDivisionError when no runs are in the window.
        return flaky_runs / max(total_runs, 1)
    except Exception as exc:
        logger.debug("Failed to calculate flaky test rate: %s", exc)
        return 0.0
def _calculate_p1_backlog_growth() -> float:
    """Calculate P1 issue backlog growth.

    Compares P1 issues created in the last 7 days against those created
    in the 7 days before that; the result is floored at zero. Returns
    0.0 when Gitea is unreachable or on any error.
    """
    try:
        from dashboard.routes.daily_run import GiteaClient, _load_config

        config = _load_config()
        client = GiteaClient(config, config.get("token"))
        if not client.is_available():
            return 0.0
        now = datetime.now(UTC)
        recent_cutoff = now - timedelta(days=7)
        older_cutoff = now - timedelta(days=14)
        issues = client.get_paginated("issues", {"state": "all", "labels": "P1", "limit": 100})
        recent = 0
        older = 0
        for issue in issues:
            raw = issue.get("created_at", "")
            if not raw:
                continue
            try:
                created = datetime.fromisoformat(raw.replace("Z", "+00:00"))
            except (ValueError, TypeError):
                continue
            if created >= recent_cutoff:
                recent += 1
            elif created >= older_cutoff:
                older += 1
        # Positive values mean the backlog is growing.
        return max(0, recent - older)
    except Exception as exc:
        logger.debug("Failed to calculate P1 backlog growth: %s", exc)
        return 0.0
def _calculate_ci_failure_rate() -> float:
    """Calculate the CI failure rate over the last 7 days.

    Reads .loop/ci_results.jsonl (one JSON object per line) and counts
    entries whose "status" is anything other than "success". Returns 0.0
    when there is no data or on any error.
    """
    try:
        ci_results_path = Path(settings.repo_root) / ".loop" / "ci_results.jsonl"
        if not ci_results_path.exists():
            return 0.0
        cutoff = datetime.now(UTC) - timedelta(days=7)
        total_runs = 0
        failed_runs = 0
        for line in ci_results_path.read_text().strip().splitlines():
            try:
                entry = json.loads(line)
                ts_str = entry.get("timestamp", "")
                if not ts_str:
                    continue
                ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
            except (json.JSONDecodeError, ValueError):
                continue
            if ts >= cutoff:
                total_runs += 1
                if entry.get("status") != "success":
                    failed_runs += 1
        return failed_runs / max(total_runs, 1)
    except Exception as exc:
        logger.debug("Failed to calculate CI failure rate: %s", exc)
        return 0.0
def _calculate_open_bug_count() -> float:
    """Count open issues labeled 'bug' via the Gitea API.

    Returns 0.0 when Gitea is unreachable or on any error.
    """
    try:
        from dashboard.routes.daily_run import GiteaClient, _load_config

        config = _load_config()
        client = GiteaClient(config, config.get("token"))
        if not client.is_available():
            return 0.0
        bugs = client.get_paginated("issues", {"state": "open", "labels": "bug", "limit": 100})
        return float(len(bugs))
    except Exception as exc:
        logger.debug("Failed to calculate open bug count: %s", exc)
        return 0.0
def _collect_stress_signals() -> list[StressSignal]:
    """Read every stress signal from the system.

    Each measured value is paired with its configured threshold and
    weight; missing config entries fall back to the built-in defaults.
    A signal whose collector raises is skipped rather than aborting the run.
    """
    defaults = get_default_config()["signals"]
    configured = _load_stress_config().get("signals", defaults)

    # Map each signal name to the function that measures it.
    collectors = {
        "flaky_test_rate": _calculate_flaky_test_rate,
        "p1_backlog_growth": _calculate_p1_backlog_growth,
        "ci_failure_rate": _calculate_ci_failure_rate,
        "open_bug_count": _calculate_open_bug_count,
    }

    readings: list[StressSignal] = []
    for name, collect in collectors.items():
        cfg = configured.get(name, {})
        fallback = defaults.get(name, {})
        try:
            value = collect()
            readings.append(
                StressSignal(
                    name=name,
                    value=value,
                    threshold=cfg.get("threshold", fallback.get("threshold", 1.0)),
                    weight=cfg.get("weight", fallback.get("weight", 0.25)),
                )
            )
        except Exception as exc:
            logger.debug("Failed to collect signal %s: %s", name, exc)
    return readings
def _calculate_stress_score(signals: list[StressSignal]) -> float:
"""Calculate overall stress score from signals.
Score is weighted sum of triggered signal contributions,
normalized to 0-1 range.
"""
if not signals:
return 0.0
total_weight = sum(s.weight for s in signals)
if total_weight == 0:
return 0.0
triggered_contribution = sum(s.contribution for s in signals)
return min(1.0, triggered_contribution / total_weight)
def _get_multipliers_for_mode(mode: StressMode) -> dict[str, float]:
    """Token multipliers for *mode*, with config overriding built-in defaults."""
    defaults = get_default_config()["multipliers"]
    configured = _load_stress_config().get("multipliers", defaults)
    # Start from the defaults so every quest type has an entry, then
    # overlay whatever the YAML config provides for this mode.
    merged = dict(defaults.get(mode.value, {}))
    merged.update(configured.get(mode.value, {}))
    return merged
def detect_stress_mode(
    force_refresh: bool = False,
    min_check_interval_seconds: int = 60,
) -> StressSnapshot:
    """Detect current system stress mode.

    Args:
        force_refresh: Force a new check even if recently checked
        min_check_interval_seconds: Minimum seconds between checks

    Returns:
        StressSnapshot with mode, score, signals, and multipliers
    """
    global _current_snapshot, _last_check_time
    now = datetime.now(UTC)

    # Return cached snapshot if recent and not forced
    if not force_refresh and _current_snapshot is not None and _last_check_time is not None:
        elapsed = (now - _last_check_time).total_seconds()
        if elapsed < min_check_interval_seconds:
            return _current_snapshot

    # Collect signals and calculate stress
    signals = _collect_stress_signals()
    score = _calculate_stress_score(signals)

    # Determine mode from score
    config = _load_stress_config()
    default_config = get_default_config()
    thresholds_cfg = config.get("thresholds", default_config["thresholds"])
    thresholds = StressThresholds(
        elevated_min=thresholds_cfg.get("elevated_min", 0.3),
        high_min=thresholds_cfg.get("high_min", 0.6),
    )
    mode = thresholds.get_mode_for_score(score)

    # Get multipliers for this mode
    multipliers = _get_multipliers_for_mode(mode)

    snapshot = StressSnapshot(
        mode=mode,
        score=score,
        signals=signals,
        multipliers=multipliers,
        timestamp=now.isoformat(),
    )

    # BUG FIX: capture the previous mode BEFORE overwriting the cache.
    # The old code assigned `_current_snapshot = snapshot` first and then
    # compared `_current_snapshot.mode != mode`, which compared the new
    # snapshot with itself — so mode transitions were never logged.
    previous_mode = _current_snapshot.mode if _current_snapshot is not None else None

    # Cache result
    _current_snapshot = snapshot
    _last_check_time = now

    if previous_mode is not None and previous_mode != mode:
        logger.info(
            "Stress mode changed: %s -> %s (score: %.2f)",
            previous_mode.value,
            mode.value,
            score,
        )
    return snapshot
def get_current_stress_mode() -> StressMode:
    """Return the mode from the cached (or freshly detected) stress snapshot."""
    return detect_stress_mode().mode
def get_multiplier(quest_type: str, mode: StressMode | None = None) -> float:
    """Get token multiplier for a quest type.

    Args:
        quest_type: Type of quest (test_improve, issue_count, etc.)
        mode: Specific mode to get multiplier for, or None for current

    Returns:
        Multiplier value (1.0 = normal, 1.5 = 50% bonus, etc.)
    """
    active_mode = mode if mode is not None else get_current_stress_mode()
    # Unknown quest types fall back to a neutral multiplier.
    return _get_multipliers_for_mode(active_mode).get(quest_type, 1.0)
def apply_multiplier(base_reward: int, quest_type: str) -> int:
    """Scale a base reward by the current stress multiplier.

    Args:
        base_reward: Base token reward amount
        quest_type: Type of quest for multiplier lookup

    Returns:
        Adjusted reward amount (always >= 1)
    """
    scaled = int(base_reward * get_multiplier(quest_type))
    # Never let the adjustment zero out a reward entirely.
    return max(1, scaled)
def get_stress_summary() -> dict[str, Any]:
    """Build a human-readable summary of the current stress state."""
    snapshot = detect_stress_mode()

    # One-line explanation per mode for display in dashboards.
    explanations = {
        StressMode.CALM: "System is calm. Good time for exploration and refactoring.",
        StressMode.ELEVATED: "Elevated stress detected. Focus on stability and tests.",
        StressMode.HIGH: "HIGH STRESS MODE. Prioritize bug fixes and test hardening.",
    }

    active = [
        {
            "name": sig.name,
            "value": round(sig.value, 3),
            "threshold": sig.threshold,
        }
        for sig in snapshot.signals
        if sig.is_triggered
    ]

    return {
        "mode": snapshot.mode.value,
        "score": round(snapshot.score, 3),
        "explanation": explanations.get(snapshot.mode, "Unknown mode"),
        "active_signals": active,
        "current_multipliers": snapshot.multipliers,
        "last_updated": snapshot.timestamp,
    }
def reset_stress_state() -> None:
    """Clear all cached stress state (snapshot, check time, config cache).

    Primarily useful so tests can force deterministic re-detection.
    """
    global _current_snapshot, _last_check_time, _config_cache, _config_mtime
    _config_cache = None
    _config_mtime = 0.0
    _current_snapshot = None
    _last_check_time = None

View File

@@ -1,343 +0,0 @@
"""Tests for weekly_narrative.py script."""
from __future__ import annotations
import json
import sys
from datetime import UTC, datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
# Add timmy_automations to path for imports
sys.path.insert(
0, str(Path(__file__).resolve().parent.parent.parent / "timmy_automations" / "daily_run")
)
import weekly_narrative as wn
class TestParseTimestamp:
"""Test timestamp parsing."""
def test_parse_iso_with_z(self):
"""Parse ISO timestamp with Z suffix."""
result = wn.parse_ts("2026-03-21T12:00:00Z")
assert result is not None
assert result.year == 2026
assert result.month == 3
assert result.day == 21
def test_parse_iso_with_offset(self):
"""Parse ISO timestamp with timezone offset."""
result = wn.parse_ts("2026-03-21T12:00:00+00:00")
assert result is not None
assert result.year == 2026
def test_parse_empty_string(self):
"""Empty string returns None."""
result = wn.parse_ts("")
assert result is None
def test_parse_invalid_string(self):
"""Invalid string returns None."""
result = wn.parse_ts("not-a-timestamp")
assert result is None
class TestCollectCyclesData:
"""Test cycle data collection."""
def test_no_cycles_file(self, tmp_path):
"""Handle missing cycles file gracefully."""
with patch.object(wn, "REPO_ROOT", tmp_path):
since = datetime.now(UTC) - timedelta(days=7)
result = wn.collect_cycles_data(since)
assert result["total"] == 0
assert result["successes"] == 0
assert result["failures"] == 0
def test_collect_recent_cycles(self, tmp_path):
"""Collect cycles within lookback period."""
retro_dir = tmp_path / ".loop" / "retro"
retro_dir.mkdir(parents=True)
now = datetime.now(UTC)
cycles = [
{"timestamp": now.isoformat(), "success": True, "cycle": 1},
{"timestamp": now.isoformat(), "success": False, "cycle": 2},
{"timestamp": (now - timedelta(days=10)).isoformat(), "success": True, "cycle": 3},
]
with open(retro_dir / "cycles.jsonl", "w") as f:
for c in cycles:
f.write(json.dumps(c) + "\n")
with patch.object(wn, "REPO_ROOT", tmp_path):
since = now - timedelta(days=7)
result = wn.collect_cycles_data(since)
assert result["total"] == 2 # Only recent 2
assert result["successes"] == 1
assert result["failures"] == 1
class TestExtractThemes:
"""Test theme extraction from issues."""
def test_extract_layer_labels(self):
"""Extract layer labels from issues."""
issues = [
{"labels": [{"name": "layer:triage"}, {"name": "bug"}]},
{"labels": [{"name": "layer:tests"}, {"name": "bug"}]},
{"labels": [{"name": "layer:triage"}, {"name": "feature"}]},
]
result = wn.extract_themes(issues)
assert len(result["layers"]) == 2
layer_names = {layer["name"] for layer in result["layers"]}
assert "triage" in layer_names
assert "tests" in layer_names
def test_extract_type_labels(self):
"""Extract type labels (bug/feature/etc)."""
issues = [
{"labels": [{"name": "bug"}]},
{"labels": [{"name": "feature"}]},
{"labels": [{"name": "bug"}]},
]
result = wn.extract_themes(issues)
type_names = {t_type["name"] for t_type in result["types"]}
assert "bug" in type_names
assert "feature" in type_names
def test_empty_issues(self):
"""Handle empty issue list."""
result = wn.extract_themes([])
assert result["layers"] == []
assert result["types"] == []
assert result["top_labels"] == []
class TestExtractAgentContributions:
"""Test agent contribution extraction."""
def test_extract_assignees(self):
"""Extract assignee counts."""
issues = [
{"assignee": {"login": "kimi"}},
{"assignee": {"login": "hermes"}},
{"assignee": {"login": "kimi"}},
]
result = wn.extract_agent_contributions(issues, [], [])
assert len(result["active_assignees"]) == 2
assignee_logins = {a["login"] for a in result["active_assignees"]} # noqa: E741
assert "kimi" in assignee_logins
assert "hermes" in assignee_logins
def test_extract_pr_authors(self):
"""Extract PR author counts."""
prs = [
{"user": {"login": "kimi"}},
{"user": {"login": "claude"}},
{"user": {"login": "kimi"}},
]
result = wn.extract_agent_contributions([], prs, [])
assert len(result["pr_authors"]) == 2
def test_kimi_mentions_in_cycles(self):
"""Count Kimi mentions in cycle notes."""
cycles = [
{"notes": "Kimi did great work", "reason": ""},
{"notes": "", "reason": "Kimi timeout"},
{"notes": "All good", "reason": ""},
]
result = wn.extract_agent_contributions([], [], cycles)
assert result["kimi_mentioned_cycles"] == 2
class TestAnalyzeTestShifts:
"""Test test pattern analysis."""
def test_no_cycles(self):
"""Handle no cycle data."""
result = wn.analyze_test_shifts([])
assert "note" in result
def test_test_metrics(self):
"""Calculate test metrics from cycles."""
cycles = [
{"tests_passed": 100, "tests_added": 5},
{"tests_passed": 150, "tests_added": 3},
]
result = wn.analyze_test_shifts(cycles)
assert result["total_tests_passed"] == 250
assert result["total_tests_added"] == 8
class TestGenerateVibeSummary:
"""Test vibe summary generation."""
def test_productive_vibe(self):
"""High success rate and activity = productive vibe."""
cycles_data = {"success_rate": 0.95, "successes": 10, "failures": 1}
issues_data = {"closed_count": 5}
result = wn.generate_vibe_summary(cycles_data, issues_data, {}, {"layers": []}, {}, {}, {})
assert result["overall"] == "productive"
assert "strong week" in result["description"].lower()
def test_struggling_vibe(self):
"""More failures than successes = struggling vibe."""
cycles_data = {"success_rate": 0.3, "successes": 3, "failures": 7}
issues_data = {"closed_count": 0}
result = wn.generate_vibe_summary(cycles_data, issues_data, {}, {"layers": []}, {}, {}, {})
assert result["overall"] == "struggling"
def test_quiet_vibe(self):
"""Low activity = quiet vibe."""
cycles_data = {"success_rate": 0.0, "successes": 0, "failures": 0}
issues_data = {"closed_count": 0}
result = wn.generate_vibe_summary(cycles_data, issues_data, {}, {"layers": []}, {}, {}, {})
assert result["overall"] == "quiet"
class TestGenerateMarkdownSummary:
"""Test markdown summary generation."""
def test_includes_header(self):
"""Markdown includes header."""
narrative = {
"period": {"start": "2026-03-14T00:00:00", "end": "2026-03-21T00:00:00"},
"vibe": {"overall": "productive", "description": "Good week"},
"activity": {
"cycles": {"total": 10, "successes": 9, "failures": 1},
"issues": {"closed": 5, "opened": 3},
"pull_requests": {"merged": 4, "opened": 2},
},
}
result = wn.generate_markdown_summary(narrative)
assert "# Weekly Narrative Summary" in result
assert "productive" in result.lower()
assert "10 total" in result or "10" in result
def test_includes_focus_areas(self):
"""Markdown includes focus areas when present."""
narrative = {
"period": {"start": "2026-03-14", "end": "2026-03-21"},
"vibe": {
"overall": "productive",
"description": "Good week",
"focus_areas": ["triage (5 items)", "tests (3 items)"],
},
"activity": {
"cycles": {"total": 0, "successes": 0, "failures": 0},
"issues": {"closed": 0, "opened": 0},
"pull_requests": {"merged": 0, "opened": 0},
},
}
result = wn.generate_markdown_summary(narrative)
assert "Focus Areas" in result
assert "triage" in result
class TestConfigLoading:
    """Test configuration loading."""

    def test_default_config(self, tmp_path):
        """A missing manifest falls back to the built-in defaults."""
        missing_manifest = tmp_path / "nonexistent.json"
        with patch.object(wn, "CONFIG_PATH", missing_manifest):
            loaded = wn.load_automation_config()
        assert loaded["lookback_days"] == 7
        assert loaded["enabled"] is True

    def test_environment_override(self, tmp_path):
        """TIMMY_WEEKLY_NARRATIVE_ENABLED=false wins over the manifest."""
        missing_manifest = tmp_path / "nonexistent.json"
        env = {"TIMMY_WEEKLY_NARRATIVE_ENABLED": "false"}
        with patch.dict("os.environ", env), patch.object(wn, "CONFIG_PATH", missing_manifest):
            loaded = wn.load_automation_config()
        assert loaded["enabled"] is False
class TestMain:
    """Test main function."""

    def test_disabled_exits_cleanly(self, tmp_path):
        """When disabled and no --force, exits cleanly."""
        # Patch REPO_ROOT so nothing is written into the real repo, force a
        # disabled config, and clear CLI flags; main() should no-op with 0.
        with patch.object(wn, "REPO_ROOT", tmp_path):
            with patch.object(wn, "load_automation_config", return_value={"enabled": False}):
                with patch("sys.argv", ["weekly_narrative"]):
                    result = wn.main()
                    assert result == 0

    def test_force_runs_when_disabled(self, tmp_path):
        """--force runs even when disabled."""
        # Setup minimal structure
        (tmp_path / ".loop" / "retro").mkdir(parents=True)
        with patch.object(wn, "REPO_ROOT", tmp_path):
            with patch.object(
                wn,
                "load_automation_config",
                return_value={
                    "enabled": False,
                    "lookback_days": 7,
                    "gitea_api": "http://localhost:3000/api/v1",
                    "repo_slug": "test/repo",
                    "token_file": "~/.hermes/gitea_token",
                },
            ):
                # Stub the Gitea client as unreachable so the run exercises
                # the local-data-only path without any network access.
                with patch.object(wn, "GiteaClient") as mock_client:
                    mock_instance = MagicMock()
                    mock_instance.is_available.return_value = False
                    mock_client.return_value = mock_instance
                    with patch("sys.argv", ["weekly_narrative", "--force"]):
                        result = wn.main()
                        # Should complete without error even though Gitea unavailable
                        assert result == 0
class TestGiteaClient:
    """Test Gitea API client."""

    def test_is_available_when_unavailable(self):
        """is_available returns False (never raises) when nothing listens."""
        cfg = {"gitea_api": "http://localhost:99999", "repo_slug": "test/repo"}
        probe = wn.GiteaClient(cfg, None)
        assert probe.is_available() is False

    def test_headers_with_token(self):
        """A token is forwarded as a Gitea-style 'token ...' Authorization header."""
        cfg = {"gitea_api": "http://localhost:3000", "repo_slug": "test/repo"}
        produced = wn.GiteaClient(cfg, "test-token")._headers()
        assert produced["Authorization"] == "token test-token"

    def test_headers_without_token(self):
        """No Authorization header is emitted when no token is configured."""
        cfg = {"gitea_api": "http://localhost:3000", "repo_slug": "test/repo"}
        produced = wn.GiteaClient(cfg, None)._headers()
        assert "Authorization" not in produced

View File

@@ -0,0 +1,294 @@
"""Unit tests for the stress detector module.
Tests stress signal calculation, mode detection, multipliers,
and integration with the quest system.
"""
from __future__ import annotations
import pytest
from timmy.stress_detector import (
StressMode,
StressSignal,
StressSnapshot,
StressThresholds,
_calculate_stress_score,
_get_multipliers_for_mode,
apply_multiplier,
get_default_config,
reset_stress_state,
)
@pytest.fixture(autouse=True)
def clean_stress_state():
    """Reset stress state between tests.

    autouse: every test in this module starts from a fresh detector state;
    the reset after the yield keeps state from leaking into later tests.
    """
    reset_stress_state()
    yield
    reset_stress_state()
# ── Stress Mode Tests ──────────────────────────────────────────────────────
class TestStressMode:
    def test_stress_mode_values(self):
        """Each StressMode member serializes to its lowercase name."""
        expected = {
            StressMode.CALM: "calm",
            StressMode.ELEVATED: "elevated",
            StressMode.HIGH: "high",
        }
        for mode, serialized in expected.items():
            assert mode.value == serialized
# ── Stress Signal Tests ────────────────────────────────────────────────────
class TestStressSignal:
    @staticmethod
    def _signal(value, threshold=10.0, weight=0.5):
        """Build a signal with the shared name/threshold/weight defaults."""
        return StressSignal(name="test_signal", value=value, threshold=threshold, weight=weight)

    def test_signal_not_triggered(self):
        """Below threshold: untriggered, zero contribution."""
        sig = self._signal(5.0)
        assert not sig.is_triggered
        assert sig.contribution == 0.0

    def test_signal_triggered(self):
        """At threshold: triggered, contributes weight * min(1, value/threshold)."""
        sig = self._signal(10.0)
        assert sig.is_triggered
        assert sig.contribution == 0.5

    def test_signal_contribution_capped(self):
        """Far past threshold: contribution is capped at the weight."""
        sig = self._signal(100.0)
        assert sig.is_triggered
        assert sig.contribution == 0.5

    def test_signal_partial_contribution(self):
        """A ratio above 1 is clamped, so 15/10 still yields the full weight."""
        sig = self._signal(15.0)
        assert sig.is_triggered
        assert sig.contribution == 0.5
# ── Stress Thresholds Tests ────────────────────────────────────────────────
class TestStressThresholds:
    @staticmethod
    def _bands():
        """Thresholds used by every case: elevated at 0.3, high at 0.6."""
        return StressThresholds(elevated_min=0.3, high_min=0.6)

    def test_calm_mode(self):
        """Anything under elevated_min maps to CALM."""
        bands = self._bands()
        for score in (0.0, 0.1, 0.29):
            assert bands.get_mode_for_score(score) == StressMode.CALM

    def test_elevated_mode(self):
        """Scores in [elevated_min, high_min) map to ELEVATED."""
        bands = self._bands()
        for score in (0.3, 0.5, 0.59):
            assert bands.get_mode_for_score(score) == StressMode.ELEVATED

    def test_high_mode(self):
        """Scores at or above high_min map to HIGH."""
        bands = self._bands()
        for score in (0.6, 0.8, 1.0):
            assert bands.get_mode_for_score(score) == StressMode.HIGH
# ── Stress Score Calculation Tests ─────────────────────────────────────────
class TestStressScoreCalculation:
    @staticmethod
    def _signal(name, value, weight):
        """All cases share a 10.0 threshold; only value and weight vary."""
        return StressSignal(name=name, value=value, threshold=10.0, weight=weight)

    def test_empty_signals(self):
        """No signals at all yields a zero score."""
        assert _calculate_stress_score([]) == 0.0

    def test_no_triggered_signals(self):
        """All-quiet signals yield a zero score."""
        quiet = [self._signal("s1", 1.0, 0.5), self._signal("s2", 2.0, 0.5)]
        assert _calculate_stress_score(quiet) == 0.0

    def test_single_triggered_signal(self):
        """A lone triggered signal normalizes to a full 1.0."""
        # contribution 0.5 over total weight 0.5 -> 1.0
        assert _calculate_stress_score([self._signal("s1", 10.0, 0.5)]) == 1.0

    def test_mixed_signals(self):
        """Triggered contributions are divided by the total weight."""
        blend = [
            self._signal("s1", 10.0, 0.3),
            self._signal("s2", 1.0, 0.3),
            self._signal("s3", 10.0, 0.4),
        ]
        # (0.3 + 0.4) / (0.3 + 0.3 + 0.4) = 0.7
        assert _calculate_stress_score(blend) == 0.7

    def test_score_capped_at_one(self):
        """Scores never exceed 1.0 regardless of signal magnitude."""
        loud = [self._signal("s1", 100.0, 1.0), self._signal("s2", 100.0, 1.0)]
        assert _calculate_stress_score(loud) == 1.0
# ── Multiplier Tests ───────────────────────────────────────────────────────
class TestMultipliers:
    def test_default_config_structure(self):
        """Default config exposes thresholds, signals and multipliers."""
        cfg = get_default_config()
        for section in ("thresholds", "signals", "multipliers"):
            assert section in cfg

    def test_calm_mode_multipliers(self):
        """Calm mode favours docs, exploration and refactor work."""
        calm = _get_multipliers_for_mode(StressMode.CALM)
        expected = {"test_improve": 1.0, "docs_update": 1.2, "exploration": 1.3, "refactor": 1.2}
        for quest_type, factor in expected.items():
            assert calm[quest_type] == factor

    def test_elevated_mode_multipliers(self):
        """Elevated mode boosts tests/issues and dampens refactors."""
        elevated = _get_multipliers_for_mode(StressMode.ELEVATED)
        expected = {"test_improve": 1.2, "issue_reduce": 1.1, "refactor": 0.9}
        for quest_type, factor in expected.items():
            assert elevated[quest_type] == factor

    def test_high_mode_multipliers(self):
        """High stress strongly rewards stabilization over exploration."""
        high = _get_multipliers_for_mode(StressMode.HIGH)
        expected = {"test_improve": 1.5, "issue_reduce": 1.4, "exploration": 0.7, "refactor": 0.6}
        for quest_type, factor in expected.items():
            assert high[quest_type] == factor

    def test_multiplier_fallback_for_unknown_type(self):
        """Unknown quest types fall back to a neutral 1.0."""
        calm = _get_multipliers_for_mode(StressMode.CALM)
        assert calm.get("unknown_type", 1.0) == 1.0
# ── Apply Multiplier Tests ─────────────────────────────────────────────────
class TestApplyMultiplier:
    def test_apply_multiplier_calm(self):
        """An unknown quest type still yields at least one token."""
        # The live stress mode cannot easily be mocked here, so only the
        # floor behaviour of apply_multiplier is asserted.
        assert apply_multiplier(100, "unknown_type") >= 1

    def test_apply_multiplier_minimum_one(self):
        """Even tiny base rewards never drop below a single token."""
        assert apply_multiplier(1, "any_type") >= 1
# ── Stress Snapshot Tests ──────────────────────────────────────────────────
class TestStressSnapshot:
    def test_snapshot_to_dict(self):
        """to_dict serializes mode, score, signals and multipliers."""
        snap = StressSnapshot(
            mode=StressMode.ELEVATED,
            score=0.5,
            signals=[StressSignal(name="test", value=10.0, threshold=5.0, weight=0.5)],
            multipliers={"test_improve": 1.2},
        )
        serialized = snap.to_dict()
        assert serialized["mode"] == "elevated"
        assert serialized["score"] == 0.5
        assert len(serialized["signals"]) == 1
        assert serialized["multipliers"]["test_improve"] == 1.2
# ── Integration Tests ──────────────────────────────────────────────────────
class TestStressDetectorIntegration:
    def test_reset_stress_state(self):
        """Resetting is idempotent and never raises."""
        # Just verify reset doesn't error
        reset_stress_state()

    def test_default_config_contains_all_signals(self):
        """Every monitored signal is declared with a threshold and a weight."""
        declared = get_default_config()["signals"]
        for name in ("flaky_test_rate", "p1_backlog_growth", "ci_failure_rate", "open_bug_count"):
            assert name in declared
            spec = declared[name]
            assert "threshold" in spec
            assert "weight" in spec

    def test_default_config_contains_all_modes(self):
        """Multiplier tables exist for calm, elevated and high modes."""
        mode_table = get_default_config()["multipliers"]
        for mode_name in ("calm", "elevated", "high"):
            assert mode_name in mode_table

    def test_multiplier_weights_sum_approximately_one(self):
        """Signal weights should approximately sum to 1.0."""
        weights = [spec["weight"] for spec in get_default_config()["signals"].values()]
        # Allow some flexibility but should be close to 1.0
        assert 0.9 <= sum(weights) <= 1.1

View File

@@ -228,27 +228,6 @@
"max_items": 5
},
"outputs": []
},
{
"id": "weekly_narrative",
"name": "Weekly Narrative Summary",
"description": "Generates a human-readable weekly summary of work themes, agent contributions, and token economy shifts",
"script": "timmy_automations/daily_run/weekly_narrative.py",
"category": "daily_run",
"enabled": true,
"trigger": "scheduled",
"schedule": "weekly",
"executable": "python3",
"config": {
"lookback_days": 7,
"output_file": ".loop/weekly_narrative.json",
"gitea_api": "http://localhost:3000/api/v1",
"repo_slug": "rockachopa/Timmy-time-dashboard"
},
"outputs": [
".loop/weekly_narrative.json",
".loop/weekly_narrative.md"
]
}
]
}

View File

@@ -17,10 +17,6 @@
"manual": {
"description": "Run on-demand only",
"automations": ["agent_workspace", "kimi_bootstrap", "kimi_resume", "backfill_retro"]
},
"weekly": {
"description": "Run once per week (Sundays)",
"automations": ["weekly_narrative"]
}
},
"triggers": {

View File

@@ -1,745 +0,0 @@
#!/usr/bin/env python3
"""Weekly narrative summary generator — human-readable loop analysis.
Analyzes the past week's activity across the development loop to produce
a narrative summary of:
- What changed (themes, areas of focus)
- How agents and Timmy contributed
- Any shifts in tests, triage, or token economy
The output is designed to be skimmable — a quick read that gives context
on the week's progress without drowning in metrics.
Run: python3 timmy_automations/daily_run/weekly_narrative.py [--json]
Env: See timmy_automations/config/automations.json for configuration
Refs: #719
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from collections import Counter
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
# ── Configuration ─────────────────────────────────────────────────────────
# Repo root: this file lives at <root>/timmy_automations/daily_run/, so the
# project root is three parents up.
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
# Automations manifest; its "weekly_narrative" entry can override DEFAULT_CONFIG.
CONFIG_PATH = Path(__file__).parent.parent / "config" / "automations.json"
# Built-in fallback used when the manifest is missing or unreadable.
DEFAULT_CONFIG = {
    "gitea_api": "http://localhost:3000/api/v1",
    "repo_slug": "rockachopa/Timmy-time-dashboard",
    "token_file": "~/.hermes/gitea_token",
    "lookback_days": 7,
    "output_file": ".loop/weekly_narrative.json",
    "enabled": True,
}
# ── Data Loading ───────────────────────────────────────────────────────────
def load_automation_config() -> dict:
    """Load configuration for weekly_narrative from the automations manifest.

    Starts from DEFAULT_CONFIG, layers the manifest's "weekly_narrative"
    entry on top (when readable), then applies environment overrides.
    """
    merged = DEFAULT_CONFIG.copy()
    if CONFIG_PATH.exists():
        try:
            manifest = json.loads(CONFIG_PATH.read_text())
        except (json.JSONDecodeError, OSError) as exc:
            print(f"[weekly_narrative] Warning: Could not load config: {exc}", file=sys.stderr)
        else:
            for entry in manifest.get("automations", []):
                if entry.get("id") == "weekly_narrative":
                    merged.update(entry.get("config", {}))
                    merged["enabled"] = entry.get("enabled", True)
                    break
    # Environment variable overrides (only when set and non-empty).
    env = os.environ
    if env.get("TIMMY_GITEA_API"):
        merged["gitea_api"] = env.get("TIMMY_GITEA_API")
    if env.get("TIMMY_REPO_SLUG"):
        merged["repo_slug"] = env.get("TIMMY_REPO_SLUG")
    if env.get("TIMMY_GITEA_TOKEN"):
        merged["token"] = env.get("TIMMY_GITEA_TOKEN")
    if env.get("TIMMY_WEEKLY_NARRATIVE_ENABLED"):
        merged["enabled"] = env.get("TIMMY_WEEKLY_NARRATIVE_ENABLED", "true").lower() == "true"
    return merged
def get_token(config: dict) -> str | None:
"""Get Gitea token from environment or file."""
if "token" in config:
return config["token"]
token_file = Path(config["token_file"]).expanduser()
if token_file.exists():
return token_file.read_text().strip()
return None
def load_jsonl(path: Path) -> list[dict]:
    """Load a JSONL file, silently dropping unparseable lines.

    Returns [] when the file does not exist, so callers can treat a
    missing log the same as an empty one.
    """
    if not path.exists():
        return []

    _BAD = object()  # sentinel: distinguishes parse failure from a valid null

    def _parse(raw: str):
        try:
            return json.loads(raw)
        except ValueError:  # JSONDecodeError is a ValueError subclass
            return _BAD

    parsed = (_parse(raw) for raw in path.read_text().strip().splitlines())
    return [record for record in parsed if record is not _BAD]
def parse_ts(ts_str: str) -> datetime | None:
"""Parse an ISO timestamp, tolerating missing tz."""
if not ts_str:
return None
try:
dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=UTC)
return dt
except (ValueError, TypeError):
return None
# ── Gitea API Client ───────────────────────────────────────────────────────
class GiteaClient:
    """Simple Gitea API client with graceful degradation.

    Wraps the handful of repo-scoped endpoints this script needs.
    Availability is probed once and cached per instance so offline runs
    degrade to local-only data instead of timing out on every call.
    """

    def __init__(self, config: dict, token: str | None):
        self.api_base = config["gitea_api"].rstrip("/")
        self.repo_slug = config["repo_slug"]
        self.token = token
        # Cached result of is_available(); None means "not probed yet".
        self._available: bool | None = None

    def _headers(self) -> dict:
        # Gitea expects "token <value>" (not "Bearer") in Authorization.
        headers = {"Accept": "application/json"}
        if self.token:
            headers["Authorization"] = f"token {self.token}"
        return headers

    def _api_url(self, path: str) -> str:
        # Repo-scoped endpoint, e.g. ".../repos/<owner>/<repo>/issues".
        return f"{self.api_base}/repos/{self.repo_slug}/{path}"

    def is_available(self) -> bool:
        """Check if Gitea API is reachable (result cached per instance)."""
        if self._available is not None:
            return self._available
        try:
            req = Request(
                f"{self.api_base}/version",
                headers=self._headers(),
                method="GET",
            )
            with urlopen(req, timeout=5) as resp:
                self._available = resp.status == 200
                return self._available
        except (HTTPError, URLError, TimeoutError):
            self._available = False
            return False

    def get_paginated(self, path: str, params: dict | None = None) -> list:
        """Fetch all pages of a paginated endpoint."""
        all_items = []
        page = 1
        limit = 50
        while True:
            url = self._api_url(path)
            # NOTE(review): query values are interpolated without URL-encoding;
            # fine for the simple state/sort params used here — confirm before
            # passing anything with reserved characters.
            query_parts = [f"limit={limit}", f"page={page}"]
            if params:
                for key, val in params.items():
                    query_parts.append(f"{key}={val}")
            url = f"{url}?{'&'.join(query_parts)}"
            req = Request(url, headers=self._headers(), method="GET")
            with urlopen(req, timeout=15) as resp:
                batch = json.loads(resp.read())
            # An empty page or a short page both signal the last page.
            if not batch:
                break
            all_items.extend(batch)
            if len(batch) < limit:
                break
            page += 1
        return all_items
# ── Data Collection ────────────────────────────────────────────────────────
def collect_cycles_data(since: datetime) -> dict:
    """Summarize cycle retrospectives recorded since *since*.

    Reads .loop/retro/cycles.jsonl under REPO_ROOT and buckets the recent
    entries by their "success" flag.
    """
    cycles_file = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
    if not cycles_file.exists():
        return {"cycles": [], "total": 0, "successes": 0, "failures": 0}
    recent = [
        entry for entry in load_jsonl(cycles_file)
        if (ts := parse_ts(entry.get("timestamp", ""))) and ts >= since
    ]
    won = sum(1 for entry in recent if entry.get("success"))
    lost = len(recent) - won
    return {
        "cycles": recent,
        "total": len(recent),
        "successes": won,
        "failures": lost,
        "success_rate": round(won / len(recent), 2) if recent else 0,
    }
def collect_issues_data(client: GiteaClient, since: datetime) -> dict:
    """Collect issue activity from Gitea.

    Buckets issues updated since *since* into touched / closed / opened.
    Degrades to an "error" payload with empty lists when the API is down
    or a request fails.
    """
    if not client.is_available():
        return {"error": "Gitea unavailable", "issues": [], "closed": [], "opened": []}
    try:
        issues = client.get_paginated("issues", {"state": "all", "sort": "updated", "limit": 100})
    except (HTTPError, URLError) as exc:
        return {"error": str(exc), "issues": [], "closed": [], "opened": []}
    touched = []
    closed = []
    opened = []
    for issue in issues:
        updated_at = issue.get("updated_at", "")
        created_at = issue.get("created_at", "")
        updated = parse_ts(updated_at)
        created = parse_ts(created_at)
        if updated and updated >= since:
            touched.append(issue)
        # An issue both opened and closed inside the window counts only as
        # closed — the elif below pairs with this state check, not the one above.
        if issue.get("state") == "closed":
            closed_at = issue.get("closed_at", "")
            closed_dt = parse_ts(closed_at)
            if closed_dt and closed_dt >= since:
                closed.append(issue)
        elif created and created >= since:
            opened.append(issue)
    return {
        "issues": touched,
        "closed": closed,
        "opened": opened,
        "touched_count": len(touched),
        "closed_count": len(closed),
        "opened_count": len(opened),
    }
def collect_prs_data(client: GiteaClient, since: datetime) -> dict:
    """Collect PR activity from Gitea.

    Buckets PRs updated since *since* into touched / merged / opened.
    Degrades to an "error" payload with empty lists when the API is down
    or a request fails.
    """
    if not client.is_available():
        return {"error": "Gitea unavailable", "prs": [], "merged": [], "opened": []}
    try:
        prs = client.get_paginated("pulls", {"state": "all", "sort": "updated", "limit": 100})
    except (HTTPError, URLError) as exc:
        return {"error": str(exc), "prs": [], "merged": [], "opened": []}
    touched = []
    merged = []
    opened = []
    for pr in prs:
        updated_at = pr.get("updated_at", "")
        created_at = pr.get("created_at", "")
        merged_at = pr.get("merged_at", "")
        updated = parse_ts(updated_at)
        created = parse_ts(created_at)
        merged_dt = parse_ts(merged_at) if merged_at else None
        if updated and updated >= since:
            touched.append(pr)
        # A PR merged in the window counts only as merged (elif), even if it
        # was also created during the window.
        if pr.get("merged") and merged_dt and merged_dt >= since:
            merged.append(pr)
        elif created and created >= since:
            opened.append(pr)
    return {
        "prs": touched,
        "merged": merged,
        "opened": opened,
        "touched_count": len(touched),
        "merged_count": len(merged),
        "opened_count": len(opened),
    }
def collect_triage_data(since: datetime) -> dict:
    """Load triage and introspection data recorded since *since*.

    Returns:
        dict with "triage_runs" (count of recent entries), "triage_entries"
        (the matching raw records), and "latest_insights" (parsed
        insights.json, or {} when missing/unreadable).
    """
    triage_file = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
    insights_file = REPO_ROOT / ".loop" / "retro" / "insights.json"
    triage_entries = load_jsonl(triage_file)
    # Parse each timestamp once (the previous comprehension parsed twice per entry).
    recent_triage = [
        e for e in triage_entries
        if (ts := parse_ts(e.get("timestamp", ""))) and ts >= since
    ]
    insights = {}
    if insights_file.exists():
        try:
            insights = json.loads(insights_file.read_text())
        except (json.JSONDecodeError, OSError):
            # Best-effort: a corrupt/unreadable insights file degrades to {}.
            pass
    return {
        "triage_runs": len(recent_triage),
        "triage_entries": recent_triage,
        "latest_insights": insights,
    }
def collect_token_data(since: datetime) -> dict:
    """Placeholder for token-economy metrics over the lookback window.

    The lightning ledger is in-memory only, so nothing is persisted for
    this report yet; callers receive a fixed stub (*since* is unused).
    """
    placeholder = {
        "note": "Token economy data is ephemeral — check dashboard for live metrics",
        "balance_sats": 0,  # Placeholder
        "transactions_week": 0,
    }
    return placeholder
# ── Analysis Functions ─────────────────────────────────────────────────────
def extract_themes(issues: list[dict]) -> dict:
    """Extract themes from issue labels.

    Counts plain labels, "layer:*" labels (prefix stripped), and a fixed
    set of type labels.

    Returns:
        dict with "top_labels" (up to 10 most common, excluding layer:/size:
        labels), "layers", and "types" — each a list of {"name", "count"}
        ordered by frequency.  (Fixed: the annotation previously claimed
        list[dict], but a dict has always been returned.)
    """
    label_counts = Counter()
    layer_counts = Counter()
    type_counts = Counter()
    for issue in issues:
        for label in issue.get("labels", []):
            name = label.get("name", "")
            label_counts[name] += 1
            if name.startswith("layer:"):
                layer_counts[name.replace("layer:", "")] += 1
            if name in ("bug", "feature", "refactor", "docs", "test", "chore"):
                type_counts[name] += 1
    # Top themes: most common labels, minus structural layer:/size: prefixes.
    themes = [
        {"name": name, "count": count}
        for name, count in label_counts.most_common(10)
        if not name.startswith(("layer:", "size:"))
    ]
    layers = [
        {"name": name, "count": count}
        for name, count in layer_counts.most_common()
    ]
    types = [
        {"name": name, "count": count}
        for name, count in type_counts.most_common()
    ]
    return {
        "top_labels": themes,
        "layers": layers,
        "types": types,
    }
def extract_agent_contributions(issues: list[dict], prs: list[dict], cycles: list[dict]) -> dict:
    """Summarize who did what: issue assignees, PR authors, Kimi mentions."""
    # Issues per assignee (only non-empty dict assignees count).
    assignee_counts = Counter(
        person.get("login", "unknown")
        for issue in issues
        if (person := issue.get("assignee")) and isinstance(person, dict)
    )
    # PRs per author, same shape rules.
    pr_authors = Counter(
        author.get("login", "unknown")
        for pr in prs
        if (author := pr.get("user")) and isinstance(author, dict)
    )
    # Cycles whose notes or reason mention Kimi (case-insensitive).
    kimi_mentions = sum(
        1 for c in cycles
        if "kimi" in c.get("notes", "").lower() or "kimi" in c.get("reason", "").lower()
    )
    return {
        "active_assignees": [
            {"login": login, "issues_count": count}
            for login, count in assignee_counts.most_common()
        ],
        "pr_authors": [
            {"login": login, "prs_count": count}
            for login, count in pr_authors.most_common()
        ],
        "kimi_mentioned_cycles": kimi_mentions,
    }
def analyze_test_shifts(cycles: list[dict]) -> dict:
    """Summarize test activity across the week's cycles."""
    if not cycles:
        return {"note": "No cycle data available"}
    passed = sum(c.get("tests_passed", 0) for c in cycles)
    added = sum(c.get("tests_added", 0) for c in cycles)
    # A cycle is "test-focused" when typed as a test or its notes mention tests.
    focused = sum(
        1 for c in cycles
        if c.get("type") == "test" or "test" in c.get("notes", "").lower()
    )
    return {
        "total_tests_passed": passed,
        "total_tests_added": added,
        "avg_tests_per_cycle": round(passed / len(cycles), 1),
        "test_focused_cycles": focused,
    }
def analyze_triage_shifts(triage_data: dict) -> dict:
    """Summarize loop-introspection health from collected triage data."""
    insights = triage_data.get("latest_insights", {})
    recommendations = insights.get("recommendations", [])
    high_count = sum(1 for rec in recommendations if rec.get("severity") == "high")
    return {
        "triage_runs": triage_data.get("triage_runs", 0),
        "insights_generated": insights.get("generated_at") is not None,
        "high_priority_recommendations": high_count,
        "recent_recommendations": recommendations[:3] if recommendations else [],
    }
def generate_vibe_summary(
    cycles_data: dict,
    issues_data: dict,
    prs_data: dict,
    themes: dict,
    agent_contrib: dict,
    test_shifts: dict,
    triage_shifts: dict,
) -> dict:
    """Generate the human-readable 'vibe' summary.

    Classifies the week as productive / steady / struggling / quiet from
    cycle success rates and issue throughput, then attaches focus areas,
    an agent headline, and notable events.
    """
    success_rate = cycles_data.get("success_rate", 0)
    failures = cycles_data.get("failures", 0)
    closed_count = issues_data.get("closed_count", 0)
    merged_count = prs_data.get("merged_count", 0)

    # Classification order matters: high success + delivery first, then
    # moderate success, then failure-dominated, else a quiet week.
    if success_rate >= 0.9 and closed_count > 0:
        vibe, vibe_description = (
            "productive",
            "A strong week with solid delivery and healthy success rates.",
        )
    elif success_rate >= 0.7:
        vibe, vibe_description = (
            "steady",
            "Steady progress with some bumps. Things are moving forward.",
        )
    elif failures > cycles_data.get("successes", 0):
        vibe, vibe_description = (
            "struggling",
            "A challenging week with more failures than successes. Time to regroup.",
        )
    else:
        vibe, vibe_description = ("quiet", "A lighter week with limited activity.")

    # Up to three busiest layers become the focus areas.
    focus_areas = [
        f"{layer['name']} ({layer['count']} items)"
        for layer in themes.get("layers", [])[:3]
    ]

    # Headline for the most-assigned agent, if anyone was assigned at all.
    active_assignees = agent_contrib.get("active_assignees", [])
    if active_assignees:
        leader = active_assignees[0]
        agent_summary = f"{leader['login']} led with {leader['issues_count']} assigned issues."
    else:
        agent_summary = ""

    notable = []
    if merged_count > 5:
        notable.append(f"{merged_count} PRs merged — high integration velocity")
    if triage_shifts.get("high_priority_recommendations", 0) > 0:
        notable.append("High-priority recommendations from loop introspection")
    if test_shifts.get("test_focused_cycles", 0) > 3:
        notable.append("Strong test coverage focus")
    if not notable:
        notable.append("Regular development flow")

    return {
        "overall": vibe,
        "description": vibe_description,
        "focus_areas": focus_areas,
        "agent_summary": agent_summary,
        "notable_events": notable,
    }
# ── Narrative Generation ───────────────────────────────────────────────────
def generate_narrative(
    cycles_data: dict,
    issues_data: dict,
    prs_data: dict,
    triage_data: dict,
    themes: dict,
    agent_contrib: dict,
    test_shifts: dict,
    triage_shifts: dict,
    token_data: dict,
    since: datetime,
    until: datetime,
) -> dict:
    """Generate the complete weekly narrative.

    Assembles the vibe summary plus activity, theme, agent, health and
    token-economy sections into one JSON-serializable dict covering the
    [since, until] window.
    """
    vibe = generate_vibe_summary(
        cycles_data, issues_data, prs_data, themes, agent_contrib, test_shifts, triage_shifts
    )
    return {
        "generated_at": datetime.now(UTC).isoformat(),
        "period": {
            "start": since.isoformat(),
            "end": until.isoformat(),
            # Derive from the actual window instead of hard-coding 7, so a
            # --days override is reflected in the reported period.
            "days": (until - since).days,
        },
        "vibe": vibe,
        "activity": {
            "cycles": {
                "total": cycles_data.get("total", 0),
                "successes": cycles_data.get("successes", 0),
                "failures": cycles_data.get("failures", 0),
                "success_rate": cycles_data.get("success_rate", 0),
            },
            "issues": {
                "touched": issues_data.get("touched_count", 0),
                "closed": issues_data.get("closed_count", 0),
                "opened": issues_data.get("opened_count", 0),
            },
            "pull_requests": {
                "touched": prs_data.get("touched_count", 0),
                "merged": prs_data.get("merged_count", 0),
                "opened": prs_data.get("opened_count", 0),
            },
        },
        "themes": themes,
        "agents": agent_contrib,
        "test_health": test_shifts,
        "triage_health": triage_shifts,
        "token_economy": token_data,
    }
def generate_markdown_summary(narrative: dict) -> str:
    """Render the narrative dict as a skimmable markdown report.

    Optional sections (focus areas, agent activity, notable events,
    triage notes) are emitted only when the narrative carries data for them.
    """
    vibe = narrative.get("vibe", {})
    activity = narrative.get("activity", {})
    cycles = activity.get("cycles", {})
    issues = activity.get("issues", {})
    prs = activity.get("pull_requests", {})

    out: list[str] = [
        "# Weekly Narrative Summary",
        "",
        f"**Period:** {narrative['period']['start'][:10]} to {narrative['period']['end'][:10]}",
        f"**Vibe:** {vibe.get('overall', 'unknown').title()}",
        "",
        f"{vibe.get('description', '')}",
        "",
        "## Activity Highlights",
        "",
        f"- **Development Cycles:** {cycles.get('total', 0)} total ({cycles.get('successes', 0)} success, {cycles.get('failures', 0)} failure)",
        f"- **Issues:** {issues.get('closed', 0)} closed, {issues.get('opened', 0)} opened",
        f"- **Pull Requests:** {prs.get('merged', 0)} merged, {prs.get('opened', 0)} opened",
        "",
    ]

    focus = vibe.get("focus_areas", [])
    if focus:
        out += ["## Focus Areas", ""]
        out += [f"- {area}" for area in focus]
        out.append("")

    agent_summary = vibe.get("agent_summary", "")
    if agent_summary:
        out += ["## Agent Activity", "", agent_summary, ""]

    notable = vibe.get("notable_events", [])
    if notable:
        out += ["## Notable Events", ""]
        out += [f"- {event}" for event in notable]
        out.append("")

    triage = narrative.get("triage_health", {})
    if triage.get("high_priority_recommendations", 0) > 0:
        out += [
            "## Triage Notes",
            "",
            f"⚠️ {triage['high_priority_recommendations']} high-priority recommendation(s) from loop introspection.",
            "",
        ]
        out += [
            f"- **{rec.get('category', 'general')}:** {rec.get('finding', '')}"
            for rec in triage.get("recent_recommendations", [])[:2]
        ]
        out.append("")

    return "\n".join(out)
# ── Main ───────────────────────────────────────────────────────────────────
def parse_args() -> argparse.Namespace:
    """Parse CLI flags for the weekly narrative generator.

    Flags: --json/-j (print JSON instead of markdown), --output/-o
    (override the configured output path), --days (override the lookback
    window), --force (run even when disabled in config).
    """
    p = argparse.ArgumentParser(
        description="Generate weekly narrative summary of work and vibes",
    )
    p.add_argument(
        "--json", "-j",
        action="store_true",
        help="Output as JSON instead of markdown",
    )
    p.add_argument(
        "--output", "-o",
        type=str,
        default=None,
        help="Output file path (default from config)",
    )
    p.add_argument(
        "--days",
        type=int,
        default=None,
        help="Override lookback days (default 7)",
    )
    p.add_argument(
        "--force",
        action="store_true",
        help="Run even if disabled in config",
    )
    return p.parse_args()
def main() -> int:
    """Entry point: collect, analyze, and write the weekly narrative.

    Returns 0 on success (including the disabled/skip path). Writes a
    JSON report plus a sibling .md summary under REPO_ROOT.
    """
    args = parse_args()
    config = load_automation_config()
    # Check if enabled (CLI --force overrides a disabled config)
    if not config.get("enabled", True) and not args.force:
        print("[weekly_narrative] Skipped — weekly narrative is disabled in config")
        print("[weekly_narrative] Use --force to run anyway")
        return 0
    # Determine lookback period (CLI --days wins over config)
    days = args.days if args.days is not None else config.get("lookback_days", 7)
    until = datetime.now(UTC)
    since = until - timedelta(days=days)
    print(f"[weekly_narrative] Generating narrative for the past {days} days...")
    # Setup Gitea client
    token = get_token(config)
    client = GiteaClient(config, token)
    if not client.is_available():
        print("[weekly_narrative] Warning: Gitea API unavailable — will use local data only")
    # Collect data (the collect_* helpers degrade gracefully when offline)
    cycles_data = collect_cycles_data(since)
    issues_data = collect_issues_data(client, since)
    prs_data = collect_prs_data(client, since)
    triage_data = collect_triage_data(since)
    token_data = collect_token_data(since)
    # Analyze
    themes = extract_themes(issues_data.get("issues", []))
    agent_contrib = extract_agent_contributions(
        issues_data.get("issues", []),
        prs_data.get("prs", []),
        cycles_data.get("cycles", []),
    )
    test_shifts = analyze_test_shifts(cycles_data.get("cycles", []))
    triage_shifts = analyze_triage_shifts(triage_data)
    # Generate narrative
    narrative = generate_narrative(
        cycles_data,
        issues_data,
        prs_data,
        triage_data,
        themes,
        agent_contrib,
        test_shifts,
        triage_shifts,
        token_data,
        since,
        until,
    )
    # Determine output path (CLI --output wins over config)
    output_path = args.output or config.get("output_file", ".loop/weekly_narrative.json")
    output_file = REPO_ROOT / output_path
    output_file.parent.mkdir(parents=True, exist_ok=True)
    # Write JSON output
    output_file.write_text(json.dumps(narrative, indent=2) + "\n")
    # Write markdown summary alongside JSON
    md_output_file = output_file.with_suffix(".md")
    md_output_file.write_text(generate_markdown_summary(narrative))
    # Print output
    if args.json:
        print(json.dumps(narrative, indent=2))
    else:
        print()
        print(generate_markdown_summary(narrative))
    print(f"\n[weekly_narrative] Written to: {output_file}")
    print(f"[weekly_narrative] Markdown summary: {md_output_file}")
    return 0


if __name__ == "__main__":
    sys.exit(main())