# Forked from Rockachopa/Timmy-time-dashboard
# Source listing metadata: 257 lines, 8.2 KiB, Python
"""Git Hand — version-control operations for Timmy.
|
|
|
|
Provides git capabilities with:
|
|
- Safe defaults (no force-push without explicit override)
|
|
- Approval gates for destructive operations
|
|
- Structured result parsing
|
|
- Working-directory pinning to repo root
|
|
|
|
Follows project conventions:
|
|
- Config via ``from config import settings``
|
|
- Singleton pattern for module-level import
|
|
- Graceful degradation: log error, return fallback, never crash
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
import logging
import shlex
import time
from dataclasses import dataclass, field
|
|
|
|
from config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Git argument fragments that must be explicitly confirmed before they run.
# The gate looks for each fragment as a substring of the argument string,
# so an entry also covers longer spellings that contain it.
DESTRUCTIVE_OPS = frozenset(
    (
        "branch -D",
        "checkout -- .",
        "clean -f",
        "clean -fd",
        "push --force",
        "push -f",
        "reset --hard",
        "restore .",
    )
)
|
|
|
|
|
|
@dataclass
class GitResult:
    """Outcome of a single git invocation.

    Only ``operation`` and ``success`` are required; every other field has
    a neutral default so failure paths can fill in just what they know.
    """

    # The command that was (or would have been) run.
    operation: str
    # Whether the operation completed successfully.
    success: bool
    # Text produced by the command, if any.
    output: str = ""
    # Error description; left empty when there is nothing to report.
    error: str = ""
    # Elapsed time for the operation, in milliseconds.
    latency_ms: float = 0.0
    # Set when the operation was withheld pending explicit confirmation.
    requires_confirmation: bool = False
    # Free-form extra data; each instance gets its own dict.
    metadata: dict = field(default_factory=dict)
|
|
|
|
|
|
class GitHand:
    """Git operations hand for Timmy.

    Wraps the ``git`` command-line binary behind a small async API.

    All methods degrade gracefully — if git is not available or the
    command fails, the hand returns a ``GitResult(success=False)``
    rather than raising.

    Argument strings are tokenised with :func:`shlex.split`, so callers of
    :meth:`run` may quote arguments that contain whitespace (for example
    ``"commit -m 'fix the bug'"``). Note that POSIX shell rules apply:
    backslashes and quote characters in unquoted text are interpreted,
    so pass such values through :func:`shlex.quote` first.
    """

    def __init__(self, repo_dir: str | None = None, timeout: int = 60) -> None:
        """Create a hand pinned to a repository directory.

        Args:
            repo_dir: Working directory for every git command. Falls back
                to ``settings.repo_root`` when not given.
            timeout: Default per-command timeout in seconds.
        """
        # An empty repo_root collapses to None rather than being passed
        # on as an empty working-directory string.
        self._repo_dir = repo_dir or settings.repo_root or None
        self._timeout = timeout
        logger.info("GitHand initialised — repo=%s", self._repo_dir)

    @staticmethod
    def _split_args(args: str) -> list[str]:
        """Tokenise a git argument string, honouring shell-style quoting.

        Raises:
            ValueError: If *args* contains unbalanced quotes.
        """
        return shlex.split(args)

    @staticmethod
    def _elapsed_ms(started: float) -> float:
        """Return milliseconds elapsed since a ``time.perf_counter()`` reading."""
        return (time.perf_counter() - started) * 1000

    def _is_destructive(self, args: str) -> bool:
        """Check if a git operation is destructive.

        Whitespace runs are collapsed before matching so that extra spaces
        or tabs (which the tokenizer would discard anyway) cannot be used
        to slip past the ``DESTRUCTIVE_OPS`` substring check.
        """
        normalized = " ".join(args.split())
        return any(op in normalized for op in DESTRUCTIVE_OPS)

    async def _exec_subprocess(
        self,
        args: str,
        timeout: int,
    ) -> tuple[bytes, bytes, int]:
        """Run git as a subprocess, return (stdout, stderr, returncode).

        Raises:
            asyncio.TimeoutError: If the process exceeds *timeout* seconds
                (the same class as the builtin ``TimeoutError`` on
                Python 3.11+). The process is killed and reaped first.
            ValueError: If *args* cannot be tokenised (unbalanced quotes).
        """
        proc = await asyncio.create_subprocess_exec(
            "git",
            *self._split_args(args),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=self._repo_dir,
        )
        try:
            stdout, stderr = await asyncio.wait_for(
                proc.communicate(),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            # Before Python 3.11 wait_for raises asyncio.TimeoutError, which
            # is NOT the builtin TimeoutError; catching the asyncio name
            # works on every version and guarantees the child is cleaned up.
            proc.kill()
            await proc.wait()
            raise
        return stdout, stderr, proc.returncode or 0

    @staticmethod
    def _parse_output(
        command: str,
        stdout_bytes: bytes,
        stderr_bytes: bytes,
        returncode: int | None,
        latency_ms: float,
    ) -> GitResult:
        """Decode subprocess output into a GitResult.

        A missing return code is treated as 0. Both streams are decoded as
        UTF-8 (undecodable bytes replaced) and stripped; stderr is only
        reported as ``error`` when the exit code is non-zero.
        """
        exit_code = returncode or 0
        stdout = stdout_bytes.decode("utf-8", errors="replace").strip()
        stderr = stderr_bytes.decode("utf-8", errors="replace").strip()
        return GitResult(
            operation=command,
            success=exit_code == 0,
            output=stdout,
            error=stderr if exit_code != 0 else "",
            latency_ms=latency_ms,
        )

    async def run(
        self,
        args: str,
        timeout: int | None = None,
        allow_destructive: bool = False,
    ) -> GitResult:
        """Execute a git command.

        Args:
            args: Git arguments (e.g. "status", "log --oneline -5").
                Shell-style quoting is honoured when splitting.
            timeout: Override default timeout (seconds).
            allow_destructive: Must be True to run destructive ops.

        Returns:
            GitResult with output or error details. Never raises for
            command failures, timeouts, a missing git binary or malformed
            arguments; those are reported via ``success=False``.
        """
        # perf_counter is monotonic, so latency is immune to clock changes.
        started = time.perf_counter()
        command = f"git {args}"

        # Gate destructive operations
        if self._is_destructive(args) and not allow_destructive:
            return GitResult(
                operation=command,
                success=False,
                error=(
                    f"Destructive operation blocked: '{command}'. "
                    "Set allow_destructive=True to override."
                ),
                requires_confirmation=True,
                latency_ms=self._elapsed_ms(started),
            )

        effective_timeout = timeout or self._timeout

        try:
            stdout_bytes, stderr_bytes, returncode = await self._exec_subprocess(
                args,
                effective_timeout,
            )
        except asyncio.TimeoutError:
            logger.warning("Git command timed out after %ds: %s", effective_timeout, command)
            error = f"Command timed out after {effective_timeout}s"
        except FileNotFoundError:
            logger.warning("git binary not found")
            error = "git binary not found on PATH"
        except Exception as exc:
            # Deliberate catch-all: this is the graceful-degradation
            # boundary, so any other failure becomes a failed GitResult.
            logger.warning("Git command failed: %s — %s", command, exc)
            error = str(exc)
        else:
            return self._parse_output(
                command,
                stdout_bytes,
                stderr_bytes,
                returncode=returncode,
                latency_ms=self._elapsed_ms(started),
            )

        return GitResult(
            operation=command,
            success=False,
            error=error,
            latency_ms=self._elapsed_ms(started),
        )

    # ── Convenience wrappers ─────────────────────────────────────────────────

    async def status(self) -> GitResult:
        """Run ``git status --short``."""
        return await self.run("status --short")

    async def log(self, count: int = 10) -> GitResult:
        """Run ``git log --oneline`` limited to the last *count* commits."""
        return await self.run(f"log --oneline -{count}")

    async def diff(self, staged: bool = False) -> GitResult:
        """Run ``git diff`` (or ``git diff --cached`` for staged)."""
        args = "diff --cached" if staged else "diff"
        return await self.run(args)

    async def add(self, paths: str = ".") -> GitResult:
        """Stage files.

        *paths* may name several whitespace-separated paths; quote any
        path that itself contains whitespace.
        """
        return await self.run(f"add {paths}")

    async def commit(self, message: str) -> GitResult:
        """Create a commit with the given message."""
        # Quote the message so it reaches git as ONE argument to -m, even
        # when it contains spaces, quotes or leading dashes.
        return await self.run(f"commit -m {shlex.quote(message)}")

    async def checkout_branch(self, branch: str, create: bool = False) -> GitResult:
        """Checkout (or create) a branch."""
        flag = "-b" if create else ""
        return await self.run(f"checkout {flag} {branch}".strip())

    async def push(
        self, remote: str = "origin", branch: str = "", force: bool = False
    ) -> GitResult:
        """Push to remote. Force-push requires explicit opt-in.

        Passing ``force=True`` is the opt-in: it switches to
        ``push --force`` and lifts the destructive-operation gate.
        """
        if force:
            args = f"push --force {remote} {branch}".strip()
            return await self.run(args, allow_destructive=True)
        args = f"push -u {remote} {branch}".strip()
        return await self.run(args)

    async def clone(self, url: str, dest: str = "") -> GitResult:
        """Clone a repository, using a 120-second timeout.

        *url* and *dest* are quoted so that paths containing whitespace
        or backslashes are passed to git unchanged.
        """
        args = f"clone {shlex.quote(url)}"
        if dest:
            args += f" {shlex.quote(dest)}"
        return await self.run(args, timeout=120)

    async def pull(self, remote: str = "origin", branch: str = "") -> GitResult:
        """Pull from remote."""
        args = f"pull {remote} {branch}".strip()
        return await self.run(args)

    def info(self) -> dict:
        """Return a status summary for the dashboard."""
        return {
            "repo_dir": self._repo_dir,
            "default_timeout": self._timeout,
        }
|
|
|
|
|
|
# ── Module-level singleton ──────────────────────────────────────────────────
# Shared instance built with default arguments when this module is imported;
# other modules are expected to import this object rather than construct
# their own GitHand.
git_hand: GitHand = GitHand()
|