Files
timmy-config/wizards/allegro-primus/repo_manager.py
2026-03-31 20:02:01 +00:00

501 lines
15 KiB
Python

"""
Repository Manager for Allegro-Primus
Handles git operations: clone, branch, commit, push, and PR creation.
"""
import os
import re
import logging
import subprocess
from pathlib import Path
from typing import Optional, List, Tuple, Dict, Any
from dataclasses import dataclass
from urllib.parse import urlparse
from gitea_client import GiteaClient
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class GitConfig:
"""Git configuration for commits."""
name: str
email: str
class RepoManager:
"""Manages git repositories and integrates with Gitea API."""
def __init__(
self,
base_path: str = "./repos",
gitea_client: Optional[GiteaClient] = None,
git_config: Optional[GitConfig] = None
):
"""
Initialize RepoManager.
Args:
base_path: Directory to clone repositories into
gitea_client: GiteaClient instance for API operations
git_config: Git user configuration
"""
self.base_path = Path(base_path).resolve()
self.base_path.mkdir(parents=True, exist_ok=True)
self.gitea = gitea_client or GiteaClient()
self.git_config = git_config or self._get_default_git_config()
logger.info(f"RepoManager initialized at {self.base_path}")
def _get_default_git_config(self) -> GitConfig:
"""Get git config from environment or defaults."""
return GitConfig(
name=os.getenv("GIT_USER_NAME", "Allegro-Primus"),
email=os.getenv("GIT_USER_EMAIL", "ap@allegro-primus.local")
)
def _run_git(
self,
repo_path: Path,
args: List[str],
check: bool = True,
capture_output: bool = True
) -> Tuple[int, str, str]:
"""
Execute git command in repository.
Returns:
Tuple of (returncode, stdout, stderr)
"""
cmd = ["git", "-C", str(repo_path)] + args
logger.debug(f"Running: {' '.join(cmd)}")
result = subprocess.run(
cmd,
capture_output=capture_output,
text=True,
check=False
)
if check and result.returncode != 0:
raise RuntimeError(
f"Git command failed: {' '.join(args)}\n"
f"stderr: {result.stderr}"
)
return result.returncode, result.stdout, result.stderr
def _get_auth_url(self, owner: str, repo: str) -> str:
"""Get authenticated clone URL."""
# Parse base URL to inject token
parsed = urlparse(self.gitea.base_url)
token = self.gitea.token
# Build auth URL: http://token@host:port/owner/repo.git
auth_netloc = f"{token}@{parsed.netloc}"
auth_url = f"{parsed.scheme}://{auth_netloc}/{owner}/{repo}.git"
return auth_url
def clone(
self,
owner: str,
repo: str,
branch: Optional[str] = None,
depth: Optional[int] = None
) -> Path:
"""
Clone a repository.
Args:
owner: Repository owner
repo: Repository name
branch: Specific branch to clone
depth: Shallow clone depth
Returns:
Path to cloned repository
"""
repo_path = self.base_path / f"{owner}_{repo}"
# Remove existing if present
if repo_path.exists():
logger.info(f"Removing existing clone at {repo_path}")
import shutil
shutil.rmtree(repo_path)
auth_url = self._get_auth_url(owner, repo)
args = ["clone"]
if branch:
args.extend(["--branch", branch, "--single-branch"])
if depth:
args.extend(["--depth", str(depth)])
args.extend([auth_url, str(repo_path)])
# Run clone from base_path
result = subprocess.run(
["git"] + args,
capture_output=True,
text=True,
check=False,
cwd=self.base_path
)
if result.returncode != 0:
raise RuntimeError(f"Clone failed: {result.stderr}")
# Configure git user
self._configure_git_user(repo_path)
logger.info(f"Cloned {owner}/{repo} to {repo_path}")
return repo_path
def _configure_git_user(self, repo_path: Path) -> None:
"""Set git user name and email for the repository."""
self._run_git(repo_path, ["config", "user.name", self.git_config.name])
self._run_git(repo_path, ["config", "user.email", self.git_config.email])
def ensure_repo(self, owner: str, repo: str, branch: Optional[str] = None) -> Path:
"""
Ensure repository exists locally, clone if needed.
Args:
owner: Repository owner
repo: Repository name
branch: Default branch
Returns:
Path to repository
"""
repo_path = self.base_path / f"{owner}_{repo}"
if repo_path.exists():
logger.info(f"Using existing clone at {repo_path}")
# Update remote URL in case token changed
auth_url = self._get_auth_url(owner, repo)
self._run_git(repo_path, ["remote", "set-url", "origin", auth_url])
# Fetch latest
self._run_git(repo_path, ["fetch", "origin"])
return repo_path
return self.clone(owner, repo, branch)
def create_branch(
self,
owner: str,
repo: str,
branch_name: str,
base_branch: str = "main"
) -> Path:
"""
Create and checkout a new branch.
Args:
owner: Repository owner
repo: Repository name
branch_name: New branch name
base_branch: Branch to create from
Returns:
Path to repository
"""
repo_path = self.ensure_repo(owner, repo)
# Ensure clean state
self._run_git(repo_path, ["checkout", base_branch])
self._run_git(repo_path, ["pull", "origin", base_branch])
# Create and checkout new branch
self._run_git(repo_path, ["checkout", "-b", branch_name])
logger.info(f"Created branch {branch_name} from {base_branch}")
return repo_path
def commit_changes(
self,
owner: str,
repo: str,
message: str,
files: Optional[List[str]] = None,
allow_empty: bool = False
) -> str:
"""
Commit changes in repository.
Args:
owner: Repository owner
repo: Repository name
message: Commit message
files: Specific files to stage (None = all changes)
allow_empty: Allow empty commits
Returns:
Commit hash
"""
repo_path = self.ensure_repo(owner, repo)
# Stage files
if files:
for f in files:
self._run_git(repo_path, ["add", f])
else:
self._run_git(repo_path, ["add", "-A"])
# Commit
args = ["commit", "-m", message]
if allow_empty:
args.append("--allow-empty")
self._run_git(repo_path, args)
# Get commit hash
_, stdout, _ = self._run_git(repo_path, ["rev-parse", "HEAD"])
commit_hash = stdout.strip()
logger.info(f"Created commit {commit_hash[:8]}: {message.split(chr(10))[0]}")
return commit_hash
def push(
self,
owner: str,
repo: str,
branch: str,
force: bool = False
) -> None:
"""
Push branch to origin.
Args:
owner: Repository owner
repo: Repository name
branch: Branch to push
force: Force push
"""
repo_path = self.ensure_repo(owner, repo)
args = ["push", "origin", branch]
if force:
args.append("--force")
self._run_git(repo_path, args)
logger.info(f"Pushed {branch} to origin")
def pull(self, owner: str, repo: str, branch: str = "main") -> None:
"""Pull latest changes from origin."""
repo_path = self.ensure_repo(owner, repo)
self._run_git(repo_path, ["pull", "origin", branch])
logger.info(f"Pulled {branch} from origin")
def get_current_branch(self, owner: str, repo: str) -> str:
"""Get current git branch."""
repo_path = self.ensure_repo(owner, repo)
_, stdout, _ = self._run_git(repo_path, ["branch", "--show-current"])
return stdout.strip()
def get_status(self, owner: str, repo: str) -> Dict[str, Any]:
"""Get repository status."""
repo_path = self.ensure_repo(owner, repo)
# Check for uncommitted changes
_, stdout, _ = self._run_git(repo_path, ["status", "--porcelain"])
has_changes = bool(stdout.strip())
# Get current branch
branch = self.get_current_branch(owner, repo)
# Get last commit
_, commit, _ = self._run_git(
repo_path, ["log", "-1", "--format=%H %s"]
)
commit_hash, commit_msg = commit.strip().split(" ", 1) if commit else ("", "")
return {
"path": str(repo_path),
"branch": branch,
"has_uncommitted_changes": has_changes,
"last_commit_hash": commit_hash,
"last_commit_message": commit_msg
}
def open_pull_request(
self,
owner: str,
repo: str,
title: str,
head_branch: str,
base_branch: str = "main",
body: str = "",
draft: bool = False
) -> Dict[str, Any]:
"""
Create a pull request via Gitea API.
Args:
owner: Repository owner
repo: Repository name
title: PR title
head_branch: Branch with changes
base_branch: Target branch
body: PR description
draft: Create as draft
Returns:
Created PR data
"""
return self.gitea.create_pull_request(
owner=owner,
repo=repo,
title=title,
head=head_branch,
base=base_branch,
body=body,
draft=draft
)
def commit_and_push(
self,
owner: str,
repo: str,
branch: str,
message: str,
files: Optional[List[str]] = None
) -> Tuple[str, None]:
"""
Commit changes and push in one operation.
Returns:
Tuple of (commit_hash, None)
"""
commit_hash = self.commit_changes(owner, repo, message, files)
self.push(owner, repo, branch)
return commit_hash, None
def create_work_branch(
self,
owner: str,
repo: str,
issue_number: int,
description: str = ""
) -> str:
"""
Create a standardized branch name for working on an issue.
Args:
owner: Repository owner
repo: Repository name
issue_number: Issue number
description: Short description for branch name
Returns:
Branch name
"""
# Sanitize description for branch name
if description:
desc = re.sub(r'[^a-zA-Z0-9_-]', '-', description.lower())
desc = re.sub(r'-+', '-', desc).strip('-')[:30]
branch_name = f"ap-issue-{issue_number}-{desc}"
else:
branch_name = f"ap-issue-{issue_number}"
self.create_branch(owner, repo, branch_name)
return branch_name
def apply_patch(
self,
owner: str,
repo: str,
patch_content: str,
source: str = "api"
) -> bool:
"""
Apply a patch to the repository.
Args:
owner: Repository owner
repo: Repository name
patch_content: Patch content as string
source: Source identifier for logging
Returns:
True if applied successfully
"""
repo_path = self.ensure_repo(owner, repo)
# Write patch to temporary file
patch_file = repo_path / ".temp_patch"
patch_file.write_text(patch_content)
try:
# Try to apply patch
code, stdout, stderr = self._run_git(
repo_path,
["apply", "--check", str(patch_file)],
check=False
)
if code != 0:
logger.error(f"Patch check failed: {stderr}")
return False
# Apply for real
self._run_git(repo_path, ["apply", str(patch_file)])
logger.info(f"Applied patch from {source}")
return True
finally:
patch_file.unlink(missing_ok=True)
def reset_hard(self, owner: str, repo: str, ref: str = "HEAD") -> None:
"""Hard reset repository to ref."""
repo_path = self.ensure_repo(owner, repo)
self._run_git(repo_path, ["reset", "--hard", ref])
logger.info(f"Hard reset to {ref}")
def clean(self, owner: str, repo: str) -> None:
"""Remove untracked files."""
repo_path = self.ensure_repo(owner, repo)
self._run_git(repo_path, ["clean", "-fd"])
logger.info("Cleaned untracked files")
def list_branches(self, owner: str, repo: str) -> List[str]:
"""List all branches in repository."""
repo_path = self.ensure_repo(owner, repo)
_, stdout, _ = self._run_git(
repo_path,
["branch", "-a", "--format=%(refname:short)"]
)
return [b.strip() for b in stdout.split("\n") if b.strip()]
# ==================== Example Usage ====================
if __name__ == "__main__":
# Initialize manager
manager = RepoManager(base_path="./repos")
# Example workflow
owner = "myuser"
repo = "myrepo"
# Clone or update repo
manager.ensure_repo(owner, repo)
# Create work branch
branch = manager.create_work_branch(owner, repo, 42, "fix-login-bug")
# Make changes (imagine file edits here)
# ... edit files ...
# Commit and push
# manager.commit_and_push(owner, repo, branch, "Fix login bug (#42)")
# Create PR
# pr = manager.open_pull_request(
# owner, repo,
# title="Fix login bug",
# head_branch=branch,
# body="Closes #42"
# )