"""
Diff Analyzer — Pipeline 6.1

Reads PR diffs and categorizes changes: new code, deleted code, modified
code, moved code. Produces a change summary with line counts per category.

Usage:
    from diff_analyzer import DiffAnalyzer
    analyzer = DiffAnalyzer()
    summary = analyzer.analyze(diff_text)
"""

from __future__ import annotations

import re
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional


class ChangeCategory(Enum):
    """Categories of code changes in a diff hunk."""

    ADDED = "added"
    DELETED = "deleted"
    MODIFIED = "modified"
    MOVED = "moved"
    CONTEXT = "context"


@dataclass
class Hunk:
    """A single diff hunk with metadata.

    ``old_lines`` / ``new_lines`` count the '-' and '+' lines seen in this
    hunk's body; ``category`` is filled in by DiffAnalyzer._classify_hunk.
    """

    header: str
    old_start: int
    old_count: int
    new_start: int
    new_count: int
    lines: List[str] = field(default_factory=list)
    category: ChangeCategory = ChangeCategory.CONTEXT
    old_lines: int = 0
    new_lines: int = 0


@dataclass
class FileChange:
    """Changes within a single file."""

    path: str
    old_path: Optional[str] = None  # For renames
    is_new: bool = False
    is_deleted: bool = False
    is_renamed: bool = False
    hunks: List[Hunk] = field(default_factory=list)
    added_lines: int = 0
    deleted_lines: int = 0
    context_lines: int = 0


@dataclass
class ChangeSummary:
    """Summary of all changes in a diff."""

    files_changed: int = 0
    files_added: int = 0
    files_deleted: int = 0
    files_renamed: int = 0
    files_modified: int = 0
    total_added: int = 0
    total_deleted: int = 0
    total_context: int = 0
    hunks_added: int = 0
    hunks_deleted: int = 0
    hunks_modified: int = 0
    hunks_moved: int = 0
    file_changes: List[FileChange] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Serialize to dict for JSON output."""
        return {
            "files_changed": self.files_changed,
            "files_added": self.files_added,
            "files_deleted": self.files_deleted,
            "files_renamed": self.files_renamed,
            "files_modified": self.files_modified,
            "total_added": self.total_added,
            "total_deleted": self.total_deleted,
            "total_context": self.total_context,
            "hunks_added": self.hunks_added,
            "hunks_deleted": self.hunks_deleted,
            "hunks_modified": self.hunks_modified,
            "hunks_moved": self.hunks_moved,
            "files": [
                {
                    "path": fc.path,
                    "old_path": fc.old_path,
                    "is_new": fc.is_new,
                    "is_deleted": fc.is_deleted,
                    "is_renamed": fc.is_renamed,
                    "added": fc.added_lines,
                    "deleted": fc.deleted_lines,
                    "context": fc.context_lines,
                }
                for fc in self.file_changes
            ],
        }


# Regex for unified diff headers.
# Hunk header: "@@ -old_start[,old_count] +new_start[,new_count] @@ [context]"
# — the counts are optional and default to 1 per the unified-diff format.
_HUNK_RE = re.compile(
    r"^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@(.*)$"
)
_FILE_HEADER_RE = re.compile(r"^diff --git a/(.*) b/(.*)$")
_RENAME_RE = re.compile(r"^rename from (.+)$|^rename to (.+)$")


class DiffAnalyzer:
    """Parses unified diffs and categorizes changes."""

    def analyze(self, diff_text: str) -> ChangeSummary:
        """Analyze a unified diff string and return a ChangeSummary.

        Walks the diff line by line, tracking the current file section and
        the current hunk. File-level git headers ("new file mode",
        "deleted file mode", "rename from/to") set flags on the FileChange;
        '+'/'-'/' ' content lines update per-hunk and per-file counters.
        """
        summary = ChangeSummary()
        lines = diff_text.splitlines(keepends=False)

        current_file: Optional[FileChange] = None
        current_hunk: Optional[Hunk] = None
        old_path: Optional[str] = None
        new_path: Optional[str] = None

        for line in lines:
            # File header: start of a new file section.
            m = _FILE_HEADER_RE.match(line)
            if m:
                # BUGFIX: finalize the previous file's in-flight hunk before
                # switching files. Previously the last hunk of every file
                # except the final one was dropped (never classified or
                # appended), skewing the per-category hunk counts.
                if current_hunk and current_file:
                    self._classify_hunk(current_hunk)
                    current_file.hunks.append(current_hunk)
                # Save previous file
                if current_file:
                    self._classify_file(current_file)
                    summary.file_changes.append(current_file)
                old_path = m.group(1)
                new_path = m.group(2)
                current_file = FileChange(path=new_path, old_path=old_path)
                current_hunk = None
                continue

            if current_file is None:
                # Preamble before the first "diff --git" line.
                continue

            # Detect new/deleted file markers (git extended headers).
            if line.startswith("new file mode"):
                current_file.is_new = True
                continue
            if line.startswith("deleted file mode"):
                current_file.is_deleted = True
                continue

            # Detect renames
            rm = _RENAME_RE.match(line)
            if rm:
                current_file.is_renamed = True
                continue

            # Hunk header
            hm = _HUNK_RE.match(line)
            if hm:
                if current_hunk:
                    self._classify_hunk(current_hunk)
                    current_file.hunks.append(current_hunk)
                current_hunk = Hunk(
                    header=line,
                    old_start=int(hm.group(1)),
                    # A missing count means 1; note "0" must stay 0, which
                    # int("0" or 1) handles since "0" is a truthy string.
                    old_count=int(hm.group(2) or 1),
                    new_start=int(hm.group(3)),
                    new_count=int(hm.group(4) or 1),
                )
                continue

            if current_hunk is None:
                # Lines between the file header and first hunk
                # (e.g. "--- a/...", "+++ b/...", index lines).
                continue

            # Hunk content: '+' added, '-' removed, ' ' context.
            # "\ No newline at end of file" markers fall through uncounted.
            current_hunk.lines.append(line)
            if line.startswith("+"):
                current_hunk.new_lines += 1
                current_file.added_lines += 1
            elif line.startswith("-"):
                current_hunk.old_lines += 1
                current_file.deleted_lines += 1
            elif line.startswith(" "):
                current_file.context_lines += 1

        # Finalize last hunk and file
        if current_hunk:
            self._classify_hunk(current_hunk)
            if current_file:
                current_file.hunks.append(current_hunk)
        if current_file:
            self._classify_file(current_file)
            summary.file_changes.append(current_file)

        # Aggregate per-file counters into the summary.
        summary.files_changed = len(summary.file_changes)
        for fc in summary.file_changes:
            summary.total_added += fc.added_lines
            summary.total_deleted += fc.deleted_lines
            summary.total_context += fc.context_lines
            if fc.is_new:
                summary.files_added += 1
            elif fc.is_deleted:
                summary.files_deleted += 1
            elif fc.is_renamed:
                summary.files_renamed += 1
            else:
                summary.files_modified += 1
            for h in fc.hunks:
                if h.category == ChangeCategory.ADDED:
                    summary.hunks_added += 1
                elif h.category == ChangeCategory.DELETED:
                    summary.hunks_deleted += 1
                elif h.category == ChangeCategory.MODIFIED:
                    summary.hunks_modified += 1
                elif h.category == ChangeCategory.MOVED:
                    summary.hunks_moved += 1

        return summary

    def _classify_hunk(self, hunk: Hunk) -> None:
        """Classify a hunk based on its add/delete content.

        Pure additions -> ADDED; pure deletions -> DELETED. A hunk with
        both is MOVED when the removed and added line contents match as
        multisets (a pure reorder), otherwise MODIFIED. Previously MOVED
        was unreachable even though the module promises move detection
        and the summary tracks ``hunks_moved``.
        """
        if hunk.new_lines > 0 and hunk.old_lines == 0:
            hunk.category = ChangeCategory.ADDED
        elif hunk.old_lines > 0 and hunk.new_lines == 0:
            hunk.category = ChangeCategory.DELETED
        elif hunk.new_lines > 0 and hunk.old_lines > 0:
            removed = sorted(l[1:] for l in hunk.lines if l.startswith("-"))
            added = sorted(l[1:] for l in hunk.lines if l.startswith("+"))
            if removed == added:
                hunk.category = ChangeCategory.MOVED
            else:
                hunk.category = ChangeCategory.MODIFIED
        else:
            hunk.category = ChangeCategory.CONTEXT

    def _classify_file(self, fc: FileChange) -> None:
        """Final file classification (renames already detected via headers)."""
        pass