#!/usr/bin/env python3
"""
Diff Analyzer — Parse unified diffs and categorize every change.

Pipeline 6.1 for Compounding Intelligence.
"""

import re
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import List, Dict, Any, Optional


class ChangeCategory(Enum):
    """Category assigned to a hunk based on which kinds of lines it holds."""

    ADDED = "added"
    DELETED = "deleted"
    MODIFIED = "modified"
    MOVED = "moved"  # declared for consumers; the classifier below never assigns it
    CONTEXT = "context"


@dataclass
class Hunk:
    """A single diff hunk with header, line ranges, and category."""

    header: str  # text following the closing "@@" of the hunk header, stripped
    old_start: int
    old_count: int
    new_start: int
    new_count: int
    lines: List[str] = field(default_factory=list)  # raw body lines incl. +/-/space prefix
    category: ChangeCategory = ChangeCategory.CONTEXT

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict form with the category as its string value."""
        d = asdict(self)
        d["category"] = self.category.value
        return d


@dataclass
class FileChange:
    """A single file's changes."""

    path: str
    old_path: Optional[str] = None  # For renames
    hunks: List[Hunk] = field(default_factory=list)
    added_lines: int = 0
    deleted_lines: int = 0
    is_new: bool = False
    is_deleted: bool = False
    is_renamed: bool = False
    is_binary: bool = False

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict form with every hunk converted via Hunk.to_dict."""
        return {
            "path": self.path,
            "old_path": self.old_path,
            "hunks": [h.to_dict() for h in self.hunks],
            "added_lines": self.added_lines,
            "deleted_lines": self.deleted_lines,
            "is_new": self.is_new,
            "is_deleted": self.is_deleted,
            "is_renamed": self.is_renamed,
            "is_binary": self.is_binary,
        }


@dataclass
class ChangeSummary:
    """Aggregate stats + per-file breakdown."""

    files: List[FileChange] = field(default_factory=list)
    total_added: int = 0
    total_deleted: int = 0
    total_files_changed: int = 0
    total_hunks: int = 0
    new_files: int = 0
    deleted_files: int = 0
    renamed_files: int = 0
    binary_files: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict form with every file converted via FileChange.to_dict."""
        return {
            "total_files_changed": self.total_files_changed,
            "total_added": self.total_added,
            "total_deleted": self.total_deleted,
            "total_hunks": self.total_hunks,
            "new_files": self.new_files,
            "deleted_files": self.deleted_files,
            "renamed_files": self.renamed_files,
            "binary_files": self.binary_files,
            "files": [f.to_dict() for f in self.files],
        }


class DiffAnalyzer:
    """Parses unified diff format and produces structured ChangeSummary."""

    HUNK_HEADER_RE = re.compile(r"^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@(.*)$")
    DIFF_FILE_RE = re.compile(r"^diff --git a/(.*) b/(.*)")
    RENAME_RE = re.compile(r"^rename from (.+)$")
    RENAME_TO_RE = re.compile(r"^rename to (.+)$")
    NEW_FILE_RE = re.compile(r"^new file mode")
    DELETED_FILE_RE = re.compile(r"^deleted file mode")
    BINARY_RE = re.compile(r"^Binary files .* differ")

    # Line prefix that opens each per-file section in git-style diffs.
    _FILE_HEADER_PREFIX = "diff --git "
    # Prefixes of lines that belong to a hunk body (added, deleted, context).
    _HUNK_LINE_PREFIXES = ("+", "-", " ")

    def analyze(self, diff_text: str) -> ChangeSummary:
        """Parse a unified diff and return a ChangeSummary.

        Args:
            diff_text: Full diff text, possibly covering several files.

        Returns:
            A ChangeSummary with one FileChange per file section plus
            aggregate counters. Empty or whitespace-only input yields an
            empty summary.
        """
        summary = ChangeSummary()

        if not diff_text or not diff_text.strip():
            return summary

        # Split diff into per-file sections
        for file_diff in self._split_files(diff_text):
            fc = self._parse_file_diff(file_diff)
            summary.files.append(fc)
            summary.total_added += fc.added_lines
            summary.total_deleted += fc.deleted_lines
            summary.total_hunks += len(fc.hunks)
            if fc.is_new:
                summary.new_files += 1
            if fc.is_deleted:
                summary.deleted_files += 1
            if fc.is_renamed:
                summary.renamed_files += 1
            if fc.is_binary:
                summary.binary_files += 1

        summary.total_files_changed = len(summary.files)
        return summary

    def _split_files(self, diff_text: str) -> List[str]:
        """Split a multi-file diff into individual file diffs.

        Each returned chunk starts at a "diff --git " line. Text that
        precedes the first such line (commit message, mail header,
        diffstat) is discarded so it is not reported as a file. When the
        input holds no "diff --git " line at all, the whole text is
        returned as a single chunk.
        """
        prefix = self._FILE_HEADER_PREFIX
        lines = diff_text.split("\n")

        chunks: List[str] = []
        current: List[str] = []
        for line in lines:
            if line.startswith(prefix) and current:
                chunks.append("\n".join(current))
                current = [line]
            else:
                current.append(line)
        if current:
            chunks.append("\n".join(current))

        # Only drop the leading chunk when real file sections exist; a
        # header-less plain unified diff must still be analyzed.
        has_file_headers = any(chunk.startswith(prefix) for chunk in chunks)
        if has_file_headers and not chunks[0].startswith(prefix):
            chunks = chunks[1:]
        return chunks

    def _parse_file_diff(self, diff_text: str) -> FileChange:
        """Parse a single file's diff section.

        Args:
            diff_text: One chunk as produced by _split_files.

        Returns:
            A FileChange with path, state flags, hunks and line counts.
            Binary files are returned without hunks.
        """
        lines = diff_text.split("\n")
        fc = FileChange(path="")

        # Extract file path from the first "diff --git a/... b/..." line.
        for line in lines:
            m = self.DIFF_FILE_RE.match(line)
            if m:
                fc.path = m.group(2)
                break

        # Check for special states
        rename_target: Optional[str] = None
        for line in lines:
            if self.NEW_FILE_RE.match(line):
                fc.is_new = True
            elif self.DELETED_FILE_RE.match(line):
                fc.is_deleted = True
            elif m := self.RENAME_RE.match(line):
                fc.old_path = m.group(1)
                fc.is_renamed = True
            elif m := self.RENAME_TO_RE.match(line):
                rename_target = m.group(1)
            elif self.BINARY_RE.match(line):
                fc.is_binary = True
                break

        # The rename target overrides the header path, but only when a
        # matching "rename from" line was seen.
        if fc.is_renamed and rename_target is not None:
            fc.path = rename_target

        if fc.is_binary:
            return fc  # No hunks for binary

        # Parse hunks
        current_hunk: Optional[Hunk] = None
        old_left = 0  # old-side lines (context + deleted) still expected
        new_left = 0  # new-side lines (context + added) still expected
        for line in lines:
            m = self.HUNK_HEADER_RE.match(line)
            if m:
                if current_hunk is not None:
                    self._classify_hunk(current_hunk, fc)
                    fc.hunks.append(current_hunk)
                current_hunk = Hunk(
                    header=m.group(5).strip(),
                    old_start=int(m.group(1)),
                    old_count=int(m.group(2) or 1),  # omitted count means 1
                    new_start=int(m.group(3)),
                    new_count=int(m.group(4) or 1),
                )
                old_left = current_hunk.old_count
                new_left = current_hunk.new_count
                continue

            if current_hunk is None or not line.startswith(self._HUNK_LINE_PREFIXES):
                continue

            # Once both declared counts are used up the hunk is complete;
            # later +/-/space lines are trailer text (e.g. a "-- " mail
            # signature or the next file's "---"/"+++" header), not changes.
            if old_left <= 0 and new_left <= 0:
                continue

            current_hunk.lines.append(line)
            marker = line[0]
            if marker != "+":
                old_left -= 1  # context and deleted lines exist on the old side
            if marker != "-":
                new_left -= 1  # context and added lines exist on the new side

        if current_hunk is not None:
            self._classify_hunk(current_hunk, fc)
            fc.hunks.append(current_hunk)

        return fc

    def _classify_hunk(self, hunk: Hunk, fc: FileChange) -> None:
        """Classify a hunk and count lines.

        Sets hunk.category from the mix of added/deleted lines and adds
        the hunk's counts to fc.added_lines / fc.deleted_lines.
        """
        added = sum(1 for ln in hunk.lines if ln.startswith("+"))
        deleted = sum(1 for ln in hunk.lines if ln.startswith("-"))

        fc.added_lines += added
        fc.deleted_lines += deleted

        if added > 0 and deleted == 0:
            hunk.category = ChangeCategory.ADDED
        elif deleted > 0 and added == 0:
            hunk.category = ChangeCategory.DELETED
        elif added > 0 and deleted > 0:
            hunk.category = ChangeCategory.MODIFIED
        else:
            hunk.category = ChangeCategory.CONTEXT