diff --git a/tests/timmy/test_golden_path.py b/tests/timmy/test_golden_path.py new file mode 100644 index 00000000..aa61462d --- /dev/null +++ b/tests/timmy/test_golden_path.py @@ -0,0 +1,536 @@ +"""Tests for the Golden Path generator.""" + +import json +from datetime import UTC, datetime +from unittest.mock import MagicMock, patch + +from timmy_automations.daily_run.golden_path import ( + TIME_ESTIMATES, + TYPE_PATTERNS, + GiteaClient, + GoldenPath, + PathItem, + build_golden_path, + classify_issue_type, + estimate_time, + extract_size, + generate_golden_path, + get_token, + group_issues_by_type, + load_config, + score_issue_for_path, +) + + +class TestLoadConfig: + """Tests for configuration loading.""" + + def test_load_config_defaults(self): + """Config should have sensible defaults.""" + config = load_config() + assert "gitea_api" in config + assert "repo_slug" in config + assert "size_labels" in config + + def test_load_config_env_override(self, monkeypatch): + """Environment variables should override defaults.""" + monkeypatch.setenv("TIMMY_GITEA_API", "http://custom:3000/api/v1") + monkeypatch.setenv("TIMMY_REPO_SLUG", "custom/repo") + monkeypatch.setenv("TIMMY_GITEA_TOKEN", "test-token") + + config = load_config() + assert config["gitea_api"] == "http://custom:3000/api/v1" + assert config["repo_slug"] == "custom/repo" + assert config["token"] == "test-token" + + +class TestGetToken: + """Tests for token retrieval.""" + + def test_get_token_from_config(self): + """Token from config takes precedence.""" + config = {"token": "config-token", "token_file": "~/.test"} + assert get_token(config) == "config-token" + + @patch("pathlib.Path.exists") + @patch("pathlib.Path.read_text") + def test_get_token_from_file(self, mock_read, mock_exists): + """Token can be read from file.""" + mock_exists.return_value = True + mock_read.return_value = "file-token\n" + + config = {"token_file": "~/.hermes/test_token"} + assert get_token(config) == "file-token" + + def test_get_token_none(self): + """Returns None if no token available.""" + config = {"token_file": "/nonexistent/path"} + assert get_token(config) is None + + +class TestExtractSize: + """Tests for size label extraction.""" + + def test_extract_size_xs(self): + """Should extract XS size.""" + labels = [{"name": "size:XS"}, {"name": "bug"}] + assert extract_size(labels) == "XS" + + def test_extract_size_s(self): + """Should extract S size.""" + labels = [{"name": "bug"}, {"name": "size:S"}] + assert extract_size(labels) == "S" + + def test_extract_size_m(self): + """Should extract M size.""" + labels = [{"name": "size:M"}] + assert extract_size(labels) == "M" + + def test_extract_size_unknown(self): + """Should return ? for unknown size.""" + labels = [{"name": "bug"}, {"name": "feature"}] + assert extract_size(labels) == "?" + + def test_extract_size_empty(self): + """Should return ? for empty labels.""" + assert extract_size([]) == "?" + + +class TestClassifyIssueType: + """Tests for issue type classification.""" + + def test_classify_triage(self): + """Should classify triage issues.""" + issue = { + "title": "Triage new issues", + "labels": [{"name": "triage"}], + } + assert classify_issue_type(issue) == "triage" + + def test_classify_test(self): + """Should classify test issues.""" + issue = { + "title": "Add unit tests for parser", + "labels": [{"name": "test"}], + } + assert classify_issue_type(issue) == "test" + + def test_classify_fix(self): + """Should classify fix issues.""" + issue = { + "title": "Fix login bug", + "labels": [{"name": "bug"}], + } + assert classify_issue_type(issue) == "fix" + + def test_classify_docs(self): + """Should classify docs issues.""" + issue = { + "title": "Update README", + "labels": [{"name": "docs"}], + } + assert classify_issue_type(issue) == "docs" + + def test_classify_refactor(self): + """Should classify refactor issues.""" + issue = { + "title": "Refactor validation logic", + "labels": [{"name": "refactor"}], + } + assert classify_issue_type(issue) == "refactor" + + def test_classify_default_to_fix(self): + """Should default to fix for uncategorized.""" + issue = { + "title": "Something vague", + "labels": [{"name": "question"}], + } + assert classify_issue_type(issue) == "fix" + + def test_classify_title_priority(self): + """Title patterns should contribute to classification.""" + issue = { + "title": "Fix the broken parser", + "labels": [], + } + assert classify_issue_type(issue) == "fix" + + +class TestEstimateTime: + """Tests for time estimation.""" + + def test_estimate_xs_fix(self): + """XS fix should be 10 minutes.""" + issue = { + "title": "Fix typo", + "labels": [{"name": "size:XS"}, {"name": "bug"}], + } + assert estimate_time(issue) == 10 + + def test_estimate_s_test(self): + """S test should be 15 minutes.""" + issue = { + "title": "Add test coverage", + "labels": [{"name": "size:S"}, {"name": "test"}], + } + assert estimate_time(issue) == 15 + + def test_estimate_m_fix(self): + """M fix should be 25 minutes.""" + issue = { + "title": "Fix complex bug", + "labels": [{"name": "size:M"}, {"name": "bug"}], + } + assert estimate_time(issue) == 25 + + def test_estimate_unknown_size(self): + """Unknown size should fallback to S.""" + issue = { + "title": "Some fix", + "labels": [{"name": "bug"}], + } + # Falls back to S/fix = 15 + assert estimate_time(issue) == 15 + + +class TestScoreIssueForPath: + """Tests for issue scoring.""" + + def test_score_prefers_xs(self): + """XS issues should score higher.""" + xs = {"title": "Fix", "labels": [{"name": "size:XS"}]} + s = {"title": "Fix", "labels": [{"name": "size:S"}]} + m = {"title": "Fix", "labels": [{"name": "size:M"}]} + + assert score_issue_for_path(xs) > score_issue_for_path(s) + assert score_issue_for_path(s) > score_issue_for_path(m) + + def test_score_prefers_clear_types(self): + """Issues with clear type labels score higher.""" + # Bug label adds score, so with bug should be >= without bug + with_type = { + "title": "Fix bug", + "labels": [{"name": "size:S"}, {"name": "bug"}], + } + without_type = { + "title": "Something", + "labels": [{"name": "size:S"}], + } + + assert score_issue_for_path(with_type) >= score_issue_for_path(without_type) + + def test_score_accepts_criteria(self): + """Issues with acceptance criteria score higher.""" + with_criteria = { + "title": "Fix", + "labels": [{"name": "size:S"}], + "body": "## Acceptance Criteria\n- [ ] Fix it", + } + without_criteria = { + "title": "Fix", + "labels": [{"name": "size:S"}], + "body": "Just fix it", + } + + assert score_issue_for_path(with_criteria) > score_issue_for_path(without_criteria) + + +class TestGroupIssuesByType: + """Tests for issue grouping.""" + + def test_groups_by_type(self): + """Issues should be grouped by their type.""" + issues = [ + {"title": "Fix bug", "labels": [{"name": "bug"}], "number": 1}, + {"title": "Add test", "labels": [{"name": "test"}], "number": 2}, + {"title": "Another fix", "labels": [{"name": "bug"}], "number": 3}, + ] + + grouped = group_issues_by_type(issues) + + assert len(grouped["fix"]) == 2 + assert len(grouped["test"]) == 1 + assert len(grouped["triage"]) == 0 + + def test_sorts_by_score(self): + """Issues within groups should be sorted by score.""" + issues = [ + {"title": "Fix", "labels": [{"name": "size:M"}], "number": 1}, + {"title": "Fix", "labels": [{"name": "size:XS"}], "number": 2}, + {"title": "Fix", "labels": [{"name": "size:S"}], "number": 3}, + ] + + grouped = group_issues_by_type(issues) + + # XS should be first (highest score) + assert grouped["fix"][0]["number"] == 2 + # M should be last (lowest score) + assert grouped["fix"][2]["number"] == 1 + + +class TestBuildGoldenPath: + """Tests for Golden Path building.""" + + def test_builds_path_with_all_types(self): + """Path should include items from different types.""" + grouped = { + "triage": [ + {"title": "Triage", "labels": [{"name": "size:XS"}], "number": 1, "html_url": ""}, + ], + "fix": [ + {"title": "Fix 1", "labels": [{"name": "size:S"}], "number": 2, "html_url": ""}, + {"title": "Fix 2", "labels": [{"name": "size:XS"}], "number": 3, "html_url": ""}, + ], + "test": [ + {"title": "Test", "labels": [{"name": "size:S"}], "number": 4, "html_url": ""}, + ], + "docs": [], + "refactor": [], + } + + path = build_golden_path(grouped, target_minutes=45) + + assert path.item_count >= 3 + assert path.items[0].issue_type == "triage" # Warm-up + assert any(item.issue_type == "test" for item in path.items) + + def test_respects_time_budget(self): + """Path should stay within reasonable time budget.""" + grouped = { + "triage": [ + {"title": "Triage", "labels": [{"name": "size:S"}], "number": 1, "html_url": ""}, + ], + "fix": [ + {"title": "Fix 1", "labels": [{"name": "size:S"}], "number": 2, "html_url": ""}, + {"title": "Fix 2", "labels": [{"name": "size:S"}], "number": 3, "html_url": ""}, + ], + "test": [ + {"title": "Test", "labels": [{"name": "size:S"}], "number": 4, "html_url": ""}, + ], + "docs": [], + "refactor": [], + } + + path = build_golden_path(grouped, target_minutes=45) + + # Should be in 30-60 minute range + assert 20 <= path.total_estimated_minutes <= 70 + + def test_no_duplicate_issues(self): + """Path should not include the same issue twice.""" + grouped = { + "triage": [], + "fix": [ + {"title": "Fix", "labels": [{"name": "size:S"}], "number": 1, "html_url": ""}, + ], + "test": [], + "docs": [], + "refactor": [], + } + + path = build_golden_path(grouped, target_minutes=45) + + numbers = [item.number for item in path.items] + assert len(numbers) == len(set(numbers)) # No duplicates + + def test_fallback_when_triage_missing(self): + """Should use fallback when no triage issues available.""" + grouped = { + "triage": [], + "fix": [ + {"title": "Fix", "labels": [{"name": "size:XS"}], "number": 1, "html_url": ""}, + ], + "test": [ + {"title": "Test", "labels": [{"name": "size:XS"}], "number": 2, "html_url": ""}, + ], + "docs": [], + "refactor": [], + } + + path = build_golden_path(grouped, target_minutes=45) + + assert path.item_count > 0 + + +class TestGoldenPathDataclass: + """Tests for the GoldenPath dataclass.""" + + def test_total_time_calculation(self): + """Should sum item times correctly.""" + path = GoldenPath( + generated_at=datetime.now(UTC).isoformat(), + target_minutes=45, + items=[ + PathItem(1, "Test 1", "XS", "fix", 10, ""), + PathItem(2, "Test 2", "S", "test", 15, ""), + ], + ) + + assert path.total_estimated_minutes == 25 + + def test_to_dict(self): + """Should convert to dict correctly.""" + path = GoldenPath( + generated_at="2024-01-01T00:00:00+00:00", + target_minutes=45, + items=[PathItem(1, "Test", "XS", "fix", 10, "http://test")], + ) + + data = path.to_dict() + + assert data["target_minutes"] == 45 + assert data["total_estimated_minutes"] == 10 + assert data["item_count"] == 1 + assert len(data["items"]) == 1 + + def test_to_json(self): + """Should convert to JSON correctly.""" + path = GoldenPath( + generated_at="2024-01-01T00:00:00+00:00", + target_minutes=45, + items=[], + ) + + json_str = path.to_json() + data = json.loads(json_str) + + assert data["target_minutes"] == 45 + + +class TestGiteaClient: + """Tests for the GiteaClient.""" + + def test_client_initialization(self): + """Client should initialize with config.""" + config = { + "gitea_api": "http://test:3000/api/v1", + "repo_slug": "test/repo", + } + client = GiteaClient(config, "token123") + + assert client.api_base == "http://test:3000/api/v1" + assert client.repo_slug == "test/repo" + assert client.token == "token123" + + def test_headers_with_token(self): + """Headers should include auth token.""" + config = {"gitea_api": "http://test", "repo_slug": "test/repo"} + client = GiteaClient(config, "mytoken") + + headers = client._headers() + + assert headers["Authorization"] == "token mytoken" + assert headers["Accept"] == "application/json" + + def test_headers_without_token(self): + """Headers should work without token.""" + config = {"gitea_api": "http://test", "repo_slug": "test/repo"} + client = GiteaClient(config, None) + + headers = client._headers() + + assert "Authorization" not in headers + assert headers["Accept"] == "application/json" + + @patch("timmy_automations.daily_run.golden_path.urlopen") + def test_is_available_success(self, mock_urlopen): + """Should detect API availability.""" + mock_response = MagicMock() + mock_response.status = 200 + mock_context = MagicMock() + mock_context.__enter__ = MagicMock(return_value=mock_response) + mock_context.__exit__ = MagicMock(return_value=False) + mock_urlopen.return_value = mock_context + + config = {"gitea_api": "http://test", "repo_slug": "test/repo"} + client = GiteaClient(config, None) + + assert client.is_available() is True + + @patch("urllib.request.urlopen") + def test_is_available_failure(self, mock_urlopen): + """Should handle API unavailability.""" + from urllib.error import URLError + + mock_urlopen.side_effect = URLError("Connection refused") + + config = {"gitea_api": "http://test", "repo_slug": "test/repo"} + client = GiteaClient(config, None) + + assert client.is_available() is False + + +class TestIntegration: + """Integration-style tests.""" + + @patch("timmy_automations.daily_run.golden_path.GiteaClient") + def test_generate_golden_path_integration(self, mock_client_class): + """End-to-end test with mocked Gitea.""" + # Setup mock + mock_client = MagicMock() + mock_client.is_available.return_value = True + mock_client.get_paginated.return_value = [ + { + "number": 1, + "title": "Triage issues", + "labels": [{"name": "size:XS"}, {"name": "triage"}], + "html_url": "http://test/1", + }, + { + "number": 2, + "title": "Fix bug", + "labels": [{"name": "size:S"}, {"name": "bug"}], + "html_url": "http://test/2", + }, + { + "number": 3, + "title": "Add tests", + "labels": [{"name": "size:S"}, {"name": "test"}], + "html_url": "http://test/3", + }, + { + "number": 4, + "title": "Another fix", + "labels": [{"name": "size:XS"}, {"name": "bug"}], + "html_url": "http://test/4", + }, + ] + mock_client_class.return_value = mock_client + + path = generate_golden_path(target_minutes=45) + + assert path.item_count >= 3 + assert all(item.url.startswith("http://test/") for item in path.items) + + @patch("timmy_automations.daily_run.golden_path.GiteaClient") + def test_generate_when_unavailable(self, mock_client_class): + """Should return empty path when Gitea unavailable.""" + mock_client = MagicMock() + mock_client.is_available.return_value = False + mock_client_class.return_value = mock_client + + path = generate_golden_path(target_minutes=45) + + assert path.item_count == 0 + assert path.items == [] + + +class TestTypePatterns: + """Tests for type pattern definitions.""" + + def test_type_patterns_structure(self): + """Type patterns should have required keys.""" + for _issue_type, patterns in TYPE_PATTERNS.items(): + assert "labels" in patterns + assert "title" in patterns + assert isinstance(patterns["labels"], list) + assert isinstance(patterns["title"], list) + + def test_time_estimates_structure(self): + """Time estimates should have all sizes.""" + for size in ["XS", "S", "M"]: + assert size in TIME_ESTIMATES + for issue_type in ["triage", "fix", "test", "docs", "refactor"]: + assert issue_type in TIME_ESTIMATES[size] + assert isinstance(TIME_ESTIMATES[size][issue_type], int) + assert TIME_ESTIMATES[size][issue_type] > 0 diff --git a/timmy_automations/config/automations.json b/timmy_automations/config/automations.json index 48dc7af2..9bb93e8a 100644 --- a/timmy_automations/config/automations.json +++ b/timmy_automations/config/automations.json @@ -211,6 +211,23 @@ "agenda_time_minutes": 10 }, "outputs": [] + }, + { + "id": "golden_path", + "name": "Golden Path Generator", + "description": "Generates coherent 30-60 minute mini-sessions from real Gitea issues — triage, fixes, and tests", + "script": "timmy_automations/daily_run/golden_path.py", + "category": "daily_run", + "enabled": true, + "trigger": "manual", + "executable": "python3", + "config": { + "target_minutes": 45, + "size_labels": ["size:XS", "size:S", "size:M"], + "min_items": 3, + "max_items": 5 + }, + "outputs": [] } ] } diff --git a/timmy_automations/daily_run/golden_path.py b/timmy_automations/daily_run/golden_path.py new file mode 100644 index 00000000..183e45f9 --- /dev/null +++ b/timmy_automations/daily_run/golden_path.py @@ -0,0 +1,583 @@ +"""Golden Path generator — coherent 30-60 minute mini-sessions from real issues. + +Fetches issues from Gitea and assembles them into ordered sequences forming +a coherent mini-session. Each Golden Path includes: +- One small triage cleanup +- Two micro-fixes (XS/S sized) +- One test-improvement task + +All tasks are real issues from the Gitea repository, never synthetic. + +Usage: + from timmy_automations.daily_run.golden_path import generate_golden_path + path = generate_golden_path(minutes=45) + print(path.to_json()) +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any +from urllib.request import Request, urlopen +from urllib.error import HTTPError, URLError + +# ── Configuration ───────────────────────────────────────────────────────── + +REPO_ROOT = Path(__file__).resolve().parent.parent.parent +CONFIG_PATH = Path(__file__).parent.parent / "config" / "daily_run.json" + +DEFAULT_CONFIG = { + "gitea_api": "http://localhost:3000/api/v1", + "repo_slug": "rockachopa/Timmy-time-dashboard", + "token_file": "~/.hermes/gitea_token", + "size_labels": ["size:XS", "size:S", "size:M"], +} + +# Time estimates (in minutes) by size and type +TIME_ESTIMATES: dict[str, dict[str, int]] = { + "XS": {"triage": 5, "fix": 10, "test": 10, "docs": 8, "refactor": 8}, + "S": {"triage": 10, "fix": 15, "test": 15, "docs": 12, "refactor": 12}, + "M": {"triage": 15, "fix": 25, "test": 25, "docs": 20, "refactor": 20}, +} + +# Issue type detection patterns +TYPE_PATTERNS: dict[str, dict[str, list[str]]] = { + "triage": { + "labels": ["triage", "cleanup", "organize", "sort", "categorize"], + "title": ["triage", "cleanup", "organize", "sort", "categorize", "clean up"], + }, + "fix": { + "labels": ["bug", "fix", "error", "broken"], + "title": ["fix", "bug", "error", "broken", "repair", "correct"], + }, + "test": { + "labels": ["test", "testing", "coverage", "pytest"], + "title": ["test", "coverage", "pytest", "unit test", "integration test"], + }, + "docs": { + "labels": ["docs", "documentation", "readme", "docstring"], + "title": ["doc", "readme", "comment", "guide", "tutorial"], + }, + "refactor": { + "labels": ["refactor", "cleanup", "debt", "maintainability"], + "title": ["refactor", "cleanup", "simplify", "extract", "reorganize"], + }, +} + + +def load_config() -> dict: + """Load configuration from config file with fallback to defaults.""" + config = DEFAULT_CONFIG.copy() + if CONFIG_PATH.exists(): + try: + file_config = json.loads(CONFIG_PATH.read_text()) + if "orchestrator" in file_config: + config.update(file_config["orchestrator"]) + except (json.JSONDecodeError, OSError) as exc: + print(f"[golden_path] Warning: Could not load config: {exc}", file=sys.stderr) + + # Environment variable overrides + if os.environ.get("TIMMY_GITEA_API"): + config["gitea_api"] = os.environ.get("TIMMY_GITEA_API") + if os.environ.get("TIMMY_REPO_SLUG"): + config["repo_slug"] = os.environ.get("TIMMY_REPO_SLUG") + if os.environ.get("TIMMY_GITEA_TOKEN"): + config["token"] = os.environ.get("TIMMY_GITEA_TOKEN") + + return config + + +def get_token(config: dict) -> str | None: + """Get Gitea token from environment or file.""" + if "token" in config: + return config["token"] + + token_file = Path(config["token_file"]).expanduser() + if token_file.exists(): + return token_file.read_text().strip() + + return None + + +# ── Gitea API Client ────────────────────────────────────────────────────── + + +class GiteaClient: + """Simple Gitea API client with graceful degradation.""" + + def __init__(self, config: dict, token: str | None): + self.api_base = config["gitea_api"].rstrip("/") + self.repo_slug = config["repo_slug"] + self.token = token + self._available: bool | None = None + + def _headers(self) -> dict: + headers = {"Accept": "application/json"} + if self.token: + headers["Authorization"] = f"token {self.token}" + return headers + + def _api_url(self, path: str) -> str: + return f"{self.api_base}/repos/{self.repo_slug}/{path}" + + def is_available(self) -> bool: + """Check if Gitea API is reachable.""" + if self._available is not None: + return self._available + + try: + req = Request( + f"{self.api_base}/version", + headers=self._headers(), + method="GET", + ) + with urlopen(req, timeout=5) as resp: + self._available = resp.status == 200 + return self._available + except (HTTPError, URLError, TimeoutError): + self._available = False + return False + + def get(self, path: str, params: dict | None = None) -> list | dict: + """Make a GET request to the Gitea API.""" + url = self._api_url(path) + if params: + query = "&".join(f"{k}={v}" for k, v in params.items()) + url = f"{url}?{query}" + + req = Request(url, headers=self._headers(), method="GET") + with urlopen(req, timeout=15) as resp: + return json.loads(resp.read()) + + def get_paginated(self, path: str, params: dict | None = None) -> list: + """Fetch all pages of a paginated endpoint.""" + all_items = [] + page = 1 + limit = 50 + + while True: + page_params = {"limit": limit, "page": page} + if params: + page_params.update(params) + + batch = self.get(path, page_params) + if not batch: + break + + all_items.extend(batch) + if len(batch) < limit: + break + page += 1 + + return all_items + + +# ── Issue Classification ────────────────────────────────────────────────── + + +def extract_size(labels: list[dict]) -> str: + """Extract size label from issue labels.""" + for label in labels: + name = label.get("name", "") + if name.startswith("size:"): + return name.replace("size:", "").upper() + return "?" + + +def classify_issue_type(issue: dict) -> str: + """Classify an issue into a type based on labels and title.""" + labels = [l.get("name", "").lower() for l in issue.get("labels", [])] + title = issue.get("title", "").lower() + + scores: dict[str, int] = {} + + for issue_type, patterns in TYPE_PATTERNS.items(): + score = 0 + # Check labels + for pattern in patterns["labels"]: + if any(pattern in label for label in labels): + score += 2 + # Check title + for pattern in patterns["title"]: + if pattern in title: + score += 1 + scores[issue_type] = score + + # Return the type with highest score, or "fix" as default + if scores: + best_type = max(scores, key=lambda k: scores[k]) + if scores[best_type] > 0: + return best_type + + return "fix" # Default to fix for uncategorized issues + + +def estimate_time(issue: dict) -> int: + """Estimate time in minutes for an issue based on size and type.""" + size = extract_size(issue.get("labels", [])) + issue_type = classify_issue_type(issue) + + # Default to fix time estimates if type not found + type_map = issue_type if issue_type in TIME_ESTIMATES.get(size, {}) else "fix" + + return TIME_ESTIMATES.get(size, TIME_ESTIMATES["S"]).get(type_map, 15) + + +def score_issue_for_path(issue: dict) -> int: + """Score an issue for Golden Path suitability (higher = better fit).""" + score = 0 + labels = [l.get("name", "").lower() for l in issue.get("labels", [])] + issue_type = classify_issue_type(issue) + + # Prefer smaller sizes for predictability + if "size:xs" in labels: + score += 10 + elif "size:s" in labels: + score += 7 + elif "size:m" in labels: + score += 3 + + # Prefer issues with clear type labels + if issue_type in ["triage", "test", "fix"]: + score += 3 + + # Prefer issues with acceptance criteria or good description + body = issue.get("body", "") + if body: + if "## acceptance criteria" in body.lower() or "acceptance criteria" in body.lower(): + score += 3 + if len(body) > 200: + score += 1 + + # Prefer issues with recent activity + updated_at = issue.get("updated_at", "") + if updated_at: + try: + updated = datetime.fromisoformat(updated_at.replace("Z", "+00:00")) + days_old = (datetime.now(timezone.utc) - updated).days + if days_old < 7: + score += 2 + elif days_old < 30: + score += 1 + except (ValueError, TypeError): + pass + + return score + + +# ── Golden Path Generation ──────────────────────────────────────────────── + + +@dataclass +class PathItem: + """A single item in a Golden Path.""" + + number: int + title: str + size: str + issue_type: str + estimated_minutes: int + url: str + + def to_dict(self) -> dict: + return { + "number": self.number, + "title": self.title, + "size": self.size, + "type": self.issue_type, + "estimated_minutes": self.estimated_minutes, + "url": self.url, + } + + +@dataclass +class GoldenPath: + """A complete Golden Path sequence.""" + + generated_at: str + target_minutes: int + items: list[PathItem] = field(default_factory=list) + + @property + def total_estimated_minutes(self) -> int: + return sum(item.estimated_minutes for item in self.items) + + @property + def item_count(self) -> int: + return len(self.items) + + def to_dict(self) -> dict: + return { + "generated_at": self.generated_at, + "target_minutes": self.target_minutes, + "total_estimated_minutes": self.total_estimated_minutes, + "item_count": self.item_count, + "items": [item.to_dict() for item in self.items], + } + + def to_json(self, indent: int = 2) -> str: + return json.dumps(self.to_dict(), indent=indent) + + +def fetch_eligible_issues(client: GiteaClient, config: dict) -> list[dict]: + """Fetch open issues eligible for Golden Paths.""" + size_labels = config.get("size_labels", ["size:XS", "size:S", "size:M"]) + + try: + # Fetch all open issues + issues = client.get_paginated("issues", {"state": "open", "sort": "updated"}) + except (HTTPError, URLError) as exc: + print(f"[golden_path] Warning: Failed to fetch issues: {exc}", file=sys.stderr) + return [] + + # Filter by size labels if specified + if size_labels: + filtered = [] + size_names = {s.lower() for s in size_labels} + for issue in issues: + issue_labels = {l.get("name", "").lower() for l in issue.get("labels", [])} + if issue_labels & size_names: + filtered.append(issue) + issues = filtered + + return issues + + +def group_issues_by_type(issues: list[dict]) -> dict[str, list[dict]]: + """Group issues by their classified type, sorted by score.""" + groups: dict[str, list[dict]] = { + "triage": [], + "fix": [], + "test": [], + "docs": [], + "refactor": [], + } + + for issue in issues: + issue_type = classify_issue_type(issue) + if issue_type in groups: + groups[issue_type].append(issue) + + # Sort each group by score (highest first) + for issue_type in groups: + groups[issue_type] = sorted( + groups[issue_type], + key=lambda i: score_issue_for_path(i), + reverse=True, + ) + + return groups + + +def build_golden_path( + grouped_issues: dict[str, list[dict]], + target_minutes: int = 45, +) -> GoldenPath: + """Build a Golden Path from grouped issues. + + The path follows a coherent sequence: + 1. One small triage cleanup (warm-up) + 2. One micro-fix (momentum building) + 3. One test-improvement (quality focus) + 4. One more micro-fix or docs (closure) + """ + path = GoldenPath( + generated_at=datetime.now(timezone.utc).isoformat(), + target_minutes=target_minutes, + ) + + used_issue_numbers: set[int] = set() + + def add_best_item(issues: list[dict], max_minutes: int | None = None) -> bool: + """Add the best available issue of a type to the path.""" + for issue in issues: + number = issue.get("number", 0) + if number in used_issue_numbers: + continue + + est_time = estimate_time(issue) + if max_minutes and est_time > max_minutes: + continue + + used_issue_numbers.add(number) + path.items.append( + PathItem( + number=number, + title=issue.get("title", "Untitled"), + size=extract_size(issue.get("labels", [])), + issue_type=classify_issue_type(issue), + estimated_minutes=est_time, + url=issue.get("html_url", ""), + ) + ) + return True + return False + + # Phase 1: Warm-up with triage (5-10 min) + if grouped_issues["triage"]: + add_best_item(grouped_issues["triage"], max_minutes=15) + else: + # Fallback: use smallest available issue + all_issues = ( + grouped_issues["fix"] + + grouped_issues["docs"] + + grouped_issues["refactor"] + ) + all_issues.sort(key=lambda i: score_issue_for_path(i), reverse=True) + add_best_item(all_issues, max_minutes=10) + + # Phase 2: First micro-fix (10-15 min) + if grouped_issues["fix"]: + add_best_item(grouped_issues["fix"], max_minutes=20) + else: + # Fallback to refactor + add_best_item(grouped_issues["refactor"], max_minutes=15) + + # Phase 3: Test improvement (10-15 min) + if grouped_issues["test"]: + add_best_item(grouped_issues["test"], max_minutes=20) + else: + # If no test issues, add another fix + add_best_item(grouped_issues["fix"], max_minutes=15) + + # Phase 4: Closure fix or docs (10-15 min) + # Try to fill remaining time + remaining_budget = target_minutes - path.total_estimated_minutes + if remaining_budget >= 10: + # Prefer fix, then docs + if not add_best_item(grouped_issues["fix"], max_minutes=remaining_budget): + if not add_best_item(grouped_issues["docs"], max_minutes=remaining_budget): + add_best_item(grouped_issues["refactor"], max_minutes=remaining_budget) + + return path + + +def generate_golden_path( + target_minutes: int = 45, + config: dict | None = None, +) -> GoldenPath: + """Generate a Golden Path for the specified time budget. + + Args: + target_minutes: Target session length (30-60 recommended) + config: Optional config override + + Returns: + A GoldenPath with ordered items from real Gitea issues + """ + cfg = config or load_config() + token = get_token(cfg) + client = GiteaClient(cfg, token) + + if not client.is_available(): + # Return empty path with error indication + return GoldenPath( + generated_at=datetime.now(timezone.utc).isoformat(), + target_minutes=target_minutes, + items=[], + ) + + issues = fetch_eligible_issues(client, cfg) + grouped = group_issues_by_type(issues) + return build_golden_path(grouped, target_minutes) + + +# ── Output Formatting ───────────────────────────────────────────────────── + + +def print_golden_path(path: GoldenPath) -> None: + """Print a formatted Golden Path to stdout.""" + print("=" * 60) + print("🌟 GOLDEN PATH") + print("=" * 60) + print(f"Generated: {path.generated_at}") + print(f"Target: {path.target_minutes} minutes") + print(f"Estimated: {path.total_estimated_minutes} minutes") + print() + + if not path.items: + print("No eligible issues found for a Golden Path.") + print() + print("To create Golden Paths, ensure issues have:") + print(" - Size labels: size:XS, size:S, or size:M") + print(" - Type labels: bug, test, triage, docs, refactor") + print() + return + + for i, item in enumerate(path.items, 1): + type_emoji = { + "triage": "🧹", + "fix": "🔧", + "test": "🧪", + "docs": "📚", + "refactor": "♻️", + }.get(item.issue_type, "📋") + + print(f"{i}. {type_emoji} #{item.number} [{item.size}] ({item.estimated_minutes}m)") + print(f" Title: {item.title}") + print(f" Type: {item.issue_type.upper()}") + if item.url: + print(f" URL: {item.url}") + print() + + print("-" * 60) + print("Instructions:") + print(" 1. Start with the triage item to warm up") + print(" 2. Progress through fixes to build momentum") + print(" 3. Use the test item for quality focus") + print(" 4. Check off items as you complete them") + print() + + +# ── CLI ─────────────────────────────────────────────────────────────────── + + +def parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser( + description="Golden Path generator — coherent 30-60 minute mini-sessions", + ) + p.add_argument( + "--minutes", + "-m", + type=int, + default=45, + help="Target session length in minutes (default: 45)", + ) + p.add_argument( + "--json", + "-j", + action="store_true", + help="Output as JSON instead of formatted text", + ) + return p.parse_args() + + +def main() -> int: + args = parse_args() + + # Validate target minutes + target = max(30, min(60, args.minutes)) + if target != args.minutes: + print( + f"[golden_path] Warning: Clamped {args.minutes}m to {target}m range", + file=sys.stderr, + ) + + path = generate_golden_path(target_minutes=target) + + if args.json: + print(path.to_json()) + else: + print_golden_path(path) + + return 0 if path.items else 1 + + +if __name__ == "__main__": + sys.exit(main())