Files
timmy-config/wizards/allegro-primus/git_tools/worktree_manager.py
2026-03-31 20:02:01 +00:00

311 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Worktree Manager for Allegro-Primus
Handles git worktree operations for parallel task execution
"""
import os
import re
import shutil
import subprocess
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional, Dict
from dataclasses import dataclass
@dataclass
class WorktreeInfo:
"""Represents a git worktree"""
path: str
branch: str
commit: str
is_detached: bool
is_main: bool
last_modified: datetime
class WorktreeManager:
"""Manages git worktrees for parallel development"""
def __init__(self, repo_root: str = None):
self.repo_root = repo_root or self._find_repo_root()
self.worktrees_dir = Path(self.repo_root) / ".worktrees"
self.worktrees_dir.mkdir(exist_ok=True)
def _find_repo_root(self) -> str:
"""Find the git repository root"""
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True
)
if result.returncode != 0:
raise RuntimeError("Not in a git repository")
return result.stdout.strip()
def _run_git(self, args: List[str], cwd: str = None) -> subprocess.CompletedProcess:
"""Run a git command"""
return subprocess.run(
["git"] + args,
capture_output=True,
text=True,
cwd=cwd or self.repo_root
)
def generate_branch_name(self, issue_number: str, description: str = None) -> str:
"""Generate a branch name from issue number and optional description"""
# Clean up description for branch name
if description:
# Remove special chars, replace spaces with hyphens
clean_desc = re.sub(r'[^\w\s-]', '', description.lower())
clean_desc = re.sub(r'[-\s]+', '-', clean_desc).strip('-')
return f"ap-issue-{issue_number}-{clean_desc[:40]}"
return f"ap-issue-{issue_number}"
def create_worktree(self, issue_number: str,
description: str = None,
base_branch: str = "main") -> str:
"""
Create a new worktree for an issue
Args:
issue_number: The issue/PR number
description: Optional description for branch naming
base_branch: Branch to base the new worktree on
Returns:
Path to the new worktree
"""
branch_name = self.generate_branch_name(issue_number, description)
worktree_path = self.worktrees_dir / f"issue-{issue_number}"
# Check if we're in a fresh repo without origin
remote_check = self._run_git(["remote"])
has_remote = bool(remote_check.stdout.strip())
if has_remote:
# Ensure base branch exists locally
self._run_git(["fetch", "origin", base_branch])
# Create branch if it doesn't exist
branch_check = self._run_git(["branch", "--list", branch_name])
if not branch_check.stdout.strip():
# Create branch from base without checking it out
current = self._run_git(["rev-parse", "HEAD"]).stdout.strip()
result = self._run_git(
["branch", branch_name, current]
)
if result.returncode != 0:
raise RuntimeError(f"Failed to create branch: {result.stderr}")
# Create worktree
if worktree_path.exists():
# Remove existing worktree for this issue
self.remove_worktree(issue_number)
result = self._run_git([
"worktree", "add",
str(worktree_path),
branch_name
])
if result.returncode != 0:
raise RuntimeError(f"Failed to create worktree: {result.stderr}")
# Create a marker file with metadata
marker = worktree_path / ".ap-worktree"
marker.write_text(f"issue: {issue_number}\n")
marker.write_text(f"branch: {branch_name}\n")
marker.write_text(f"created: {datetime.now().isoformat()}\n")
return str(worktree_path)
def list_worktrees(self) -> List[WorktreeInfo]:
"""List all active worktrees with details"""
result = self._run_git(["worktree", "list", "--porcelain"])
worktrees = []
current = {}
for line in result.stdout.split('\n'):
line = line.strip()
if not line and current:
worktrees.append(self._parse_worktree(current))
current = {}
elif line.startswith('worktree '):
current['path'] = line[9:]
elif line.startswith('HEAD '):
current['commit'] = line[5:]
elif line.startswith('branch '):
current['branch'] = line[7:].replace('refs/heads/', '')
elif line == 'detached':
current['detached'] = True
elif line == 'bare':
current['bare'] = True
if current:
worktrees.append(self._parse_worktree(current))
return worktrees
def _parse_worktree(self, data: Dict) -> WorktreeInfo:
"""Parse worktree data from git output"""
path = data.get('path', '')
is_main = path == self.repo_root
# Get last modified time
try:
mtime = os.path.getmtime(path)
last_modified = datetime.fromtimestamp(mtime)
except OSError:
last_modified = datetime.now()
return WorktreeInfo(
path=path,
branch=data.get('branch', 'detached'),
commit=data.get('commit', ''),
is_detached=data.get('detached', False),
is_main=is_main,
last_modified=last_modified
)
def get_worktree_for_issue(self, issue_number: str) -> Optional[str]:
"""Get the path for a specific issue's worktree"""
worktree_path = self.worktrees_dir / f"issue-{issue_number}"
if worktree_path.exists():
return str(worktree_path)
return None
def remove_worktree(self, issue_number: str) -> bool:
"""Remove a worktree for an issue"""
worktree_path = self.worktrees_dir / f"issue-{issue_number}"
if not worktree_path.exists():
return False
# Remove worktree from git
result = self._run_git(["worktree", "remove", "-f", str(worktree_path)])
# Clean up directory if still exists
if worktree_path.exists():
shutil.rmtree(worktree_path, ignore_errors=True)
return result.returncode == 0
def clean_stale_worktrees(self, max_age_days: int = 7) -> List[str]:
"""
Remove worktrees older than max_age_days
Returns:
List of removed worktree paths
"""
worktrees = self.list_worktrees()
removed = []
cutoff = datetime.now() - timedelta(days=max_age_days)
for wt in worktrees:
if wt.is_main:
continue
if wt.last_modified < cutoff:
issue_match = re.search(r'issue-(\d+)', wt.path)
if issue_match:
issue_number = issue_match.group(1)
if self.remove_worktree(issue_number):
removed.append(wt.path)
return removed
def sync_changes(self, source_issue: str, target_issue: str = None):
"""
Sync changes from one worktree to another or to main
Args:
source_issue: Issue number to sync from
target_issue: Issue number to sync to (None = main)
"""
source_path = self.get_worktree_for_issue(source_issue)
if not source_path:
raise ValueError(f"No worktree found for issue {source_issue}")
# Get current branch in source
result = self._run_git(["branch", "--show-current"], cwd=source_path)
branch = result.stdout.strip()
# Push changes
result = self._run_git(["push", "origin", branch], cwd=source_path)
if target_issue:
# Pull into target
target_path = self.get_worktree_for_issue(target_issue)
if target_path:
self._run_git(["fetch", "origin", branch], cwd=target_path)
self._run_git(["merge", f"origin/{branch}"], cwd=target_path)
else:
# Pull into current branch (main/master)
current_branch = self._run_git(["branch", "--show-current"]).stdout.strip()
self._run_git(["fetch", "origin"])
self._run_git(["merge", f"origin/{branch}"])
def get_worktree_status(self, issue_number: str) -> Dict:
"""Get detailed status of a worktree"""
path = self.get_worktree_for_issue(issue_number)
if not path:
return {"exists": False}
# Check for uncommitted changes
status_result = self._run_git(["status", "--porcelain"], cwd=path)
has_changes = bool(status_result.stdout.strip())
# Get last commit
log_result = self._run_git(["log", "-1", "--oneline"], cwd=path)
last_commit = log_result.stdout.strip()
# Check branch
branch_result = self._run_git(["branch", "--show-current"], cwd=path)
branch = branch_result.stdout.strip()
return {
"exists": True,
"path": path,
"branch": branch,
"has_uncommitted_changes": has_changes,
"last_commit": last_commit,
"files_changed": status_result.stdout.strip().split('\n') if has_changes else []
}
if __name__ == "__main__":
# Simple CLI test
import sys
wm = WorktreeManager()
if len(sys.argv) < 2:
print("Usage: worktree_manager.py <list|create|clean|remove>")
sys.exit(1)
cmd = sys.argv[1]
if cmd == "list":
worktrees = wm.list_worktrees()
for wt in worktrees:
print(f"{wt.path}: {wt.branch} ({'main' if wt.is_main else 'worktree'})")
elif cmd == "create" and len(sys.argv) >= 3:
issue = sys.argv[2]
desc = sys.argv[3] if len(sys.argv) > 3 else None
path = wm.create_worktree(issue, desc)
print(f"Created worktree at: {path}")
elif cmd == "clean":
removed = wm.clean_stale_worktrees()
print(f"Removed {len(removed)} stale worktrees")
for r in removed:
print(f" - {r}")
elif cmd == "remove" and len(sys.argv) >= 3:
wm.remove_worktree(sys.argv[2])
print(f"Removed worktree for issue {sys.argv[2]}")