#!/usr/bin/env python3
"""
Diff Analyzer — Parse unified diffs and categorize every change.

Pipeline 6.1 for Compounding Intelligence.
"""

import re
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import List, Dict, Any, Optional


class ChangeCategory(Enum):
    """Kind of change a hunk represents."""

    ADDED = "added"
    DELETED = "deleted"
    MODIFIED = "modified"
    MOVED = "moved"
    CONTEXT = "context"


@dataclass
class Hunk:
    """A single diff hunk with header, line ranges, and category."""

    header: str  # Text following the closing "@@" of the hunk header, stripped
    old_start: int
    old_count: int
    new_start: int
    new_count: int
    lines: List[str] = field(default_factory=list)  # Raw lines incl. +/-/space prefix
    category: ChangeCategory = ChangeCategory.CONTEXT

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain dict with the category flattened to its string value."""
        d = asdict(self)
        d["category"] = self.category.value
        return d


@dataclass
class FileChange:
    """A single file's changes."""

    path: str
    old_path: Optional[str] = None  # For renames
    hunks: List[Hunk] = field(default_factory=list)
    added_lines: int = 0
    deleted_lines: int = 0
    is_new: bool = False
    is_deleted: bool = False
    is_renamed: bool = False
    is_binary: bool = False

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain dict representation, including serialized hunks."""
        return {
            "path": self.path,
            "old_path": self.old_path,
            "hunks": [h.to_dict() for h in self.hunks],
            "added_lines": self.added_lines,
            "deleted_lines": self.deleted_lines,
            "is_new": self.is_new,
            "is_deleted": self.is_deleted,
            "is_renamed": self.is_renamed,
            "is_binary": self.is_binary,
        }


@dataclass
class ChangeSummary:
    """Aggregate stats + per-file breakdown."""

    files: List[FileChange] = field(default_factory=list)
    total_added: int = 0
    total_deleted: int = 0
    total_files_changed: int = 0
    total_hunks: int = 0
    new_files: int = 0
    deleted_files: int = 0
    renamed_files: int = 0
    binary_files: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain dict representation, including serialized files."""
        return {
            "total_files_changed": self.total_files_changed,
            "total_added": self.total_added,
            "total_deleted": self.total_deleted,
            "total_hunks": self.total_hunks,
            "new_files": self.new_files,
            "deleted_files": self.deleted_files,
            "renamed_files": self.renamed_files,
            "binary_files": self.binary_files,
            "files": [f.to_dict() for f in self.files],
        }


class DiffAnalyzer:
    """Parses unified diff format and produces structured ChangeSummary."""

    HUNK_HEADER_RE = re.compile(r"^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@(.*)$")
    DIFF_FILE_RE = re.compile(r"^diff --git a/(.*) b/(.*)")
    RENAME_RE = re.compile(r"^rename from (.+)$")
    RENAME_TO_RE = re.compile(r"^rename to (.+)$")
    NEW_FILE_RE = re.compile(r"^new file mode")
    DELETED_FILE_RE = re.compile(r"^deleted file mode")
    # "Binary files ... differ" is the default marker; "GIT binary patch" is
    # what `git diff --binary` emits instead.
    BINARY_RE = re.compile(r"^(?:Binary files .* differ|GIT binary patch)")

    def analyze(self, diff_text: str) -> ChangeSummary:
        """Parse a unified diff and return a ChangeSummary.

        Empty or whitespace-only input yields an empty summary.
        """
        summary = ChangeSummary()
        if not diff_text or not diff_text.strip():
            return summary

        for file_diff in self._split_files(diff_text):
            fc = self._parse_file_diff(file_diff)
            summary.files.append(fc)
            summary.total_added += fc.added_lines
            summary.total_deleted += fc.deleted_lines
            summary.total_hunks += len(fc.hunks)
            if fc.is_new:
                summary.new_files += 1
            if fc.is_deleted:
                summary.deleted_files += 1
            if fc.is_renamed:
                summary.renamed_files += 1
            if fc.is_binary:
                summary.binary_files += 1

        summary.total_files_changed = len(summary.files)
        return summary

    def _split_files(self, diff_text: str) -> List[str]:
        """Split a multi-file diff into individual file diffs.

        Text preceding the first ``diff --git`` line (commit message, mail
        headers, diffstat) is not a file section and is dropped. When the
        input contains no ``diff --git`` line at all, the whole text is
        returned as a single section so plain unified diffs still parse.
        """
        chunks: List[str] = []
        current: List[str] = []
        for line in diff_text.split("\n"):
            if line.startswith("diff --git ") and current:
                chunks.append("\n".join(current))
                current = [line]
            else:
                current.append(line)
        if current:
            chunks.append("\n".join(current))

        # Only the leading chunk can lack the marker; keeping it would create
        # a phantom FileChange with an empty path.
        git_chunks = [c for c in chunks if c.startswith("diff --git ")]
        return git_chunks or chunks

    def _parse_file_diff(self, diff_text: str) -> FileChange:
        """Parse a single file's diff section.

        The extended header (everything before the first hunk header) supplies
        the path and the new/deleted/renamed/binary flags. Binary files are
        returned without hunks.
        """
        lines = diff_text.split("\n")
        fc = FileChange(path="")

        first_hunk = next(
            (i for i, line in enumerate(lines) if self.HUNK_HEADER_RE.match(line)),
            len(lines),
        )

        rename_target: Optional[str] = None
        path_found = False
        # Restrict the scan to the header so text after the hunks (e.g. the
        # next patch's commit message) cannot flip the flags.
        for line in lines[:first_hunk]:
            if not path_found:
                m = self.DIFF_FILE_RE.match(line)
                if m:
                    fc.path = m.group(2)
                    path_found = True
                    continue
            if self.NEW_FILE_RE.match(line):
                fc.is_new = True
            elif self.DELETED_FILE_RE.match(line):
                fc.is_deleted = True
            elif m := self.RENAME_RE.match(line):
                fc.old_path = m.group(1)
                fc.is_renamed = True
            elif m := self.RENAME_TO_RE.match(line):
                rename_target = m.group(1)
            elif self.BINARY_RE.match(line):
                fc.is_binary = True

        if fc.is_renamed and rename_target is not None:
            fc.path = rename_target

        if fc.is_binary:
            return fc  # No hunks for binary

        self._parse_hunks(lines[first_hunk:], fc)
        return fc

    def _parse_hunks(self, lines: List[str], fc: FileChange) -> None:
        """Collect hunks from ``lines`` into ``fc``, bounded by header counts.

        Each hunk accepts at most the number of old/new lines its ``@@``
        header declares, so trailing text such as the ``-- `` signature of
        ``git format-patch`` is not mistaken for a deletion. Lines without a
        ``+``/``-``/space prefix (including ``\\ No newline at end of file``
        and empty lines) are ignored and do not consume the budget.
        """
        current_hunk: Optional[Hunk] = None
        old_left = 0
        new_left = 0

        for line in lines:
            m = self.HUNK_HEADER_RE.match(line)
            if m:
                if current_hunk is not None:
                    self._classify_hunk(current_hunk, fc)
                    fc.hunks.append(current_hunk)
                current_hunk = Hunk(
                    header=m.group(5).strip(),
                    old_start=int(m.group(1)),
                    old_count=int(m.group(2) or 1),  # omitted count means 1
                    new_start=int(m.group(3)),
                    new_count=int(m.group(4) or 1),
                )
                old_left = current_hunk.old_count
                new_left = current_hunk.new_count
                continue

            if current_hunk is None:
                continue

            if line.startswith("+") and new_left > 0:
                new_left -= 1
            elif line.startswith("-") and old_left > 0:
                old_left -= 1
            elif line.startswith(" ") and old_left > 0 and new_left > 0:
                old_left -= 1
                new_left -= 1
            else:
                continue
            current_hunk.lines.append(line)

        if current_hunk is not None:
            self._classify_hunk(current_hunk, fc)
            fc.hunks.append(current_hunk)

    def _classify_hunk(self, hunk: Hunk, fc: FileChange) -> None:
        """Classify a hunk and add its line counts to ``fc``."""
        added = sum(1 for entry in hunk.lines if entry.startswith("+"))
        deleted = sum(1 for entry in hunk.lines if entry.startswith("-"))

        fc.added_lines += added
        fc.deleted_lines += deleted

        if added > 0 and deleted == 0:
            hunk.category = ChangeCategory.ADDED
        elif deleted > 0 and added == 0:
            hunk.category = ChangeCategory.DELETED
        elif added > 0 and deleted > 0:
            hunk.category = ChangeCategory.MODIFIED
        else:
            hunk.category = ChangeCategory.CONTEXT