"""
Diff Analyzer — Pipeline 6.1

Reads PR diffs and categorizes changes: new code, deleted code, modified code, moved code.
Produces a change summary with line counts per category.

Notes on behavior:
    * Lines inside a hunk are classified purely by their leading marker
      ("+", "-", " "). The line counts declared in the "@@" header are
      recorded on each Hunk but are not enforced.
    * A relocation is reported as MOVED only when a pure-deletion hunk and a
      pure-addition hunk carry exactly the same text (see
      ``DiffAnalyzer._mark_moved_hunks``).

Usage:
    from diff_analyzer import DiffAnalyzer
    analyzer = DiffAnalyzer()
    summary = analyzer.analyze(diff_text)
"""

from __future__ import annotations

import re
from collections import Counter
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple

__all__ = [
    "ChangeCategory",
    "Hunk",
    "FileChange",
    "ChangeSummary",
    "DiffAnalyzer",
]


class ChangeCategory(Enum):
    """Categories of code changes in a diff hunk."""

    ADDED = "added"
    DELETED = "deleted"
    MODIFIED = "modified"
    MOVED = "moved"
    CONTEXT = "context"


@dataclass
class Hunk:
    """A single diff hunk with metadata."""

    header: str  # the raw "@@ -a,b +c,d @@ ..." line
    old_start: int
    old_count: int  # count declared in the header (defaults to 1 if omitted)
    new_start: int
    new_count: int  # count declared in the header (defaults to 1 if omitted)
    lines: List[str] = field(default_factory=list)  # raw body lines, markers included
    category: ChangeCategory = ChangeCategory.CONTEXT
    old_lines: int = 0  # number of "-" lines seen in the body
    new_lines: int = 0  # number of "+" lines seen in the body


@dataclass
class FileChange:
    """Changes within a single file."""

    path: str
    old_path: Optional[str] = None  # For renames
    is_new: bool = False
    is_deleted: bool = False
    is_renamed: bool = False
    hunks: List[Hunk] = field(default_factory=list)
    added_lines: int = 0
    deleted_lines: int = 0
    context_lines: int = 0


@dataclass
class ChangeSummary:
    """Summary of all changes in a diff."""

    files_changed: int = 0
    files_added: int = 0
    files_deleted: int = 0
    files_renamed: int = 0
    files_modified: int = 0
    total_added: int = 0
    total_deleted: int = 0
    total_context: int = 0
    hunks_added: int = 0
    hunks_deleted: int = 0
    hunks_modified: int = 0
    hunks_moved: int = 0
    file_changes: List[FileChange] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to dict for JSON output.

        Per-file entries are emitted under the ``"files"`` key; individual
        hunks are not included.
        """
        return {
            "files_changed": self.files_changed,
            "files_added": self.files_added,
            "files_deleted": self.files_deleted,
            "files_renamed": self.files_renamed,
            "files_modified": self.files_modified,
            "total_added": self.total_added,
            "total_deleted": self.total_deleted,
            "total_context": self.total_context,
            "hunks_added": self.hunks_added,
            "hunks_deleted": self.hunks_deleted,
            "hunks_modified": self.hunks_modified,
            "hunks_moved": self.hunks_moved,
            "files": [
                {
                    "path": fc.path,
                    "old_path": fc.old_path,
                    "is_new": fc.is_new,
                    "is_deleted": fc.is_deleted,
                    "is_renamed": fc.is_renamed,
                    "added": fc.added_lines,
                    "deleted": fc.deleted_lines,
                    "context": fc.context_lines,
                }
                for fc in self.file_changes
            ],
        }


# Regex for unified diff headers
_HUNK_RE = re.compile(
    r"^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@(.*)$"
)
_FILE_HEADER_RE = re.compile(r"^diff --git a/(.*) b/(.*)$")
_RENAME_RE = re.compile(r"^rename from (.+)$|^rename to (.+)$")


class DiffAnalyzer:
    """Parses unified diffs and categorizes changes."""

    def analyze(self, diff_text: str) -> ChangeSummary:
        """Analyze a unified diff string and return a ChangeSummary.

        Args:
            diff_text: Output of ``git diff`` (one or more ``diff --git``
                sections). Text before the first ``diff --git`` line is
                ignored. An empty string yields an all-zero summary.

        Returns:
            A ChangeSummary with per-file details in ``file_changes`` and
            aggregated file, line and hunk counters.
        """
        summary = ChangeSummary()
        summary.file_changes = self._parse_files(diff_text)
        self._mark_moved_hunks(summary.file_changes)
        self._aggregate(summary)
        return summary

    @staticmethod
    def _split_lines(diff_text: str) -> List[str]:
        """Split diff text on LF / CRLF only.

        ``str.splitlines`` also breaks on form feed, vertical tab and several
        Unicode separators. Those can legitimately occur inside a source line,
        and splitting there would make the tail of the line look like a new
        "+" / "-" line.
        """
        pieces = diff_text.split("\n")
        if pieces and pieces[-1] == "":
            # A trailing newline terminates the last line; it does not start
            # another (empty) one.
            pieces.pop()
        return [p[:-1] if p.endswith("\r") else p for p in pieces]

    def _parse_files(self, diff_text: str) -> List[FileChange]:
        """Turn raw diff text into FileChange objects with classified hunks."""
        files: List[FileChange] = []
        current_file: Optional[FileChange] = None
        current_hunk: Optional[Hunk] = None

        for line in self._split_lines(diff_text):
            # File header: closes the previous file *and* its open hunk.
            file_match = _FILE_HEADER_RE.match(line)
            if file_match:
                self._finish_file(files, current_file, current_hunk)
                old_path, new_path = file_match.groups()
                current_file = FileChange(path=new_path, old_path=old_path)
                current_hunk = None
                continue

            if current_file is None:
                continue

            # Detect new/deleted file markers
            if line.startswith("new file mode"):
                current_file.is_new = True
                continue
            if line.startswith("deleted file mode"):
                current_file.is_deleted = True
                continue

            # Detect renames. The "rename from/to" lines carry one path each
            # and are unambiguous, unlike the "diff --git" line, which cannot
            # be split reliably when a path itself contains " b/".
            rename_match = _RENAME_RE.match(line)
            if rename_match:
                current_file.is_renamed = True
                source, target = rename_match.groups()
                if source is not None:
                    current_file.old_path = source
                if target is not None:
                    current_file.path = target
                continue

            # Hunk header
            hunk_match = _HUNK_RE.match(line)
            if hunk_match:
                self._finish_hunk(current_file, current_hunk)
                old_start, old_count, new_start, new_count, _ = hunk_match.groups()
                current_hunk = Hunk(
                    header=line,
                    old_start=int(old_start),
                    # An omitted count in "@@ -a +c @@" means 1.
                    old_count=int(old_count) if old_count is not None else 1,
                    new_start=int(new_start),
                    new_count=int(new_count) if new_count is not None else 1,
                )
                continue

            if current_hunk is None:
                # Still in the per-file header ("index", "---", "+++",
                # "Binary files ...", ...): nothing to count.
                continue

            # Hunk content
            current_hunk.lines.append(line)
            if line.startswith("+"):
                current_hunk.new_lines += 1
                current_file.added_lines += 1
            elif line.startswith("-"):
                current_hunk.old_lines += 1
                current_file.deleted_lines += 1
            elif line.startswith(" "):
                current_file.context_lines += 1

        # Finalize last hunk and file
        self._finish_file(files, current_file, current_hunk)
        return files

    def _finish_hunk(
        self, fc: Optional[FileChange], hunk: Optional[Hunk]
    ) -> None:
        """Classify ``hunk`` and attach it to ``fc`` (no-op if either is None)."""
        if fc is None or hunk is None:
            return
        self._classify_hunk(hunk)
        fc.hunks.append(hunk)

    def _finish_file(
        self,
        files: List[FileChange],
        fc: Optional[FileChange],
        hunk: Optional[Hunk],
    ) -> None:
        """Close the open hunk, classify ``fc`` and append it to ``files``."""
        if fc is None:
            return
        self._finish_hunk(fc, hunk)
        self._classify_file(fc)
        files.append(fc)

    @staticmethod
    def _payload(hunk: Hunk, marker: str) -> Tuple[str, ...]:
        """Return the text of the hunk lines starting with ``marker``, marker stripped."""
        return tuple(
            line[1:] for line in hunk.lines if line.startswith(marker)
        )

    def _mark_moved_hunks(self, file_changes: List[FileChange]) -> None:
        """Re-tag matching DELETED/ADDED hunk pairs as MOVED.

        A pure-deletion hunk and a pure-addition hunk (in the same file or in
        different files) are paired when their changed lines are identical,
        character for character, and contain at least one non-blank line.
        Pairing is one-to-one, in diff order. Both hunks of a pair are tagged
        MOVED, so a single relocation contributes two to ``hunks_moved``.
        Moves that also re-indent or edit the text are not detected.
        """
        removed: Dict[Tuple[str, ...], List[Hunk]] = {}
        for fc in file_changes:
            for hunk in fc.hunks:
                if hunk.category is not ChangeCategory.DELETED:
                    continue
                payload = self._payload(hunk, "-")
                # Blank-only payloads match far too easily to mean anything.
                if any(text.strip() for text in payload):
                    removed.setdefault(payload, []).append(hunk)

        if not removed:
            return

        for fc in file_changes:
            for hunk in fc.hunks:
                if hunk.category is not ChangeCategory.ADDED:
                    continue
                candidates = removed.get(self._payload(hunk, "+"))
                if candidates:
                    partner = candidates.pop(0)
                    partner.category = ChangeCategory.MOVED
                    hunk.category = ChangeCategory.MOVED

    def _aggregate(self, summary: ChangeSummary) -> None:
        """Fill the counters of ``summary`` from ``summary.file_changes``."""
        files = summary.file_changes
        summary.files_changed = len(files)

        for fc in files:
            summary.total_added += fc.added_lines
            summary.total_deleted += fc.deleted_lines
            summary.total_context += fc.context_lines
            # Each file lands in exactly one bucket; new/deleted win over
            # renamed, and "modified" is the fallback.
            if fc.is_new:
                summary.files_added += 1
            elif fc.is_deleted:
                summary.files_deleted += 1
            elif fc.is_renamed:
                summary.files_renamed += 1
            else:
                summary.files_modified += 1

        tally = Counter(hunk.category for fc in files for hunk in fc.hunks)
        summary.hunks_added = tally[ChangeCategory.ADDED]
        summary.hunks_deleted = tally[ChangeCategory.DELETED]
        summary.hunks_modified = tally[ChangeCategory.MODIFIED]
        summary.hunks_moved = tally[ChangeCategory.MOVED]

    def _classify_hunk(self, hunk: Hunk) -> None:
        """Classify a hunk from its "+" / "-" line counts.

        Only "+" lines -> ADDED, only "-" lines -> DELETED, both -> MODIFIED,
        neither -> CONTEXT. Context lines do not influence the result.
        """
        has_new = hunk.new_lines > 0
        has_old = hunk.old_lines > 0
        if has_new and has_old:
            hunk.category = ChangeCategory.MODIFIED
        elif has_new:
            hunk.category = ChangeCategory.ADDED
        elif has_old:
            hunk.category = ChangeCategory.DELETED
        else:
            hunk.category = ChangeCategory.CONTEXT

    def _classify_file(self, fc: FileChange) -> None:
        """Final file classification hook.

        Intentionally a no-op: the new/deleted/renamed flags are already set
        while the file's header lines are parsed.
        """


# ---------------------------------------------------------------------------
# Tests (tests/test_diff_analyzer.py)
#
# In the source diff these live in their own file, which puts ``scripts/`` on
# ``sys.path`` and imports ``DiffAnalyzer`` / ``ChangeCategory`` from
# ``diff_analyzer``. Here they follow the module directly, so those names are
# already in scope.
# ---------------------------------------------------------------------------


def test_parse_simple_addition():
    diff = """diff --git a/foo.py b/foo.py
new file mode 100644
--- /dev/null
+++ b/foo.py
@@ -0,0 +1,3 @@
+def hello():
+    return "world"
+# end
"""
    analyzer = DiffAnalyzer()
    summary = analyzer.analyze(diff)

    assert summary.files_changed == 1
    assert summary.files_added == 1
    assert summary.files_modified == 0
    assert summary.total_added == 3
    assert summary.total_deleted == 0
    assert summary.hunks_added == 1
    assert len(summary.file_changes) == 1
    assert summary.file_changes[0].is_new is True
    assert summary.file_changes[0].path == "foo.py"


def test_parse_simple_deletion():
    diff = """diff --git a/old.py b/old.py
deleted file mode 100644
--- a/old.py
+++ /dev/null
@@ -1,2 +0,0 @@
-x = 1
-y = 2
"""
    analyzer = DiffAnalyzer()
    summary = analyzer.analyze(diff)

    assert summary.files_changed == 1
    assert summary.files_deleted == 1
    assert summary.total_deleted == 2
    assert summary.total_added == 0
    assert summary.hunks_deleted == 1
    assert summary.file_changes[0].is_deleted is True


def test_parse_modification():
    diff = """diff --git a/bar.py b/bar.py
--- a/bar.py
+++ b/bar.py
@@ -10,3 +10,4 @@ def foo():
     existing()
-    old_call()
+    new_call()
+    extra_step()
     return
"""
    analyzer = DiffAnalyzer()
    summary = analyzer.analyze(diff)

    assert summary.files_changed == 1
    assert summary.files_modified == 1
    assert summary.total_added == 2  # +new_call(), +extra_step()
    assert summary.total_deleted == 1  # -old_call()
    assert summary.total_context == 2  # 2 context lines
    assert summary.hunks_modified == 1


def test_parse_multiple_files():
    diff = """diff --git a/a.py b/a.py
--- a/a.py
+++ b/a.py
@@ -1,1 +1,2 @@
 existing
+added
diff --git a/b.py b/b.py
new file mode 100644
--- /dev/null
+++ b/b.py
@@ -0,0 +1,1 @@
+new file
diff --git a/c.py b/c.py
deleted file mode 100644
--- a/c.py
+++ /dev/null
@@ -1,1 +0,0 @@
-gone
"""
    analyzer = DiffAnalyzer()
    summary = analyzer.analyze(diff)

    assert summary.files_changed == 3
    assert summary.files_added == 1
    assert summary.files_deleted == 1
    assert summary.files_modified == 1
    assert summary.total_added == 2
    assert summary.total_deleted == 1


def test_parse_rename():
    diff = """diff --git a/old_name.py b/new_name.py
rename from old_name.py
rename to new_name.py
--- a/old_name.py
+++ b/new_name.py
@@ -1,1 +1,1 @@
-old_func()
+new_func()
"""
    analyzer = DiffAnalyzer()
    summary = analyzer.analyze(diff)

    assert summary.files_changed == 1
    assert summary.files_renamed == 1
    assert summary.file_changes[0].is_renamed is True
    assert summary.file_changes[0].old_path == "old_name.py"
    assert summary.file_changes[0].path == "new_name.py"


def test_parse_mixed_hunks():
    """A file with one add hunk and one delete hunk."""
    diff = """diff --git a/mixed.py b/mixed.py
--- a/mixed.py
+++ b/mixed.py
@@ -5,0 +6,2 @@
+new_line_1
+new_line_2
@@ -20,2 +22,0 @@
-removed_1
-removed_2
"""
    analyzer = DiffAnalyzer()
    summary = analyzer.analyze(diff)

    assert summary.files_changed == 1
    assert summary.hunks_added == 1
    assert summary.hunks_deleted == 1
    assert summary.total_added == 2
    assert summary.total_deleted == 2


def test_empty_diff():
    analyzer = DiffAnalyzer()
    summary = analyzer.analyze("")

    assert summary.files_changed == 0
    assert summary.total_added == 0
    assert summary.total_deleted == 0


def test_to_dict():
    diff = """diff --git a/test.py b/test.py
new file mode 100644
--- /dev/null
+++ b/test.py
@@ -0,0 +1,2 @@
+line1
+line2
"""
    analyzer = DiffAnalyzer()
    summary = analyzer.analyze(diff)
    d = summary.to_dict()

    assert d["files_changed"] == 1
    assert d["files_added"] == 1
    assert d["total_added"] == 2
    assert d["total_deleted"] == 0
    assert len(d["files"]) == 1
    assert d["files"][0]["path"] == "test.py"
    assert d["files"][0]["is_new"] is True


def test_context_only_hunk():
    """A hunk with only context lines (rare but possible)."""
    diff = """diff --git a/noop.py b/noop.py
--- a/noop.py
+++ b/noop.py
@@ -5,3 +5,3 @@
 context1
 context2
 context3
"""
    analyzer = DiffAnalyzer()
    summary = analyzer.analyze(diff)

    assert summary.total_context == 3
    assert summary.total_added == 0
    assert summary.total_deleted == 0


def test_binary_files_skipped():
    """Binary file diffs have no content lines — just headers."""
    diff = """diff --git a/image.png b/image.png
--- a/image.png
+++ b/image.png
Binary files a/image.png and b/image.png differ
"""
    analyzer = DiffAnalyzer()
    summary = analyzer.analyze(diff)

    assert summary.files_changed == 1
    assert summary.total_added == 0
    assert summary.total_deleted == 0


def test_hunks_kept_across_file_boundaries():
    """The last hunk of a file must survive the next ``diff --git`` header."""
    diff = """diff --git a/a.py b/a.py
--- a/a.py
+++ b/a.py
@@ -1,1 +1,2 @@
 existing
+added
diff --git a/c.py b/c.py
deleted file mode 100644
--- a/c.py
+++ /dev/null
@@ -1,1 +0,0 @@
-gone
"""
    summary = DiffAnalyzer().analyze(diff)

    assert [len(fc.hunks) for fc in summary.file_changes] == [1, 1]
    assert summary.hunks_added == 1
    assert summary.hunks_deleted == 1


def test_rename_paths_from_rename_headers():
    """Paths containing " b/" are taken from the rename lines."""
    diff = """diff --git a/dir b/old.py b/dir b/new.py
similarity index 100%
rename from dir b/old.py
rename to dir b/new.py
"""
    summary = DiffAnalyzer().analyze(diff)

    assert summary.files_renamed == 1
    assert summary.file_changes[0].old_path == "dir b/old.py"
    assert summary.file_changes[0].path == "dir b/new.py"


def test_moved_hunk_detection():
    """Identical removed and added text is reported as a move."""
    diff = """diff --git a/util.py b/util.py
--- a/util.py
+++ b/util.py
@@ -1,2 +0,0 @@
-def helper():
-    return 1
@@ -10,0 +9,2 @@
+def helper():
+    return 1
"""
    summary = DiffAnalyzer().analyze(diff)

    assert summary.hunks_moved == 2
    assert summary.hunks_added == 0
    assert summary.hunks_deleted == 0
    assert summary.total_added == 2
    assert summary.total_deleted == 2
    categories = [h.category for h in summary.file_changes[0].hunks]
    assert categories == [ChangeCategory.MOVED, ChangeCategory.MOVED]


def test_form_feed_inside_line():
    """A form feed inside a line must not start a new diff line."""
    diff = "\n".join(
        [
            "diff --git a/f.txt b/f.txt",
            "--- a/f.txt",
            "+++ b/f.txt",
            "@@ -1,1 +1,1 @@",
            "-old",
            "+page\x0c-break",
            "",
        ]
    )
    summary = DiffAnalyzer().analyze(diff)

    assert summary.total_added == 1
    assert summary.total_deleted == 1
    assert summary.file_changes[0].hunks[0].lines == ["-old", "+page\x0c-break"]


if __name__ == "__main__":
    test_parse_simple_addition()
    test_parse_simple_deletion()
    test_parse_modification()
    test_parse_multiple_files()
    test_parse_rename()
    test_parse_mixed_hunks()
    test_empty_diff()
    test_to_dict()
    test_context_only_hunk()
    test_binary_files_skipped()
    test_hunks_kept_across_file_boundaries()
    test_rename_paths_from_rename_headers()
    test_moved_hunk_detection()
    test_form_feed_inside_line()
    print("All 14 tests passed.")