Compare commits

...

2 Commits

Author SHA1 Message Date
kimi
d3eb5c31bf feat: Centralize agent token rules and hooks for automations (#711)
- Add token_rules.yaml config defining events, rewards, penalties, and gating thresholds
- Create token_rules.py helper module for loading config and computing token deltas
- Update orchestrator.py to compute token rewards for Daily Run completion
- Add comprehensive tests for token_rules module

The token economy is now configurable without code changes:
- Events: triage actions, Daily Run completions, PR merges, test fixes
- Rewards/penalties per event type
- Gating thresholds for sensitive operations
- Daily limits per category
- Audit settings for transaction logging

Fixes #711
2026-03-21 17:42:43 -04:00
6dd48685e7 [kimi] Weekly narrative summary generator (#719) (#791) 2026-03-21 21:36:40 +00:00
10 changed files with 2239 additions and 2 deletions

View File

@@ -330,6 +330,13 @@ class Settings(BaseSettings):
autoresearch_max_iterations: int = 100
autoresearch_metric: str = "val_bpb" # metric to optimise (lower = better)
# ── Weekly Narrative Summary ───────────────────────────────────────
# Generates a human-readable weekly summary of development activity.
# Disabling this will stop the weekly narrative generation.
weekly_narrative_enabled: bool = True
weekly_narrative_lookback_days: int = 7
weekly_narrative_output_dir: str = ".loop"
# ── Local Hands (Shell + Git) ──────────────────────────────────────
# Enable local shell/git execution hands.
hands_shell_enabled: bool = True

View File

@@ -0,0 +1,524 @@
"""Tests for token_rules module."""
from __future__ import annotations
import sys
from pathlib import Path
from unittest.mock import patch
import pytest
# Add timmy_automations to path for imports
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "timmy_automations"))
from utils import token_rules as tr
class TestTokenEvent:
    """Tests for the TokenEvent dataclass delta computation."""

    @staticmethod
    def _make(reward, penalty):
        # Helper: build a TokenEvent with fixed name/description/category.
        return tr.TokenEvent(
            name="test",
            description="Test event",
            reward=reward,
            penalty=penalty,
            category="test",
        )

    def test_delta_calculation_reward(self):
        """A pure reward yields a positive delta."""
        assert self._make(10, 0).delta == 10

    def test_delta_calculation_penalty(self):
        """A pure penalty yields a negative delta."""
        assert self._make(0, -5).delta == -5

    def test_delta_calculation_mixed(self):
        """Reward and penalty are netted into a single delta."""
        assert self._make(10, -3).delta == 7
class TestTokenRulesLoading:
    """Tests for TokenRules configuration loading and fallback paths."""

    def test_loads_from_yaml_file(self, tmp_path):
        """A well-formed YAML file is parsed into live rules."""
        yaml = pytest.importorskip("yaml")
        cfg = {
            "version": "1.0.0-test",
            "events": {
                "test_event": {
                    "description": "A test event",
                    "reward": 15,
                    "category": "test",
                }
            },
            "gating_thresholds": {"test_op": 50},
            "daily_limits": {"test": {"max_earn": 100, "max_spend": 10}},
            "audit": {"log_all_transactions": False},
        }
        path = tmp_path / "token_rules.yaml"
        path.write_text(yaml.dump(cfg))

        rules = tr.TokenRules(config_path=path)

        assert rules.get_config_version() == "1.0.0-test"
        assert rules.get_delta("test_event") == 15
        assert rules.get_gate_threshold("test_op") == 50

    def test_fallback_when_yaml_missing(self, tmp_path):
        """A missing config file triggers the built-in fallback rules."""
        rules = tr.TokenRules(config_path=tmp_path / "nonexistent.yaml")

        assert rules.get_config_version() == "fallback"
        # The fallback ships with a handful of core events.
        assert rules.get_delta("pr_merged") == 10
        assert rules.get_delta("test_fixed") == 8
        assert rules.get_delta("automation_failure") == -2

    def test_fallback_when_yaml_not_installed(self, tmp_path):
        """Fallback is used when the yaml module cannot be imported."""
        with patch.dict(sys.modules, {"yaml": None}):
            path = tmp_path / "token_rules.yaml"
            path.write_text("version: '1.0.0'")
            rules = tr.TokenRules(config_path=path)
            assert rules.get_config_version() == "fallback"
class TestTokenRulesGetDelta:
    """Tests for TokenRules.get_delta."""

    def test_get_delta_existing_event(self, tmp_path):
        """Configured events report their reward or penalty as the delta."""
        yaml = pytest.importorskip("yaml")
        path = tmp_path / "token_rules.yaml"
        path.write_text(
            yaml.dump(
                {
                    "version": "1.0.0",
                    "events": {
                        "pr_merged": {"description": "PR merged", "reward": 10, "category": "merge"},
                        "automation_failure": {"description": "Failure", "penalty": -2, "category": "ops"},
                    },
                }
            )
        )
        rules = tr.TokenRules(config_path=path)
        assert rules.get_delta("pr_merged") == 10
        assert rules.get_delta("automation_failure") == -2

    def test_get_delta_unknown_event(self, tmp_path):
        """Events absent from the config yield a zero delta."""
        rules = tr.TokenRules(config_path=tmp_path / "nonexistent.yaml")
        assert rules.get_delta("unknown_event") == 0
class TestTokenRulesGetEvent:
    """Tests for TokenRules.get_event."""

    def test_get_event_returns_full_config(self, tmp_path):
        """All configured fields are surfaced on the returned event object."""
        yaml = pytest.importorskip("yaml")
        cfg = {
            "version": "1.0.0",
            "events": {
                "pr_merged": {
                    "description": "PR merged successfully",
                    "reward": 10,
                    "category": "merge",
                    "gate_threshold": 0,
                }
            },
        }
        path = tmp_path / "token_rules.yaml"
        path.write_text(yaml.dump(cfg))

        event = tr.TokenRules(config_path=path).get_event("pr_merged")

        assert event is not None
        assert event.name == "pr_merged"
        assert event.description == "PR merged successfully"
        assert (event.reward, event.category, event.gate_threshold) == (10, "merge", 0)

    def test_get_event_unknown_returns_none(self, tmp_path):
        """Unknown event names map to None."""
        rules = tr.TokenRules(config_path=tmp_path / "nonexistent.yaml")
        assert rules.get_event("unknown") is None
class TestTokenRulesListEvents:
    """Tests for TokenRules.list_events."""

    @staticmethod
    def _rules(tmp_path):
        # Build a TokenRules instance with three events across two categories.
        yaml = pytest.importorskip("yaml")
        cfg = {
            "version": "1.0.0",
            "events": {
                "event_a": {"description": "A", "reward": 5, "category": "cat1"},
                "event_b": {"description": "B", "reward": 10, "category": "cat2"},
                "event_c": {"description": "C", "reward": 15, "category": "cat1"},
            },
        }
        path = tmp_path / "token_rules.yaml"
        path.write_text(yaml.dump(cfg))
        return tr.TokenRules(config_path=path)

    def test_list_all_events(self, tmp_path):
        """Without a filter, every configured event is returned."""
        events = self._rules(tmp_path).list_events()
        assert len(events) == 3
        names = {event.name for event in events}
        assert names >= {"event_a", "event_b", "event_c"}

    def test_list_events_by_category(self, tmp_path):
        """The category filter restricts the result set to that category."""
        events = self._rules(tmp_path).list_events(category="cat1")
        assert len(events) == 2
        assert all(event.category == "cat1" for event in events)
class TestTokenRulesGating:
    """Tests for gate thresholds and gate checks."""

    @staticmethod
    def _rules(tmp_path, thresholds):
        # Build rules with no events and the given gating thresholds.
        yaml = pytest.importorskip("yaml")
        path = tmp_path / "token_rules.yaml"
        path.write_text(
            yaml.dump({"version": "1.0.0", "events": {}, "gating_thresholds": thresholds})
        )
        return tr.TokenRules(config_path=path)

    def test_check_gate_with_threshold(self, tmp_path):
        """Balances at or above the threshold pass; anything below fails."""
        rules = self._rules(tmp_path, {"pr_merge": 50})
        assert rules.check_gate("pr_merge", current_tokens=100) is True
        assert rules.check_gate("pr_merge", current_tokens=50) is True
        assert rules.check_gate("pr_merge", current_tokens=49) is False
        assert rules.check_gate("pr_merge", current_tokens=0) is False

    def test_check_gate_no_threshold(self, tmp_path):
        """Operations without a configured threshold are always permitted."""
        rules = self._rules(tmp_path, {})
        assert rules.check_gate("unknown_op", current_tokens=0) is True
        assert rules.check_gate("unknown_op", current_tokens=-100) is True

    def test_get_gate_threshold(self, tmp_path):
        """Thresholds come back verbatim; unknown operations give None."""
        yaml = pytest.importorskip("yaml")
        path = tmp_path / "token_rules.yaml"
        path.write_text(
            yaml.dump(
                {"version": "1.0.0", "gating_thresholds": {"pr_merge": 50, "sensitive_op": 100}}
            )
        )
        rules = tr.TokenRules(config_path=path)
        assert rules.get_gate_threshold("pr_merge") == 50
        assert rules.get_gate_threshold("sensitive_op") == 100
        assert rules.get_gate_threshold("unknown") is None
class TestTokenRulesDailyLimits:
    """Tests for TokenRules.get_daily_limits."""

    def test_get_daily_limits(self, tmp_path):
        """Per-category earn/spend caps come back exactly as configured."""
        yaml = pytest.importorskip("yaml")
        path = tmp_path / "token_rules.yaml"
        path.write_text(
            yaml.dump(
                {
                    "version": "1.0.0",
                    "daily_limits": {
                        "triage": {"max_earn": 100, "max_spend": 0},
                        "merge": {"max_earn": 50, "max_spend": 10},
                    },
                }
            )
        )
        rules = tr.TokenRules(config_path=path)

        for category, earn, spend in (("triage", 100, 0), ("merge", 50, 10)):
            limits = rules.get_daily_limits(category)
            assert limits is not None
            assert limits.max_earn == earn
            assert limits.max_spend == spend

    def test_get_daily_limits_unknown(self, tmp_path):
        """Categories without configured limits yield None."""
        yaml = pytest.importorskip("yaml")
        path = tmp_path / "token_rules.yaml"
        path.write_text(yaml.dump({"version": "1.0.0", "daily_limits": {}}))
        rules = tr.TokenRules(config_path=path)
        assert rules.get_daily_limits("unknown") is None
class TestTokenRulesComputeTransaction:
    """Tests for TokenRules.compute_transaction."""

    @staticmethod
    def _write_rules(tmp_path, config):
        # Serialize the config to YAML and load it into a TokenRules instance.
        yaml = pytest.importorskip("yaml")
        path = tmp_path / "token_rules.yaml"
        path.write_text(yaml.dump(config))
        return tr.TokenRules(config_path=path)

    def test_compute_successful_transaction(self, tmp_path):
        """A known event yields an allowed transaction with a new balance."""
        rules = self._write_rules(
            tmp_path,
            {
                "version": "1.0.0",
                "events": {
                    "pr_merged": {"description": "PR merged", "reward": 10, "category": "merge"}
                },
            },
        )
        result = rules.compute_transaction("pr_merged", current_tokens=100)
        assert result["event"] == "pr_merged"
        assert result["delta"] == 10
        assert result["category"] == "merge"
        assert result["allowed"] is True
        assert result["new_balance"] == 110
        assert result["limit_reached"] is False

    def test_compute_unknown_event(self, tmp_path):
        """Unknown events are rejected and leave the balance untouched."""
        rules = tr.TokenRules(config_path=tmp_path / "nonexistent.yaml")
        result = rules.compute_transaction("unknown_event", current_tokens=50)
        assert result["event"] == "unknown_event"
        assert result["delta"] == 0
        assert result["allowed"] is False
        assert result["reason"] == "unknown_event"
        assert result["new_balance"] == 50

    def test_compute_with_gate_check(self, tmp_path):
        """Gated events require the current balance to meet the threshold."""
        rules = self._write_rules(
            tmp_path,
            {
                "version": "1.0.0",
                "events": {
                    "sensitive_op": {
                        "description": "Sensitive",
                        "reward": 50,
                        "category": "sensitive",
                        "gate_threshold": 100,
                    }
                },
            },
        )
        # Balance above the gate: allowed.
        assert rules.compute_transaction("sensitive_op", current_tokens=150)["allowed"] is True
        # Balance below the gate: rejected with a gate reason.
        blocked = rules.compute_transaction("sensitive_op", current_tokens=50)
        assert blocked["allowed"] is False
        assert "gate_reason" in blocked

    def test_compute_with_daily_limits(self, tmp_path):
        """Earning past a category's daily cap is rejected."""
        rules = self._write_rules(
            tmp_path,
            {
                "version": "1.0.0",
                "events": {
                    "triage_action": {
                        "description": "Triage",
                        "reward": 20,
                        "category": "triage",
                    }
                },
                "daily_limits": {"triage": {"max_earn": 50, "max_spend": 0}},
            },
        )
        # 20 already earned + 20 reward = 40, still within the 50 cap.
        ok = rules.compute_transaction(
            "triage_action", current_tokens=100, current_daily_earned={"triage": 20}
        )
        assert ok["allowed"] is True
        assert ok["limit_reached"] is False
        # 40 already earned + 20 reward = 60, which exceeds the 50 cap.
        blocked = rules.compute_transaction(
            "triage_action", current_tokens=100, current_daily_earned={"triage": 40}
        )
        assert blocked["allowed"] is False
        assert blocked["limit_reached"] is True
        assert "limit_reason" in blocked
class TestTokenRulesCategories:
    """Tests for TokenRules.get_categories."""

    def test_get_categories(self, tmp_path):
        """Categories are de-duplicated across all configured events."""
        yaml = pytest.importorskip("yaml")
        path = tmp_path / "token_rules.yaml"
        path.write_text(
            yaml.dump(
                {
                    "version": "1.0.0",
                    "events": {
                        "event_a": {"description": "A", "reward": 5, "category": "cat1"},
                        "event_b": {"description": "B", "reward": 10, "category": "cat2"},
                        "event_c": {"description": "C", "reward": 15, "category": "cat1"},
                    },
                }
            )
        )
        rules = tr.TokenRules(config_path=path)
        assert sorted(rules.get_categories()) == ["cat1", "cat2"]
class TestTokenRulesAudit:
    """Tests for TokenRules.is_auditable."""

    @staticmethod
    def _rules(tmp_path, log_all):
        # Build rules with only the audit section populated.
        yaml = pytest.importorskip("yaml")
        path = tmp_path / "token_rules.yaml"
        path.write_text(
            yaml.dump({"version": "1.0.0", "audit": {"log_all_transactions": log_all}})
        )
        return tr.TokenRules(config_path=path)

    def test_is_auditable_true(self, tmp_path):
        """Auditing is on when log_all_transactions is true."""
        assert self._rules(tmp_path, True).is_auditable() is True

    def test_is_auditable_false(self, tmp_path):
        """Auditing is off when log_all_transactions is false."""
        assert self._rules(tmp_path, False).is_auditable() is False
class TestConvenienceFunctions:
    """Tests for the module-level wrappers, exercised against fallback rules."""

    def test_get_token_delta(self, tmp_path):
        """get_token_delta reads the fallback delta for pr_merged."""
        with patch.object(tr.TokenRules, "CONFIG_PATH", tmp_path / "nonexistent.yaml"):
            assert tr.get_token_delta("pr_merged") == 10  # fallback value

    def test_check_operation_gate(self, tmp_path):
        """check_operation_gate honours the fallback pr_merge gate of 0."""
        with patch.object(tr.TokenRules, "CONFIG_PATH", tmp_path / "nonexistent.yaml"):
            assert tr.check_operation_gate("pr_merge", current_tokens=0) is True
            assert tr.check_operation_gate("pr_merge", current_tokens=100) is True

    def test_compute_token_reward(self, tmp_path):
        """compute_token_reward produces a full transaction dict."""
        with patch.object(tr.TokenRules, "CONFIG_PATH", tmp_path / "nonexistent.yaml"):
            result = tr.compute_token_reward("pr_merged", current_tokens=50)
        assert result["event"] == "pr_merged"
        assert result["delta"] == 10
        assert result["new_balance"] == 60

    def test_list_token_events(self, tmp_path):
        """list_token_events returns dicts carrying the expected keys."""
        with patch.object(tr.TokenRules, "CONFIG_PATH", tmp_path / "nonexistent.yaml"):
            events = tr.list_token_events()
        assert len(events) >= 3  # the fallback defines at least three events
        required = {"name", "description", "delta", "category"}
        for event in events:
            assert required <= set(event)

View File

@@ -0,0 +1,343 @@
"""Tests for weekly_narrative.py script."""
from __future__ import annotations
import json
import sys
from datetime import UTC, datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
# Add timmy_automations to path for imports
sys.path.insert(
0, str(Path(__file__).resolve().parent.parent.parent / "timmy_automations" / "daily_run")
)
import weekly_narrative as wn
class TestParseTimestamp:
    """Tests for wn.parse_ts."""

    def test_parse_iso_with_z(self):
        """ISO-8601 with a trailing Z parses to the right calendar date."""
        parsed = wn.parse_ts("2026-03-21T12:00:00Z")
        assert parsed is not None
        assert (parsed.year, parsed.month, parsed.day) == (2026, 3, 21)

    def test_parse_iso_with_offset(self):
        """ISO-8601 with an explicit UTC offset parses."""
        parsed = wn.parse_ts("2026-03-21T12:00:00+00:00")
        assert parsed is not None
        assert parsed.year == 2026

    def test_parse_empty_string(self):
        """An empty string yields None rather than raising."""
        assert wn.parse_ts("") is None

    def test_parse_invalid_string(self):
        """Garbage input yields None rather than raising."""
        assert wn.parse_ts("not-a-timestamp") is None
class TestCollectCyclesData:
    """Tests for wn.collect_cycles_data."""

    def test_no_cycles_file(self, tmp_path):
        """A repo without .loop/retro/cycles.jsonl produces zeroed counts."""
        with patch.object(wn, "REPO_ROOT", tmp_path):
            stats = wn.collect_cycles_data(datetime.now(UTC) - timedelta(days=7))
        assert stats["total"] == 0
        assert stats["successes"] == 0
        assert stats["failures"] == 0

    def test_collect_recent_cycles(self, tmp_path):
        """Only cycles inside the lookback window are counted."""
        retro_dir = tmp_path / ".loop" / "retro"
        retro_dir.mkdir(parents=True)
        now = datetime.now(UTC)
        records = [
            {"timestamp": now.isoformat(), "success": True, "cycle": 1},
            {"timestamp": now.isoformat(), "success": False, "cycle": 2},
            # 10 days old — falls outside the 7-day window used below.
            {"timestamp": (now - timedelta(days=10)).isoformat(), "success": True, "cycle": 3},
        ]
        (retro_dir / "cycles.jsonl").write_text(
            "".join(json.dumps(record) + "\n" for record in records)
        )
        with patch.object(wn, "REPO_ROOT", tmp_path):
            stats = wn.collect_cycles_data(now - timedelta(days=7))
        assert stats["total"] == 2  # the stale record is excluded
        assert stats["successes"] == 1
        assert stats["failures"] == 1
class TestExtractThemes:
    """Tests for wn.extract_themes."""

    @staticmethod
    def _issue(*label_names):
        # Helper: wrap label names in the Gitea issue label shape.
        return {"labels": [{"name": name} for name in label_names]}

    def test_extract_layer_labels(self):
        """layer:* labels are grouped into themed layers."""
        issues = [
            self._issue("layer:triage", "bug"),
            self._issue("layer:tests", "bug"),
            self._issue("layer:triage", "feature"),
        ]
        themes = wn.extract_themes(issues)
        assert len(themes["layers"]) == 2
        layer_names = {layer["name"] for layer in themes["layers"]}
        assert "triage" in layer_names
        assert "tests" in layer_names

    def test_extract_type_labels(self):
        """Plain type labels (bug/feature) are tallied."""
        issues = [self._issue("bug"), self._issue("feature"), self._issue("bug")]
        type_names = {entry["name"] for entry in wn.extract_themes(issues)["types"]}
        assert "bug" in type_names
        assert "feature" in type_names

    def test_empty_issues(self):
        """No issues means empty theme lists all around."""
        themes = wn.extract_themes([])
        assert themes["layers"] == []
        assert themes["types"] == []
        assert themes["top_labels"] == []
class TestExtractAgentContributions:
    """Tests for wn.extract_agent_contributions."""

    def test_extract_assignees(self):
        """Issue assignees are counted once per login."""
        issues = [
            {"assignee": {"login": "kimi"}},
            {"assignee": {"login": "hermes"}},
            {"assignee": {"login": "kimi"}},
        ]
        contrib = wn.extract_agent_contributions(issues, [], [])
        assert len(contrib["active_assignees"]) == 2
        logins = {entry["login"] for entry in contrib["active_assignees"]}
        assert "kimi" in logins
        assert "hermes" in logins

    def test_extract_pr_authors(self):
        """PR authors are counted once per login."""
        prs = [
            {"user": {"login": "kimi"}},
            {"user": {"login": "claude"}},
            {"user": {"login": "kimi"}},
        ]
        contrib = wn.extract_agent_contributions([], prs, [])
        assert len(contrib["pr_authors"]) == 2

    def test_kimi_mentions_in_cycles(self):
        """Cycles mentioning Kimi in either notes or reason are counted."""
        cycles = [
            {"notes": "Kimi did great work", "reason": ""},
            {"notes": "", "reason": "Kimi timeout"},
            {"notes": "All good", "reason": ""},
        ]
        contrib = wn.extract_agent_contributions([], [], cycles)
        assert contrib["kimi_mentioned_cycles"] == 2
class TestAnalyzeTestShifts:
    """Tests for wn.analyze_test_shifts."""

    def test_no_cycles(self):
        """An empty cycle list yields an explanatory note instead of metrics."""
        assert "note" in wn.analyze_test_shifts([])

    def test_test_metrics(self):
        """Passed/added counts are summed across all cycles."""
        cycles = [
            {"tests_passed": 100, "tests_added": 5},
            {"tests_passed": 150, "tests_added": 3},
        ]
        shifts = wn.analyze_test_shifts(cycles)
        assert shifts["total_tests_passed"] == 250
        assert shifts["total_tests_added"] == 8
class TestGenerateVibeSummary:
    """Tests for wn.generate_vibe_summary vibe classification."""

    @staticmethod
    def _vibe(cycles_data, issues_data):
        # Remaining data sources are irrelevant to these cases, so pass empties.
        return wn.generate_vibe_summary(
            cycles_data, issues_data, {}, {"layers": []}, {}, {}, {}
        )

    def test_productive_vibe(self):
        """High success rate plus closed issues reads as productive."""
        vibe = self._vibe(
            {"success_rate": 0.95, "successes": 10, "failures": 1},
            {"closed_count": 5},
        )
        assert vibe["overall"] == "productive"
        assert "strong week" in vibe["description"].lower()

    def test_struggling_vibe(self):
        """More failures than successes reads as struggling."""
        vibe = self._vibe(
            {"success_rate": 0.3, "successes": 3, "failures": 7},
            {"closed_count": 0},
        )
        assert vibe["overall"] == "struggling"

    def test_quiet_vibe(self):
        """No activity at all reads as quiet."""
        vibe = self._vibe(
            {"success_rate": 0.0, "successes": 0, "failures": 0},
            {"closed_count": 0},
        )
        assert vibe["overall"] == "quiet"
class TestGenerateMarkdownSummary:
    """Tests for wn.generate_markdown_summary."""

    def test_includes_header(self):
        """The rendered markdown carries the title, the vibe, and cycle counts."""
        payload = {
            "period": {"start": "2026-03-14T00:00:00", "end": "2026-03-21T00:00:00"},
            "vibe": {"overall": "productive", "description": "Good week"},
            "activity": {
                "cycles": {"total": 10, "successes": 9, "failures": 1},
                "issues": {"closed": 5, "opened": 3},
                "pull_requests": {"merged": 4, "opened": 2},
            },
        }
        rendered = wn.generate_markdown_summary(payload)
        assert "# Weekly Narrative Summary" in rendered
        assert "productive" in rendered.lower()
        assert "10 total" in rendered or "10" in rendered

    def test_includes_focus_areas(self):
        """focus_areas, when present, get their own section."""
        payload = {
            "period": {"start": "2026-03-14", "end": "2026-03-21"},
            "vibe": {
                "overall": "productive",
                "description": "Good week",
                "focus_areas": ["triage (5 items)", "tests (3 items)"],
            },
            "activity": {
                "cycles": {"total": 0, "successes": 0, "failures": 0},
                "issues": {"closed": 0, "opened": 0},
                "pull_requests": {"merged": 0, "opened": 0},
            },
        }
        rendered = wn.generate_markdown_summary(payload)
        assert "Focus Areas" in rendered
        assert "triage" in rendered
class TestConfigLoading:
    """Tests for wn.load_automation_config."""

    def test_default_config(self, tmp_path):
        """Built-in defaults apply when the automations manifest is absent."""
        missing = tmp_path / "nonexistent.json"
        with patch.object(wn, "CONFIG_PATH", missing):
            config = wn.load_automation_config()
        assert config["lookback_days"] == 7
        assert config["enabled"] is True

    def test_environment_override(self, tmp_path):
        """TIMMY_WEEKLY_NARRATIVE_ENABLED=false disables the automation."""
        missing = tmp_path / "nonexistent.json"
        env = {"TIMMY_WEEKLY_NARRATIVE_ENABLED": "false"}
        with patch.dict("os.environ", env), patch.object(wn, "CONFIG_PATH", missing):
            config = wn.load_automation_config()
        assert config["enabled"] is False
class TestMain:
    """Tests for the wn.main entry point."""

    def test_disabled_exits_cleanly(self, tmp_path):
        """A disabled automation without --force is a no-op success."""
        with patch.object(wn, "REPO_ROOT", tmp_path), patch.object(
            wn, "load_automation_config", return_value={"enabled": False}
        ), patch("sys.argv", ["weekly_narrative"]):
            assert wn.main() == 0

    def test_force_runs_when_disabled(self, tmp_path):
        """--force overrides enabled=False and tolerates Gitea being down."""
        # Minimal on-disk layout the generator expects.
        (tmp_path / ".loop" / "retro").mkdir(parents=True)
        config = {
            "enabled": False,
            "lookback_days": 7,
            "gitea_api": "http://localhost:3000/api/v1",
            "repo_slug": "test/repo",
            "token_file": "~/.hermes/gitea_token",
        }
        offline_client = MagicMock()
        offline_client.is_available.return_value = False
        with patch.object(wn, "REPO_ROOT", tmp_path), patch.object(
            wn, "load_automation_config", return_value=config
        ), patch.object(wn, "GiteaClient", return_value=offline_client), patch(
            "sys.argv", ["weekly_narrative", "--force"]
        ):
            # Must still succeed even though the Gitea API is unreachable.
            assert wn.main() == 0
class TestGiteaClient:
    """Tests for the wn.GiteaClient helper."""

    def test_is_available_when_unavailable(self):
        """is_available returns False (without raising) when nothing listens.

        Uses port 9 (the "discard" service, essentially never open) rather
        than the previous 99999: ports above 65535 are invalid, so urllib
        raises InvalidURL/ValueError before a connection is even attempted,
        which would bypass the client's real connection-error handling.
        """
        config = {"gitea_api": "http://localhost:9", "repo_slug": "test/repo"}
        client = wn.GiteaClient(config, None)
        # Should return False without raising.
        assert client.is_available() is False

    def test_headers_with_token(self):
        """A token is sent as a Gitea-style 'token ...' Authorization header."""
        config = {"gitea_api": "http://localhost:3000", "repo_slug": "test/repo"}
        client = wn.GiteaClient(config, "test-token")
        headers = client._headers()
        assert headers["Authorization"] == "token test-token"

    def test_headers_without_token(self):
        """No Authorization header is emitted when there is no token."""
        config = {"gitea_api": "http://localhost:3000", "repo_slug": "test/repo"}
        client = wn.GiteaClient(config, None)
        headers = client._headers()
        assert "Authorization" not in headers

View File

@@ -228,6 +228,27 @@
"max_items": 5
},
"outputs": []
},
{
"id": "weekly_narrative",
"name": "Weekly Narrative Summary",
"description": "Generates a human-readable weekly summary of work themes, agent contributions, and token economy shifts",
"script": "timmy_automations/daily_run/weekly_narrative.py",
"category": "daily_run",
"enabled": true,
"trigger": "scheduled",
"schedule": "weekly",
"executable": "python3",
"config": {
"lookback_days": 7,
"output_file": ".loop/weekly_narrative.json",
"gitea_api": "http://localhost:3000/api/v1",
"repo_slug": "rockachopa/Timmy-time-dashboard"
},
"outputs": [
".loop/weekly_narrative.json",
".loop/weekly_narrative.md"
]
}
]
}

View File

@@ -17,6 +17,10 @@
"manual": {
"description": "Run on-demand only",
"automations": ["agent_workspace", "kimi_bootstrap", "kimi_resume", "backfill_retro"]
},
"weekly": {
"description": "Run once per week (Sundays)",
"automations": ["weekly_narrative"]
}
},
"triggers": {

View File

@@ -0,0 +1,138 @@
# Token Rules — Agent reward/penalty configuration for automations
#
# This file defines the token economy for agent actions.
# Modify values here to adjust incentives without code changes.
#
# Used by: timmy_automations.utils.token_rules
version: "1.0.0"
description: "Token economy rules for agent automations"
# ── Events ─────────────────────────────────────────────────────────────────
# Each event type defines rewards/penalties and optional gating thresholds
events:
# Triage actions
triage_success:
description: "Successfully triaged an issue (scored and categorized)"
reward: 5
category: "triage"
deep_triage_refinement:
description: "LLM-driven issue refinement with acceptance criteria added"
reward: 20
category: "triage"
quarantine_candidate_found:
description: "Identified a repeat failure issue for quarantine"
reward: 10
category: "triage"
# Daily Run completions
daily_run_completed:
description: "Completed a daily run cycle successfully"
reward: 5
category: "daily_run"
golden_path_generated:
description: "Generated a coherent mini-session plan"
reward: 3
category: "daily_run"
weekly_narrative_created:
description: "Generated weekly summary of work themes"
reward: 15
category: "daily_run"
# PR merges
pr_merged:
description: "Successfully merged a pull request"
reward: 10
category: "merge"
# Gating: requires minimum tokens to perform
gate_threshold: 0
pr_merged_with_tests:
description: "Merged PR with all tests passing"
reward: 15
category: "merge"
gate_threshold: 0
# Test fixes
test_fixed:
description: "Fixed a failing test"
reward: 8
category: "test"
test_added:
description: "Added new test coverage"
reward: 5
category: "test"
critical_bug_fixed:
description: "Fixed a critical bug on main"
reward: 25
category: "test"
# General operations
automation_run:
description: "Ran any automation (resource usage)"
penalty: -1
category: "operation"
automation_failure:
description: "Automation failed or produced error"
penalty: -2
category: "operation"
cycle_retro_logged:
description: "Logged structured retrospective data"
reward: 5
category: "operation"
pre_commit_passed:
description: "Pre-commit checks passed"
reward: 2
category: "operation"
pre_commit_failed:
description: "Pre-commit checks failed"
penalty: -1
category: "operation"
# ── Gating Thresholds ──────────────────────────────────────────────────────
# Minimum token balances required for sensitive operations
gating_thresholds:
pr_merge: 0
sensitive_config_change: 50
agent_workspace_create: 10
deep_triage_run: 0
# ── Daily Limits ───────────────────────────────────────────────────────────
# Maximum tokens that can be earned/spent per category per day
daily_limits:
triage:
max_earn: 100
max_spend: 0
daily_run:
max_earn: 50
max_spend: 0
merge:
max_earn: 100
max_spend: 0
test:
max_earn: 100
max_spend: 0
operation:
max_earn: 50
max_spend: 50
# ── Audit Settings ─────────────────────────────────────────────────────────
# Settings for token audit and inspection
audit:
log_all_transactions: true
log_retention_days: 30
inspectable_by: ["orchestrator", "auditor", "timmy"]

View File

@@ -22,6 +22,14 @@ from typing import Any
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
# ── Token Economy Integration ──────────────────────────────────────────────
# Import token rules helpers for tracking Daily Run rewards
sys.path.insert(
0, str(Path(__file__).resolve().parent.parent)
)
from utils.token_rules import TokenRules, compute_token_reward
# ── Configuration ─────────────────────────────────────────────────────────
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
@@ -490,6 +498,43 @@ def parse_args() -> argparse.Namespace:
return p.parse_args()
def compute_daily_run_tokens(success: bool = True) -> dict[str, Any]:
    """Compute token rewards for Daily Run completion.

    Uses the centralized token_rules configuration to calculate
    rewards/penalties for automation actions.

    Args:
        success: Whether the Daily Run completed successfully.

    Returns:
        Token transaction details, including a combined ``total_delta``
        and the token-rules ``config_version`` used.
    """
    rules = TokenRules()
    version = rules.get_config_version()

    if not success:
        # Failure path: a single penalty transaction for the failed run.
        failure_txn = compute_token_reward("automation_failure", current_tokens=0)
        return {
            "automation_failure": failure_txn,
            "total_delta": failure_txn.get("delta", 0),
            "config_version": version,
        }

    # Success path: reward the run itself plus the generated agenda plan.
    run_txn = compute_token_reward("daily_run_completed", current_tokens=0)
    agenda_txn = compute_token_reward("golden_path_generated", current_tokens=0)
    return {
        "daily_run": run_txn,
        "golden_path": agenda_txn,
        "total_delta": run_txn.get("delta", 0) + agenda_txn.get("delta", 0),
        "config_version": version,
    }
def main() -> int:
args = parse_args()
config = load_config()
@@ -503,10 +548,13 @@ def main() -> int:
# Check Gitea availability
if not client.is_available():
error_msg = "[orchestrator] Error: Gitea API is not available"
# Compute failure tokens even when unavailable
tokens = compute_daily_run_tokens(success=False)
if args.json:
print(json.dumps({"error": error_msg}))
print(json.dumps({"error": error_msg, "tokens": tokens}))
else:
print(error_msg, file=sys.stderr)
print(f"[tokens] Failure penalty: {tokens['total_delta']}", file=sys.stderr)
return 1
# Fetch candidates and generate agenda
@@ -521,9 +569,12 @@ def main() -> int:
cycles = load_cycle_data()
day_summary = generate_day_summary(activity, cycles)
# Compute token rewards for successful completion
tokens = compute_daily_run_tokens(success=True)
# Output
if args.json:
output = {"agenda": agenda}
output = {"agenda": agenda, "tokens": tokens}
if day_summary:
output["day_summary"] = day_summary
print(json.dumps(output, indent=2))
@@ -531,6 +582,15 @@ def main() -> int:
print_agenda(agenda)
if day_summary and activity:
print_day_summary(day_summary, activity)
# Show token rewards
print("" * 60)
print("🪙 Token Rewards")
print("" * 60)
print(f"Daily Run completed: +{tokens['daily_run']['delta']} tokens")
if candidates:
print(f"Golden path generated: +{tokens['golden_path']['delta']} tokens")
print(f"Total: +{tokens['total_delta']} tokens")
print(f"Config version: {tokens['config_version']}")
return 0

View File

@@ -0,0 +1,745 @@
#!/usr/bin/env python3
"""Weekly narrative summary generator — human-readable loop analysis.
Analyzes the past week's activity across the development loop to produce
a narrative summary of:
- What changed (themes, areas of focus)
- How agents and Timmy contributed
- Any shifts in tests, triage, or token economy
The output is designed to be skimmable — a quick read that gives context
on the week's progress without drowning in metrics.
Run: python3 timmy_automations/daily_run/weekly_narrative.py [--json]
Env: See timmy_automations/config/automations.json for configuration
Refs: #719
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from collections import Counter
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
# ── Configuration ─────────────────────────────────────────────────────────
# Repo root: three levels up from this file (timmy_automations/daily_run/).
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
# Shared automations manifest; the "weekly_narrative" entry overrides defaults.
CONFIG_PATH = Path(__file__).parent.parent / "config" / "automations.json"
# Fallback settings used when the manifest is missing or has no entry.
DEFAULT_CONFIG = {
    "gitea_api": "http://localhost:3000/api/v1",
    "repo_slug": "rockachopa/Timmy-time-dashboard",
    "token_file": "~/.hermes/gitea_token",
    "lookback_days": 7,
    "output_file": ".loop/weekly_narrative.json",
    "enabled": True,
}
# ── Data Loading ───────────────────────────────────────────────────────────
def load_automation_config() -> dict:
    """Build the weekly_narrative configuration.

    Starts from DEFAULT_CONFIG, layers on the matching entry from the
    automations manifest (when present), then applies environment
    variable overrides, which take highest precedence.
    """
    cfg = dict(DEFAULT_CONFIG)
    if CONFIG_PATH.exists():
        try:
            manifest = json.loads(CONFIG_PATH.read_text())
        except (json.JSONDecodeError, OSError) as exc:
            print(f"[weekly_narrative] Warning: Could not load config: {exc}", file=sys.stderr)
        else:
            for auto in manifest.get("automations", []):
                if auto.get("id") == "weekly_narrative":
                    cfg.update(auto.get("config", {}))
                    cfg["enabled"] = auto.get("enabled", True)
                    break
    # Environment variable overrides.
    if api := os.environ.get("TIMMY_GITEA_API"):
        cfg["gitea_api"] = api
    if slug := os.environ.get("TIMMY_REPO_SLUG"):
        cfg["repo_slug"] = slug
    if tok := os.environ.get("TIMMY_GITEA_TOKEN"):
        cfg["token"] = tok
    if flag := os.environ.get("TIMMY_WEEKLY_NARRATIVE_ENABLED"):
        cfg["enabled"] = flag.lower() == "true"
    return cfg
def get_token(config: dict) -> str | None:
"""Get Gitea token from environment or file."""
if "token" in config:
return config["token"]
token_file = Path(config["token_file"]).expanduser()
if token_file.exists():
return token_file.read_text().strip()
return None
def load_jsonl(path: Path) -> list[dict]:
    """Load a JSONL file, skipping lines that fail to parse.

    Args:
        path: File to read; a missing file yields an empty list.

    Returns:
        Parsed JSON values, one per well-formed line.
    """
    if not path.exists():
        return []
    entries = []
    for line in path.read_text().strip().splitlines():
        try:
            entries.append(json.loads(line))
        # json.JSONDecodeError is a ValueError subclass, so a single
        # ValueError clause covers both (the old tuple was redundant).
        except ValueError:
            continue
    return entries
def parse_ts(ts_str: str) -> datetime | None:
"""Parse an ISO timestamp, tolerating missing tz."""
if not ts_str:
return None
try:
dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=UTC)
return dt
except (ValueError, TypeError):
return None
# ── Gitea API Client ───────────────────────────────────────────────────────
class GiteaClient:
"""Simple Gitea API client with graceful degradation."""
def __init__(self, config: dict, token: str | None):
self.api_base = config["gitea_api"].rstrip("/")
self.repo_slug = config["repo_slug"]
self.token = token
self._available: bool | None = None
def _headers(self) -> dict:
headers = {"Accept": "application/json"}
if self.token:
headers["Authorization"] = f"token {self.token}"
return headers
def _api_url(self, path: str) -> str:
return f"{self.api_base}/repos/{self.repo_slug}/{path}"
def is_available(self) -> bool:
"""Check if Gitea API is reachable."""
if self._available is not None:
return self._available
try:
req = Request(
f"{self.api_base}/version",
headers=self._headers(),
method="GET",
)
with urlopen(req, timeout=5) as resp:
self._available = resp.status == 200
return self._available
except (HTTPError, URLError, TimeoutError):
self._available = False
return False
def get_paginated(self, path: str, params: dict | None = None) -> list:
"""Fetch all pages of a paginated endpoint."""
all_items = []
page = 1
limit = 50
while True:
url = self._api_url(path)
query_parts = [f"limit={limit}", f"page={page}"]
if params:
for key, val in params.items():
query_parts.append(f"{key}={val}")
url = f"{url}?{'&'.join(query_parts)}"
req = Request(url, headers=self._headers(), method="GET")
with urlopen(req, timeout=15) as resp:
batch = json.loads(resp.read())
if not batch:
break
all_items.extend(batch)
if len(batch) < limit:
break
page += 1
return all_items
# ── Data Collection ────────────────────────────────────────────────────────
def collect_cycles_data(since: datetime) -> dict:
    """Summarise cycle retrospectives recorded after *since*."""
    cycles_file = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
    if not cycles_file.exists():
        return {"cycles": [], "total": 0, "successes": 0, "failures": 0}
    recent = []
    for entry in load_jsonl(cycles_file):
        stamp = parse_ts(entry.get("timestamp", ""))
        if stamp and stamp >= since:
            recent.append(entry)
    ok = sum(1 for e in recent if e.get("success"))
    return {
        "cycles": recent,
        "total": len(recent),
        "successes": ok,
        "failures": len(recent) - ok,
        "success_rate": round(ok / len(recent), 2) if recent else 0,
    }
def collect_issues_data(client: GiteaClient, since: datetime) -> dict:
    """Collect issue activity (touched / closed / opened) since *since*."""
    if not client.is_available():
        return {"error": "Gitea unavailable", "issues": [], "closed": [], "opened": []}
    try:
        issues = client.get_paginated("issues", {"state": "all", "sort": "updated", "limit": 100})
    except (HTTPError, URLError) as exc:
        return {"error": str(exc), "issues": [], "closed": [], "opened": []}
    touched, closed, opened = [], [], []
    for issue in issues:
        updated = parse_ts(issue.get("updated_at", ""))
        created = parse_ts(issue.get("created_at", ""))
        if not (updated and updated >= since):
            continue
        touched.append(issue)
        # An issue both created and closed in the window counts only as
        # closed; "opened" captures new, still-open work.
        if issue.get("state") == "closed":
            closed_dt = parse_ts(issue.get("closed_at", ""))
            if closed_dt and closed_dt >= since:
                closed.append(issue)
        elif created and created >= since:
            opened.append(issue)
    return {
        "issues": touched,
        "closed": closed,
        "opened": opened,
        "touched_count": len(touched),
        "closed_count": len(closed),
        "opened_count": len(opened),
    }
def collect_prs_data(client: GiteaClient, since: datetime) -> dict:
    """Collect pull-request activity (touched / merged / opened) since *since*."""
    if not client.is_available():
        return {"error": "Gitea unavailable", "prs": [], "merged": [], "opened": []}
    try:
        prs = client.get_paginated("pulls", {"state": "all", "sort": "updated", "limit": 100})
    except (HTTPError, URLError) as exc:
        return {"error": str(exc), "prs": [], "merged": [], "opened": []}
    touched, merged, opened = [], [], []
    for pr in prs:
        updated = parse_ts(pr.get("updated_at", ""))
        created = parse_ts(pr.get("created_at", ""))
        merged_raw = pr.get("merged_at", "")
        merged_dt = parse_ts(merged_raw) if merged_raw else None
        if not (updated and updated >= since):
            continue
        touched.append(pr)
        # A PR merged within the window counts as merged, not opened.
        if pr.get("merged") and merged_dt and merged_dt >= since:
            merged.append(pr)
        elif created and created >= since:
            opened.append(pr)
    return {
        "prs": touched,
        "merged": merged,
        "opened": opened,
        "touched_count": len(touched),
        "merged_count": len(merged),
        "opened_count": len(opened),
    }
def collect_triage_data(since: datetime) -> dict:
    """Load triage runs since *since* plus the latest introspection insights."""
    triage_file = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
    insights_file = REPO_ROOT / ".loop" / "retro" / "insights.json"
    recent_triage = []
    for entry in load_jsonl(triage_file):
        # Parse once per entry — the old filter expression called
        # parse_ts twice on every timestamp.
        ts = parse_ts(entry.get("timestamp", ""))
        if ts and ts >= since:
            recent_triage.append(entry)
    insights = {}
    if insights_file.exists():
        try:
            insights = json.loads(insights_file.read_text())
        except (json.JSONDecodeError, OSError):
            # Best-effort: a corrupt insights file degrades to empty.
            pass
    return {
        "triage_runs": len(recent_triage),
        "triage_entries": recent_triage,
        "latest_insights": insights,
    }
def collect_token_data(since: datetime) -> dict:
    """Placeholder token-economy summary.

    The lightning ledger is in-memory only, so no historical data can be
    read here yet; *since* is accepted for interface symmetry with the
    other collectors.
    """
    return {
        "note": "Token economy data is ephemeral — check dashboard for live metrics",
        "balance_sats": 0,  # Placeholder until the ledger persists data
        "transactions_week": 0,
    }
# ── Analysis Functions ─────────────────────────────────────────────────────
def extract_themes(issues: list[dict]) -> dict[str, list[dict]]:
    """Summarise label themes across *issues*.

    Returns:
        Dict with three keyed lists (the annotation previously claimed a
        bare ``list[dict]``, but the function has always returned a dict):
        - "top_labels": up to ten most common labels, excluding
          "layer:"/"size:" prefixed ones
        - "layers": counts per "layer:" label, prefix stripped
        - "types": counts for the known issue-type labels
    """
    label_counts = Counter()
    layer_counts = Counter()
    type_counts = Counter()
    for issue in issues:
        for label in issue.get("labels", []):
            name = label.get("name", "")
            label_counts[name] += 1
            if name.startswith("layer:"):
                layer_counts[name.replace("layer:", "")] += 1
            if name in ("bug", "feature", "refactor", "docs", "test", "chore"):
                type_counts[name] += 1
    # Top themes (labels excluding layer/size prefixes).
    themes = [
        {"name": name, "count": count}
        for name, count in label_counts.most_common(10)
        if not name.startswith(("layer:", "size:"))
    ]
    layers = [
        {"name": name, "count": count}
        for name, count in layer_counts.most_common()
    ]
    types = [
        {"name": name, "count": count}
        for name, count in type_counts.most_common()
    ]
    return {
        "top_labels": themes,
        "layers": layers,
        "types": types,
    }
def extract_agent_contributions(issues: list[dict], prs: list[dict], cycles: list[dict]) -> dict:
    """Summarise who did what: issue assignees, PR authors, Kimi mentions."""
    assignee_counts = Counter(
        issue["assignee"].get("login", "unknown")
        for issue in issues
        if isinstance(issue.get("assignee"), dict)
    )
    pr_authors = Counter(
        pr["user"].get("login", "unknown")
        for pr in prs
        if isinstance(pr.get("user"), dict)
    )
    # Count cycles whose notes or failure reason mention Kimi.
    kimi_mentions = 0
    for cycle in cycles:
        blob = f"{cycle.get('notes', '')}\n{cycle.get('reason', '')}".lower()
        if "kimi" in blob:
            kimi_mentions += 1
    return {
        "active_assignees": [
            {"login": login, "issues_count": count}
            for login, count in assignee_counts.most_common()
        ],
        "pr_authors": [
            {"login": login, "prs_count": count}
            for login, count in pr_authors.most_common()
        ],
        "kimi_mentioned_cycles": kimi_mentions,
    }
def analyze_test_shifts(cycles: list[dict]) -> dict:
    """Aggregate test activity across *cycles*.

    Returns a note-only dict when there is no cycle data.
    """
    if not cycles:
        return {"note": "No cycle data available"}
    total_tests_passed = sum(c.get("tests_passed", 0) for c in cycles)
    total_tests_added = sum(c.get("tests_added", 0) for c in cycles)
    # cycles is guaranteed non-empty here (guard above), so divide
    # directly; the old "... if cycles else 0" branch was unreachable.
    avg_tests_per_cycle = round(total_tests_passed / len(cycles), 1)
    # Cycles that were explicitly about tests.
    test_focused = [
        c for c in cycles
        if c.get("type") == "test" or "test" in c.get("notes", "").lower()
    ]
    return {
        "total_tests_passed": total_tests_passed,
        "total_tests_added": total_tests_added,
        "avg_tests_per_cycle": avg_tests_per_cycle,
        "test_focused_cycles": len(test_focused),
    }
def analyze_triage_shifts(triage_data: dict) -> dict:
    """Condense triage/introspection signals into a few health numbers."""
    insights = triage_data.get("latest_insights", {})
    recommendations = insights.get("recommendations", [])
    high_priority = [rec for rec in recommendations if rec.get("severity") == "high"]
    return {
        "triage_runs": triage_data.get("triage_runs", 0),
        "insights_generated": insights.get("generated_at") is not None,
        "high_priority_recommendations": len(high_priority),
        "recent_recommendations": recommendations[:3] if recommendations else [],
    }
def generate_vibe_summary(
    cycles_data: dict,
    issues_data: dict,
    prs_data: dict,
    themes: dict,
    agent_contrib: dict,
    test_shifts: dict,
    triage_shifts: dict,
) -> dict:
    """Build the human-readable 'vibe' block of the narrative."""
    success_rate = cycles_data.get("success_rate", 0)
    failures = cycles_data.get("failures", 0)
    closed_count = issues_data.get("closed_count", 0)
    merged_count = prs_data.get("merged_count", 0)

    # Pick an overall mood from success rate + delivery volume.
    if success_rate >= 0.9 and closed_count > 0:
        vibe = "productive"
        vibe_description = "A strong week with solid delivery and healthy success rates."
    elif success_rate >= 0.7:
        vibe = "steady"
        vibe_description = "Steady progress with some bumps. Things are moving forward."
    elif failures > cycles_data.get("successes", 0):
        vibe = "struggling"
        vibe_description = "A challenging week with more failures than successes. Time to regroup."
    else:
        vibe = "quiet"
        vibe_description = "A lighter week with limited activity."

    # Focus areas: top three layer labels.
    focus_areas = [
        f"{layer['name']} ({layer['count']} items)"
        for layer in themes.get("layers", [])[:3]
    ]

    # One-line summary of the busiest assignee, when any.
    agent_summary = ""
    assignees = agent_contrib.get("active_assignees", [])
    if assignees:
        lead = assignees[0]
        agent_summary = f"{lead['login']} led with {lead['issues_count']} assigned issues."

    # Notable events, with a neutral default.
    notable = []
    if merged_count > 5:
        notable.append(f"{merged_count} PRs merged — high integration velocity")
    if triage_shifts.get("high_priority_recommendations", 0) > 0:
        notable.append("High-priority recommendations from loop introspection")
    if test_shifts.get("test_focused_cycles", 0) > 3:
        notable.append("Strong test coverage focus")
    if not notable:
        notable.append("Regular development flow")

    return {
        "overall": vibe,
        "description": vibe_description,
        "focus_areas": focus_areas,
        "agent_summary": agent_summary,
        "notable_events": notable,
    }
# ── Narrative Generation ───────────────────────────────────────────────────
def generate_narrative(
    cycles_data: dict,
    issues_data: dict,
    prs_data: dict,
    triage_data: dict,
    themes: dict,
    agent_contrib: dict,
    test_shifts: dict,
    triage_shifts: dict,
    token_data: dict,
    since: datetime,
    until: datetime,
) -> dict:
    """Assemble the complete weekly narrative document.

    The reporting period length is derived from ``until - since``; it
    was previously hardcoded to 7, which was wrong whenever the --days
    flag or the config overrode the lookback window.

    Returns:
        JSON-serialisable dict with vibe, activity counts, themes, agent
        contributions, test/triage health, and token-economy data.
    """
    vibe = generate_vibe_summary(
        cycles_data, issues_data, prs_data, themes, agent_contrib, test_shifts, triage_shifts
    )
    return {
        "generated_at": datetime.now(UTC).isoformat(),
        "period": {
            "start": since.isoformat(),
            "end": until.isoformat(),
            # Actual window length, not an assumed week.
            "days": (until - since).days,
        },
        "vibe": vibe,
        "activity": {
            "cycles": {
                "total": cycles_data.get("total", 0),
                "successes": cycles_data.get("successes", 0),
                "failures": cycles_data.get("failures", 0),
                "success_rate": cycles_data.get("success_rate", 0),
            },
            "issues": {
                "touched": issues_data.get("touched_count", 0),
                "closed": issues_data.get("closed_count", 0),
                "opened": issues_data.get("opened_count", 0),
            },
            "pull_requests": {
                "touched": prs_data.get("touched_count", 0),
                "merged": prs_data.get("merged_count", 0),
                "opened": prs_data.get("opened_count", 0),
            },
        },
        "themes": themes,
        "agents": agent_contrib,
        "test_health": test_shifts,
        "triage_health": triage_shifts,
        "token_economy": token_data,
    }
def generate_markdown_summary(narrative: dict) -> str:
    """Render *narrative* as a skimmable markdown report."""
    vibe = narrative.get("vibe", {})
    activity = narrative.get("activity", {})
    cycles = activity.get("cycles", {})
    issues = activity.get("issues", {})
    prs = activity.get("pull_requests", {})

    lines = [
        "# Weekly Narrative Summary",
        "",
        f"**Period:** {narrative['period']['start'][:10]} to {narrative['period']['end'][:10]}",
        f"**Vibe:** {vibe.get('overall', 'unknown').title()}",
        "",
        f"{vibe.get('description', '')}",
        "",
        "## Activity Highlights",
        "",
        f"- **Development Cycles:** {cycles.get('total', 0)} total ({cycles.get('successes', 0)} success, {cycles.get('failures', 0)} failure)",
        f"- **Issues:** {issues.get('closed', 0)} closed, {issues.get('opened', 0)} opened",
        f"- **Pull Requests:** {prs.get('merged', 0)} merged, {prs.get('opened', 0)} opened",
        "",
    ]

    def _section(title: str, body: list[str]) -> None:
        # Heading, blank line, body lines, trailing blank line.
        lines.append(title)
        lines.append("")
        lines.extend(body)
        lines.append("")

    focus = vibe.get("focus_areas", [])
    if focus:
        _section("## Focus Areas", [f"- {area}" for area in focus])

    agent_summary = vibe.get("agent_summary", "")
    if agent_summary:
        _section("## Agent Activity", [agent_summary])

    notable = vibe.get("notable_events", [])
    if notable:
        _section("## Notable Events", [f"- {event}" for event in notable])

    triage = narrative.get("triage_health", {})
    if triage.get("high_priority_recommendations", 0) > 0:
        _section(
            "## Triage Notes",
            [f"⚠️ {triage['high_priority_recommendations']} high-priority recommendation(s) from loop introspection."],
        )
        for rec in triage.get("recent_recommendations", [])[:2]:
            lines.append(f"- **{rec.get('category', 'general')}:** {rec.get('finding', '')}")
        lines.append("")

    return "\n".join(lines)
# ── Main ───────────────────────────────────────────────────────────────────
def parse_args() -> argparse.Namespace:
    """Parse command-line options for the weekly narrative generator."""
    parser = argparse.ArgumentParser(
        description="Generate weekly narrative summary of work and vibes",
    )
    parser.add_argument(
        "--json", "-j",
        action="store_true",
        help="Output as JSON instead of markdown",
    )
    parser.add_argument(
        "--output", "-o",
        type=str,
        default=None,
        help="Output file path (default from config)",
    )
    parser.add_argument(
        "--days",
        type=int,
        default=None,
        help="Override lookback days (default 7)",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Run even if disabled in config",
    )
    return parser.parse_args()
def main() -> int:
    """Entry point: collect a window of activity, analyze it, and write
    the narrative as JSON plus a markdown summary.

    Returns 0 on success (including the disabled-and-skipped case).
    """
    args = parse_args()
    config = load_automation_config()
    # Respect the manifest/env kill switch unless --force was given.
    if not config.get("enabled", True) and not args.force:
        print("[weekly_narrative] Skipped — weekly narrative is disabled in config")
        print("[weekly_narrative] Use --force to run anyway")
        return 0
    # Determine lookback period (CLI flag wins over config).
    days = args.days if args.days is not None else config.get("lookback_days", 7)
    until = datetime.now(UTC)
    since = until - timedelta(days=days)
    print(f"[weekly_narrative] Generating narrative for the past {days} days...")
    # Setup Gitea client; collectors degrade gracefully when it is down.
    token = get_token(config)
    client = GiteaClient(config, token)
    if not client.is_available():
        print("[weekly_narrative] Warning: Gitea API unavailable — will use local data only")
    # Collect raw data from local files and the Gitea API.
    cycles_data = collect_cycles_data(since)
    issues_data = collect_issues_data(client, since)
    prs_data = collect_prs_data(client, since)
    triage_data = collect_triage_data(since)
    token_data = collect_token_data(since)
    # Analyze: themes, contributions, test and triage health.
    themes = extract_themes(issues_data.get("issues", []))
    agent_contrib = extract_agent_contributions(
        issues_data.get("issues", []),
        prs_data.get("prs", []),
        cycles_data.get("cycles", []),
    )
    test_shifts = analyze_test_shifts(cycles_data.get("cycles", []))
    triage_shifts = analyze_triage_shifts(triage_data)
    # Generate the combined narrative document.
    narrative = generate_narrative(
        cycles_data,
        issues_data,
        prs_data,
        triage_data,
        themes,
        agent_contrib,
        test_shifts,
        triage_shifts,
        token_data,
        since,
        until,
    )
    # Determine output path (CLI override, else config; relative to repo root).
    output_path = args.output or config.get("output_file", ".loop/weekly_narrative.json")
    output_file = REPO_ROOT / output_path
    output_file.parent.mkdir(parents=True, exist_ok=True)
    # Write JSON output.
    output_file.write_text(json.dumps(narrative, indent=2) + "\n")
    # Write markdown summary alongside the JSON.
    md_output_file = output_file.with_suffix(".md")
    md_output_file.write_text(generate_markdown_summary(narrative))
    # Print output: raw JSON with --json, otherwise the markdown report.
    if args.json:
        print(json.dumps(narrative, indent=2))
    else:
        print()
        print(generate_markdown_summary(narrative))
    print(f"\n[weekly_narrative] Written to: {output_file}")
    print(f"[weekly_narrative] Markdown summary: {md_output_file}")
    return 0
# Script entry point — the process exit status comes from main().
if __name__ == "__main__":
    sys.exit(main())

View File

@@ -0,0 +1,6 @@
"""Timmy Automations utilities.
Shared helper modules for automations.
"""
from __future__ import annotations

View File

@@ -0,0 +1,389 @@
"""Token rules helper — Compute token deltas for agent actions.
This module loads token economy configuration from YAML and provides
functions for automations to compute token rewards/penalties.
Usage:
from timmy_automations.utils.token_rules import TokenRules
rules = TokenRules()
delta = rules.get_delta("pr_merged")
print(f"PR merge reward: {delta}") # 10
# Check if agent can perform sensitive operation
can_merge = rules.check_gate("pr_merge", current_tokens=25)
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
@dataclass
class TokenEvent:
"""Represents a single token event configuration."""
name: str
description: str
reward: int
penalty: int
category: str
gate_threshold: int | None = None
@property
def delta(self) -> int:
"""Net token delta (reward + penalty)."""
return self.reward + self.penalty
@dataclass
class TokenCategoryLimits:
    """Daily limits for a token category."""

    # Maximum tokens that may be earned in this category per day.
    max_earn: int
    # Maximum tokens that may be spent in this category per day.
    max_spend: int
class TokenRules:
    """Token economy rules loader and calculator.

    Loads configuration from timmy_automations/config/token_rules.yaml
    and provides methods to compute token deltas and check gating.
    Falls back to minimal built-in defaults when PyYAML or the config
    file is unavailable, so callers never fail on a missing config.
    """

    # Default location of the YAML rules, relative to this module.
    CONFIG_PATH = Path(__file__).parent.parent / "config" / "token_rules.yaml"

    def __init__(self, config_path: Path | None = None) -> None:
        """Initialize token rules from the configuration file.

        Args:
            config_path: Optional override for config file location.
        """
        self._config_path = config_path or self.CONFIG_PATH
        self._events: dict[str, TokenEvent] = {}
        self._gating: dict[str, int] = {}
        self._daily_limits: dict[str, TokenCategoryLimits] = {}
        self._audit: dict[str, Any] = {}
        self._version: str = "unknown"
        self._load_config()

    def _load_config(self) -> None:
        """Load configuration from the YAML file, degrading gracefully."""
        try:
            import yaml  # optional dependency; fall back when absent
        except ImportError:
            self._load_fallback_defaults()
            return
        if not self._config_path.exists():
            self._load_fallback_defaults()
            return
        try:
            config = yaml.safe_load(self._config_path.read_text())
            if not config:
                self._load_fallback_defaults()
                return
            self._version = config.get("version", "unknown")
            self._parse_events(config.get("events", {}))
            self._parse_gating(config.get("gating_thresholds", {}))
            self._parse_daily_limits(config.get("daily_limits", {}))
            self._audit = config.get("audit", {})
        except Exception:
            # Malformed YAML or unexpected structure: never crash
            # callers, run with the built-in defaults instead.
            self._load_fallback_defaults()

    def _load_fallback_defaults(self) -> None:
        """Install minimal built-in rules when config is unavailable."""
        self._version = "fallback"
        self._events = {
            "pr_merged": TokenEvent(
                name="pr_merged",
                description="Successfully merged a pull request",
                reward=10,
                penalty=0,
                category="merge",
                gate_threshold=0,
            ),
            "test_fixed": TokenEvent(
                name="test_fixed",
                description="Fixed a failing test",
                reward=8,
                penalty=0,
                category="test",
            ),
            "automation_failure": TokenEvent(
                name="automation_failure",
                description="Automation failed",
                reward=0,
                penalty=-2,
                category="operation",
            ),
        }
        self._gating = {"pr_merge": 0}
        self._daily_limits = {}
        self._audit = {"log_all_transactions": True}

    def _parse_events(self, events_config: dict) -> None:
        """Parse event configurations from the YAML mapping."""
        for name, config in events_config.items():
            if not isinstance(config, dict):
                continue  # ignore malformed entries
            self._events[name] = TokenEvent(
                name=name,
                description=config.get("description", ""),
                reward=config.get("reward", 0),
                penalty=config.get("penalty", 0),
                category=config.get("category", "unknown"),
                gate_threshold=config.get("gate_threshold"),
            )

    def _parse_gating(self, gating_config: dict) -> None:
        """Parse gating thresholds, keeping only integer values."""
        for name, threshold in gating_config.items():
            if isinstance(threshold, int):
                self._gating[name] = threshold

    def _parse_daily_limits(self, limits_config: dict) -> None:
        """Parse per-category daily earn/spend limits."""
        for category, limits in limits_config.items():
            if isinstance(limits, dict):
                self._daily_limits[category] = TokenCategoryLimits(
                    max_earn=limits.get("max_earn", 0),
                    max_spend=limits.get("max_spend", 0),
                )

    def get_delta(self, event_name: str) -> int:
        """Get the net token delta for an event (0 for unknown events)."""
        event = self._events.get(event_name)
        return event.delta if event else 0

    def get_event(self, event_name: str) -> TokenEvent | None:
        """Get the full event configuration, or None if not found."""
        return self._events.get(event_name)

    def list_events(self, category: str | None = None) -> list[TokenEvent]:
        """List configured events, optionally filtered by category."""
        events = list(self._events.values())
        if category:
            events = [e for e in events if e.category == category]
        return events

    def check_gate(self, operation: str, current_tokens: int) -> bool:
        """Check whether the agent meets the threshold for *operation*.

        Operations without a configured gate are always allowed.
        """
        threshold = self._gating.get(operation)
        if threshold is None:
            return True
        return current_tokens >= threshold

    def get_gate_threshold(self, operation: str) -> int | None:
        """Get the gating threshold for an operation (None when ungated)."""
        return self._gating.get(operation)

    def get_daily_limits(self, category: str) -> TokenCategoryLimits | None:
        """Get daily limits for a category, or None if not defined."""
        return self._daily_limits.get(category)

    def compute_transaction(
        self,
        event_name: str,
        current_tokens: int = 0,
        current_daily_earned: dict[str, int] | None = None,
    ) -> dict[str, Any]:
        """Compute a complete token transaction record.

        This is the main entry point for agents. It returns a
        transaction with the delta, the gating decision, and the
        daily-limit check.

        Args:
            event_name: Name of the event.
            current_tokens: Agent's current token balance.
            current_daily_earned: Mapping of category -> tokens earned
                today. Pass it to enable daily-limit enforcement; a
                category missing from the mapping counts as 0 earned.

        Returns:
            Dict with: event, delta, category, allowed, new_balance,
            limit_reached, plus gate_reason / limit_reason when blocked.
        """
        event = self._events.get(event_name)
        if not event:
            return {
                "event": event_name,
                "delta": 0,
                "allowed": False,
                "reason": "unknown_event",
                "new_balance": current_tokens,
                "limit_reached": False,
            }
        delta = event.delta
        new_balance = current_tokens + delta
        # Gating applies only to rewarding events with a configured
        # threshold; penalties are always applied.
        allowed = True
        gate_reason = None
        if delta > 0 and event.gate_threshold is not None:
            allowed = current_tokens >= event.gate_threshold
            if not allowed:
                gate_reason = f"requires {event.gate_threshold} tokens"
        # Daily limits. BUG FIX: previously the cap was only checked
        # when the category already appeared in current_daily_earned,
        # so the first (possibly over-limit) earn of the day was never
        # capped. A missing category now counts as 0 earned.
        limit_reached = False
        limit_reason = None
        if current_daily_earned is not None:
            limits = self._daily_limits.get(event.category)
            if limits:
                current_earned = current_daily_earned.get(event.category, 0)
                if delta > 0 and current_earned + delta > limits.max_earn:
                    limit_reached = True
                    limit_reason = f"daily earn limit ({limits.max_earn}) reached"
        result = {
            "event": event_name,
            "delta": delta,
            "category": event.category,
            "allowed": allowed and not limit_reached,
            "new_balance": new_balance,
            "limit_reached": limit_reached,
        }
        if gate_reason:
            result["gate_reason"] = gate_reason
        if limit_reason:
            result["limit_reason"] = limit_reason
        return result

    def get_config_version(self) -> str:
        """Get the loaded configuration version ("fallback" for defaults)."""
        return self._version

    def get_categories(self) -> list[str]:
        """Get the sorted list of all configured event categories."""
        return sorted({e.category for e in self._events.values()})

    def is_auditable(self) -> bool:
        """Whether transactions should be logged for audit (default True)."""
        return self._audit.get("log_all_transactions", True)
# Convenience functions for simple use cases


def get_token_delta(event_name: str) -> int:
    """Convenience wrapper: net token delta for *event_name*.

    Note: loads a fresh TokenRules per call; reuse a TokenRules
    instance when computing many deltas.
    """
    rules = TokenRules()
    return rules.get_delta(event_name)
def check_operation_gate(operation: str, current_tokens: int) -> bool:
    """Convenience wrapper: may an agent holding *current_tokens*
    perform *operation*?

    Returns True when the operation is allowed (or has no gate).
    """
    rules = TokenRules()
    return rules.check_gate(operation, current_tokens)
def compute_token_reward(
    event_name: str,
    current_tokens: int = 0,
) -> dict[str, Any]:
    """Convenience wrapper: full transaction record for *event_name*.

    Returns the dict produced by TokenRules.compute_transaction, with
    the delta, allowed status, and projected new balance.
    """
    rules = TokenRules()
    return rules.compute_transaction(event_name, current_tokens)
def list_token_events(category: str | None = None) -> list[dict[str, Any]]:
    """Convenience wrapper: all configured events as plain dicts.

    Args:
        category: Optional category filter.

    Returns:
        One dict per event with name, description, delta, category,
        and gate_threshold.
    """
    return [
        {
            "name": event.name,
            "description": event.description,
            "delta": event.delta,
            "category": event.category,
            "gate_threshold": event.gate_threshold,
        }
        for event in TokenRules().list_events(category)
    ]