#!/usr/bin/env python3 """Daily Run orchestration script — the 10-minute ritual. Connects to local Gitea, fetches candidate issues, and produces a concise agenda plus a day summary (review mode). The Daily Run begins with a Quick Health Snapshot (#710) to ensure mandatory systems are green before burning cycles on work that cannot land. Run: python3 timmy_automations/daily_run/orchestrator.py [--review] Env: See timmy_automations/config/daily_run.json for configuration Refs: #703, #923 """ from __future__ import annotations import argparse import json import os import sys from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError # ── Token Economy Integration ────────────────────────────────────────────── # Import token rules helpers for tracking Daily Run rewards sys.path.insert( 0, str(Path(__file__).resolve().parent.parent) ) from utils.token_rules import TokenRules, compute_token_reward # Health snapshot lives in the same package from health_snapshot import generate_snapshot as _generate_health_snapshot from health_snapshot import get_token as _hs_get_token from health_snapshot import load_config as _hs_load_config # ── Configuration ───────────────────────────────────────────────────────── REPO_ROOT = Path(__file__).resolve().parent.parent.parent CONFIG_PATH = Path(__file__).parent.parent / "config" / "daily_run.json" DEFAULT_CONFIG = { "gitea_api": "http://localhost:3000/api/v1", "repo_slug": "rockachopa/Timmy-time-dashboard", "token_file": "~/.hermes/gitea_token", "candidate_labels": ["daily-run"], "size_labels": ["size:XS", "size:S"], "layer_labels_prefix": "layer:", "max_agenda_items": 3, "lookback_hours": 24, "agenda_time_minutes": 10, } def load_config() -> dict: """Load configuration from config file with fallback to defaults.""" config = DEFAULT_CONFIG.copy() if CONFIG_PATH.exists(): try: file_config = json.loads(CONFIG_PATH.read_text()) if "orchestrator" in file_config: config.update(file_config["orchestrator"]) except (json.JSONDecodeError, OSError) as exc: print(f"[orchestrator] Warning: Could not load config: {exc}", file=sys.stderr) # Environment variable overrides if os.environ.get("TIMMY_GITEA_API"): config["gitea_api"] = os.environ.get("TIMMY_GITEA_API") if os.environ.get("TIMMY_REPO_SLUG"): config["repo_slug"] = os.environ.get("TIMMY_REPO_SLUG") if os.environ.get("TIMMY_GITEA_TOKEN"): config["token"] = os.environ.get("TIMMY_GITEA_TOKEN") return config def get_token(config: dict) -> str | None: """Get Gitea token from environment or file.""" if "token" in config: return config["token"] token_file = Path(config["token_file"]).expanduser() if token_file.exists(): return token_file.read_text().strip() return None # ── Gitea API Client ────────────────────────────────────────────────────── class GiteaClient: """Simple Gitea API client with graceful degradation.""" def __init__(self, config: dict, token: str | None): self.api_base = config["gitea_api"].rstrip("/") self.repo_slug = config["repo_slug"] self.token = token self._available: bool | None = None def _headers(self) -> dict: headers = {"Accept": "application/json"} if self.token: headers["Authorization"] = f"token {self.token}" return headers def _api_url(self, path: str) -> str: return f"{self.api_base}/repos/{self.repo_slug}/{path}" def is_available(self) -> bool: """Check if Gitea API is reachable.""" if self._available is not None: return self._available try: req = Request( f"{self.api_base}/version", headers=self._headers(), method="GET", ) with urlopen(req, timeout=5) as resp: self._available = resp.status == 200 return self._available except (HTTPError, URLError, TimeoutError): self._available = False return False def get(self, path: str, params: dict | None = None) -> list | dict: """Make a GET request to the Gitea API.""" url = self._api_url(path) if params: query = "&".join(f"{k}={v}" for k, v in params.items()) url = f"{url}?{query}" req = Request(url, headers=self._headers(), method="GET") with urlopen(req, timeout=15) as resp: return json.loads(resp.read()) def get_paginated(self, path: str, params: dict | None = None) -> list: """Fetch all pages of a paginated endpoint.""" all_items = [] page = 1 limit = 50 while True: page_params = {"limit": limit, "page": page} if params: page_params.update(params) batch = self.get(path, page_params) if not batch: break all_items.extend(batch) if len(batch) < limit: break page += 1 return all_items # ── Issue Processing ────────────────────────────────────────────────────── def extract_size(labels: list[dict]) -> str: """Extract size label from issue labels.""" for label in labels: name = label.get("name", "") if name.startswith("size:"): return name.replace("size:", "") return "?" def extract_layer(labels: list[dict]) -> str | None: """Extract layer label from issue labels.""" for label in labels: name = label.get("name", "") if name.startswith("layer:"): return name.replace("layer:", "") return None def suggest_action_type(issue: dict) -> str: """Suggest an action type based on issue labels and content.""" labels = [l.get("name", "").lower() for l in issue.get("labels", [])] title = issue.get("title", "").lower() if "bug" in labels or "fix" in title: return "fix" if "feature" in labels or "feat" in title: return "implement" if "refactor" in labels or "chore" in title: return "refactor" if "test" in labels or "test" in title: return "test" if "docs" in labels or "doc" in title: return "document" return "review" def score_issue(issue: dict) -> int: """Score an issue for prioritization (higher = more suitable for daily run).""" score = 0 labels = [l.get("name", "").lower() for l in issue.get("labels", [])] # Prefer smaller sizes if "size:xs" in labels: score += 10 elif "size:s" in labels: score += 5 elif "size:m" in labels: score += 2 # Prefer daily-run labeled issues if "daily-run" in labels: score += 3 # Prefer issues with clear type labels if any(l in labels for l in ["bug", "feature", "refactor"]): score += 2 # Slight preference for issues with body content (more context) if issue.get("body") and len(issue.get("body", "")) > 100: score += 1 return score # ── Agenda Generation ───────────────────────────────────────────────────── def fetch_candidates(client: GiteaClient, config: dict) -> list[dict]: """Fetch issues matching candidate criteria.""" candidate_labels = config["candidate_labels"] size_labels = config.get("size_labels", []) all_labels = candidate_labels + size_labels # Build label filter (OR logic via multiple label queries doesn't work well, # so we fetch by candidate label and filter sizes client-side) params = {"state": "open", "sort": "created", "labels": ",".join(candidate_labels)} try: issues = client.get_paginated("issues", params) except (HTTPError, URLError) as exc: print(f"[orchestrator] Warning: Failed to fetch issues: {exc}", file=sys.stderr) return [] # Filter by size labels if specified if size_labels: filtered = [] size_names = {s.lower() for s in size_labels} for issue in issues: issue_labels = {l.get("name", "").lower() for l in issue.get("labels", [])} if issue_labels & size_names: filtered.append(issue) issues = filtered return issues def generate_agenda(issues: list[dict], config: dict) -> dict: """Generate a Daily Run agenda from candidate issues.""" max_items = config.get("max_agenda_items", 3) agenda_time = config.get("agenda_time_minutes", 10) # Score and sort issues scored = [(score_issue(issue), issue) for issue in issues] scored.sort(key=lambda x: (-x[0], x[1].get("number", 0))) selected = scored[:max_items] items = [] for score, issue in selected: item = { "number": issue.get("number"), "title": issue.get("title", "Untitled"), "size": extract_size(issue.get("labels", [])), "layer": extract_layer(issue.get("labels", [])), "action": suggest_action_type(issue), "url": issue.get("html_url", ""), } items.append(item) return { "generated_at": datetime.now(timezone.utc).isoformat(), "time_budget_minutes": agenda_time, "item_count": len(items), "items": items, "candidates_considered": len(issues), } def print_agenda(agenda: dict) -> None: """Print a formatted agenda to stdout.""" print("=" * 60) print("📋 DAILY RUN AGENDA") print("=" * 60) print(f"Generated: {agenda['generated_at']}") print(f"Time budget: {agenda['time_budget_minutes']} minutes") print(f"Candidates considered: {agenda['candidates_considered']}") print() if not agenda["items"]: print("No items matched the criteria for today's Daily Run.") print() return for i, item in enumerate(agenda["items"], 1): layer_str = f"[{item['layer']}]" if item["layer"] else "" print(f"{i}. #{item['number']} [{item['size']}] {layer_str}") print(f" Title: {item['title']}") print(f" Action: {item['action'].upper()}") if item['url']: print(f" URL: {item['url']}") print() # ── Review Mode (Day Summary) ───────────────────────────────────────────── def fetch_recent_activity(client: GiteaClient, config: dict) -> dict: """Fetch recent issues and PRs from the lookback window.""" lookback_hours = config.get("lookback_hours", 24) since = datetime.now(timezone.utc) - timedelta(hours=lookback_hours) since_str = since.isoformat() activity = { "issues_touched": [], "issues_closed": [], "prs_merged": [], "prs_opened": [], "lookback_since": since_str, } try: # Fetch all open and closed issues updated recently for state in ["open", "closed"]: params = {"state": state, "sort": "updated", "limit": 100} issues = client.get_paginated("issues", params) for issue in issues: updated_at = issue.get("updated_at", "") if updated_at and updated_at >= since_str: activity["issues_touched"].append({ "number": issue.get("number"), "title": issue.get("title", "Untitled"), "state": issue.get("state"), "updated_at": updated_at, "url": issue.get("html_url", ""), }) if state == "closed": activity["issues_closed"].append({ "number": issue.get("number"), "title": issue.get("title", "Untitled"), "closed_at": issue.get("closed_at", ""), }) # Fetch PRs prs = client.get_paginated("pulls", {"state": "all", "sort": "updated", "limit": 100}) for pr in prs: updated_at = pr.get("updated_at", "") if updated_at and updated_at >= since_str: pr_info = { "number": pr.get("number"), "title": pr.get("title", "Untitled"), "state": pr.get("state"), "merged": pr.get("merged", False), "updated_at": updated_at, "url": pr.get("html_url", ""), } if pr.get("merged"): merged_at = pr.get("merged_at", "") if merged_at and merged_at >= since_str: activity["prs_merged"].append(pr_info) elif pr.get("created_at", "") >= since_str: activity["prs_opened"].append(pr_info) except (HTTPError, URLError) as exc: print(f"[orchestrator] Warning: Failed to fetch activity: {exc}", file=sys.stderr) return activity def load_cycle_data() -> dict: """Load cycle retrospective data if available.""" retro_file = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl" if not retro_file.exists(): return {} try: entries = [] for line in retro_file.read_text().strip().splitlines(): try: entries.append(json.loads(line)) except json.JSONDecodeError: continue # Get entries from last 24 hours since = datetime.now(timezone.utc) - timedelta(hours=24) recent = [ e for e in entries if e.get("timestamp") and datetime.fromisoformat(e["timestamp"].replace("Z", "+00:00")) >= since ] failures = [e for e in recent if not e.get("success", True)] return { "cycles_count": len(recent), "failures_count": len(failures), "failures": [ { "cycle": e.get("cycle"), "issue": e.get("issue"), "reason": e.get("reason", "Unknown"), } for e in failures[-5:] # Last 5 failures ], } except (OSError, ValueError): return {} def generate_day_summary(activity: dict, cycles: dict) -> dict: """Generate a day summary from activity data.""" return { "generated_at": datetime.now(timezone.utc).isoformat(), "lookback_hours": 24, "issues_touched": len(activity.get("issues_touched", [])), "issues_closed": len(activity.get("issues_closed", [])), "prs_merged": len(activity.get("prs_merged", [])), "prs_opened": len(activity.get("prs_opened", [])), "cycles": cycles.get("cycles_count", 0), "test_failures": cycles.get("failures_count", 0), "recent_failures": cycles.get("failures", []), } def print_day_summary(summary: dict, activity: dict) -> None: """Print a formatted day summary to stdout.""" print("=" * 60) print("📊 DAY SUMMARY (Review Mode)") print("=" * 60) print(f"Period: Last {summary['lookback_hours']} hours") print() print(f"📝 Issues touched: {summary['issues_touched']}") print(f"✅ Issues closed: {summary['issues_closed']}") print(f"🔀 PRs opened: {summary['prs_opened']}") print(f"🎉 PRs merged: {summary['prs_merged']}") print(f"🔄 Dev cycles: {summary['cycles']}") if summary["test_failures"] > 0: print(f"⚠️ Test/Build failures: {summary['test_failures']}") else: print("✅ No test/build failures") print() # Show recent failures if any if summary["recent_failures"]: print("Recent failures:") for f in summary["recent_failures"]: issue_str = f" (Issue #{f['issue']})" if f["issue"] else "" print(f" - Cycle {f['cycle']}{issue_str}: {f['reason']}") print() # Show closed issues if activity.get("issues_closed"): print("Closed issues:") for issue in activity["issues_closed"][-5:]: # Last 5 print(f" - #{issue['number']}: {issue['title'][:50]}") print() # Show merged PRs if activity.get("prs_merged"): print("Merged PRs:") for pr in activity["prs_merged"][-5:]: # Last 5 print(f" - #{pr['number']}: {pr['title'][:50]}") print() # ── Main ───────────────────────────────────────────────────────────────── def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser( description="Daily Run orchestration script — the 10-minute ritual", ) p.add_argument( "--review", "-r", action="store_true", help="Include day summary (review mode)", ) p.add_argument( "--json", "-j", action="store_true", help="Output as JSON instead of formatted text", ) p.add_argument( "--max-items", type=int, default=None, help="Override max agenda items", ) p.add_argument( "--skip-health-check", action="store_true", help="Skip the pre-flight health snapshot (not recommended)", ) p.add_argument( "--force", action="store_true", help="Continue even if health snapshot is red (overrides abort-on-red)", ) return p.parse_args() def compute_daily_run_tokens(success: bool = True) -> dict[str, Any]: """Compute token rewards for Daily Run completion. Uses the centralized token_rules configuration to calculate rewards/penalties for automation actions. Args: success: Whether the Daily Run completed successfully Returns: Token transaction details """ rules = TokenRules() if success: # Daily run completed successfully transaction = compute_token_reward("daily_run_completed", current_tokens=0) # Also compute golden path generation if agenda was created agenda_transaction = compute_token_reward("golden_path_generated", current_tokens=0) return { "daily_run": transaction, "golden_path": agenda_transaction, "total_delta": transaction.get("delta", 0) + agenda_transaction.get("delta", 0), "config_version": rules.get_config_version(), } else: # Automation failed transaction = compute_token_reward("automation_failure", current_tokens=0) return { "automation_failure": transaction, "total_delta": transaction.get("delta", 0), "config_version": rules.get_config_version(), } def run_health_snapshot(args: argparse.Namespace) -> int: """Run pre-flight health snapshot and return 0 (ok) or 1 (abort). Prints a concise summary of CI, issues, flakiness, and token economy. Returns 1 if the overall status is red AND --force was not passed. Returns 0 for green/yellow or when --force is active. On any import/runtime error the check is skipped with a warning. """ try: hs_config = _hs_load_config() hs_token = _hs_get_token(hs_config) snapshot = _generate_health_snapshot(hs_config, hs_token) except Exception as exc: # noqa: BLE001 print(f"[health] Warning: health snapshot failed ({exc}) — skipping", file=sys.stderr) return 0 # Print concise pre-flight header status_emoji = {"green": "🟢", "yellow": "🟡", "red": "🔴"}.get( snapshot.overall_status, "⚪" ) print("─" * 60) print(f"PRE-FLIGHT HEALTH CHECK {status_emoji} {snapshot.overall_status.upper()}") print("─" * 60) ci_emoji = {"pass": "✅", "fail": "❌", "unknown": "⚠️", "unavailable": "⚪"}.get( snapshot.ci.status, "⚪" ) print(f" {ci_emoji} CI: {snapshot.ci.message}") if snapshot.issues.p0_count > 0: issue_emoji = "🔴" elif snapshot.issues.p1_count > 0: issue_emoji = "🟡" else: issue_emoji = "✅" critical_str = f"{snapshot.issues.count} critical" if snapshot.issues.p0_count: critical_str += f" (P0: {snapshot.issues.p0_count})" if snapshot.issues.p1_count: critical_str += f" (P1: {snapshot.issues.p1_count})" print(f" {issue_emoji} Issues: {critical_str}") flak_emoji = {"healthy": "✅", "degraded": "🟡", "critical": "🔴", "unknown": "⚪"}.get( snapshot.flakiness.status, "⚪" ) print(f" {flak_emoji} Flakiness: {snapshot.flakiness.message}") token_emoji = {"balanced": "✅", "inflationary": "🟡", "deflationary": "🔵", "unknown": "⚪"}.get( snapshot.tokens.status, "⚪" ) print(f" {token_emoji} Tokens: {snapshot.tokens.message}") print() if snapshot.overall_status == "red" and not args.force: print( "🛑 Health status is RED — aborting Daily Run to avoid burning cycles.", file=sys.stderr, ) print( " Fix the issues above or re-run with --force to override.", file=sys.stderr, ) return 1 if snapshot.overall_status == "red": print("⚠️ Health is RED but --force passed — proceeding anyway.", file=sys.stderr) return 0 def main() -> int: args = parse_args() config = load_config() if args.max_items: config["max_agenda_items"] = args.max_items # ── Step 0: Pre-flight health snapshot ────────────────────────────────── if not args.skip_health_check: health_rc = run_health_snapshot(args) if health_rc != 0: tokens = compute_daily_run_tokens(success=False) if args.json: print(json.dumps({"error": "health_check_failed", "tokens": tokens})) return health_rc token = get_token(config) client = GiteaClient(config, token) # Check Gitea availability if not client.is_available(): error_msg = "[orchestrator] Error: Gitea API is not available" # Compute failure tokens even when unavailable tokens = compute_daily_run_tokens(success=False) if args.json: print(json.dumps({"error": error_msg, "tokens": tokens})) else: print(error_msg, file=sys.stderr) print(f"[tokens] Failure penalty: {tokens['total_delta']}", file=sys.stderr) return 1 # Fetch candidates and generate agenda candidates = fetch_candidates(client, config) agenda = generate_agenda(candidates, config) # Review mode: fetch day summary day_summary = None activity = None if args.review: activity = fetch_recent_activity(client, config) cycles = load_cycle_data() day_summary = generate_day_summary(activity, cycles) # Compute token rewards for successful completion tokens = compute_daily_run_tokens(success=True) # Output if args.json: output = {"agenda": agenda, "tokens": tokens} if day_summary: output["day_summary"] = day_summary print(json.dumps(output, indent=2)) else: print_agenda(agenda) if day_summary and activity: print_day_summary(day_summary, activity) # Show token rewards print("─" * 60) print("🪙 Token Rewards") print("─" * 60) print(f"Daily Run completed: +{tokens['daily_run']['delta']} tokens") if candidates: print(f"Golden path generated: +{tokens['golden_path']['delta']} tokens") print(f"Total: +{tokens['total_delta']} tokens") print(f"Config version: {tokens['config_version']}") return 0 if __name__ == "__main__": sys.exit(main())