Compare commits

..

2 Commits

3 changed files with 341 additions and 399 deletions

View File

@@ -1,25 +1,17 @@
#!/usr/bin/env python3
"""
Diff Analyzer — Pipeline 6.1
Diff Analyzer — Parse unified diffs and categorize every change.
Reads PR diffs and categorizes changes: new code, deleted code, modified code, moved code.
Produces a change summary with line counts per category.
Usage:
from diff_analyzer import DiffAnalyzer
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff_text)
Pipeline 6.1 for Compounding Intelligence.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import List, Optional
from typing import List, Dict, Any, Optional
class ChangeCategory(Enum):
"""Categories of code changes in a diff hunk."""
ADDED = "added"
DELETED = "deleted"
MODIFIED = "modified"
@@ -29,7 +21,7 @@ class ChangeCategory(Enum):
@dataclass
class Hunk:
"""A single diff hunk with metadata."""
"""A single diff hunk with header, line ranges, and category."""
header: str
old_start: int
old_count: int
@@ -37,203 +29,188 @@ class Hunk:
new_count: int
lines: List[str] = field(default_factory=list)
category: ChangeCategory = ChangeCategory.CONTEXT
old_lines: int = 0
new_lines: int = 0
def to_dict(self) -> Dict[str, Any]:
d = asdict(self)
d["category"] = self.category.value
return d
@dataclass
class FileChange:
"""Changes within a single file."""
"""A single file's changes."""
path: str
old_path: Optional[str] = None # For renames
is_new: bool = False
is_deleted: bool = False
is_renamed: bool = False
hunks: List[Hunk] = field(default_factory=list)
added_lines: int = 0
deleted_lines: int = 0
context_lines: int = 0
is_new: bool = False
is_deleted: bool = False
is_renamed: bool = False
is_binary: bool = False
def to_dict(self) -> Dict[str, Any]:
return {
"path": self.path,
"old_path": self.old_path,
"hunks": [h.to_dict() for h in self.hunks],
"added_lines": self.added_lines,
"deleted_lines": self.deleted_lines,
"is_new": self.is_new,
"is_deleted": self.is_deleted,
"is_renamed": self.is_renamed,
"is_binary": self.is_binary,
}
@dataclass
class ChangeSummary:
"""Summary of all changes in a diff."""
files_changed: int = 0
files_added: int = 0
files_deleted: int = 0
files_renamed: int = 0
files_modified: int = 0
"""Aggregate stats + per-file breakdown."""
files: List[FileChange] = field(default_factory=list)
total_added: int = 0
total_deleted: int = 0
total_context: int = 0
hunks_added: int = 0
hunks_deleted: int = 0
hunks_modified: int = 0
hunks_moved: int = 0
file_changes: List[FileChange] = field(default_factory=list)
total_files_changed: int = 0
total_hunks: int = 0
new_files: int = 0
deleted_files: int = 0
renamed_files: int = 0
binary_files: int = 0
def to_dict(self) -> dict:
"""Serialize to dict for JSON output."""
def to_dict(self) -> Dict[str, Any]:
return {
"files_changed": self.files_changed,
"files_added": self.files_added,
"files_deleted": self.files_deleted,
"files_renamed": self.files_renamed,
"files_modified": self.files_modified,
"total_files_changed": self.total_files_changed,
"total_added": self.total_added,
"total_deleted": self.total_deleted,
"total_context": self.total_context,
"hunks_added": self.hunks_added,
"hunks_deleted": self.hunks_deleted,
"hunks_modified": self.hunks_modified,
"hunks_moved": self.hunks_moved,
"files": [
{
"path": fc.path,
"old_path": fc.old_path,
"is_new": fc.is_new,
"is_deleted": fc.is_deleted,
"is_renamed": fc.is_renamed,
"added": fc.added_lines,
"deleted": fc.deleted_lines,
"context": fc.context_lines,
}
for fc in self.file_changes
],
"total_hunks": self.total_hunks,
"new_files": self.new_files,
"deleted_files": self.deleted_files,
"renamed_files": self.renamed_files,
"binary_files": self.binary_files,
"files": [f.to_dict() for f in self.files],
}
# Regex for unified diff headers
_HUNK_RE = re.compile(
r"^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@(.*)$"
)
_FILE_HEADER_RE = re.compile(r"^diff --git a/(.*) b/(.*)$")
_RENAME_RE = re.compile(r"^rename from (.+)$|^rename to (.+)$")
class DiffAnalyzer:
"""Parses unified diffs and categorizes changes."""
"""Parses unified diff format and produces structured ChangeSummary."""
HUNK_HEADER_RE = re.compile(r"^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@(.*)$")
DIFF_FILE_RE = re.compile(r"^diff --git a/(.*) b/(.*)")
RENAME_RE = re.compile(r"^rename from (.+)$")
RENAME_TO_RE = re.compile(r"^rename to (.+)$")
NEW_FILE_RE = re.compile(r"^new file mode")
DELETED_FILE_RE = re.compile(r"^deleted file mode")
BINARY_RE = re.compile(r"^Binary files .* differ")
def analyze(self, diff_text: str) -> ChangeSummary:
"""Analyze a unified diff string and return a ChangeSummary."""
"""Parse a unified diff and return a ChangeSummary."""
summary = ChangeSummary()
lines = diff_text.splitlines(keepends=False)
if not diff_text or not diff_text.strip():
return summary
current_file: Optional[FileChange] = None
current_hunk: Optional[Hunk] = None
old_path: Optional[str] = None
new_path: Optional[str] = None
# Split diff into per-file sections
file_diffs = self._split_files(diff_text)
for line in lines:
# File header
m = _FILE_HEADER_RE.match(line)
if m:
# Save previous file
if current_file:
self._classify_file(current_file)
summary.file_changes.append(current_file)
old_path = m.group(1)
new_path = m.group(2)
current_file = FileChange(path=new_path, old_path=old_path)
current_hunk = None
continue
if current_file is None:
continue
# Detect new/deleted file markers
if line.startswith("new file mode"):
current_file.is_new = True
continue
if line.startswith("deleted file mode"):
current_file.is_deleted = True
continue
# Detect renames
rm = _RENAME_RE.match(line)
if rm:
current_file.is_renamed = True
continue
# Hunk header
hm = _HUNK_RE.match(line)
if hm:
if current_hunk:
self._classify_hunk(current_hunk)
current_file.hunks.append(current_hunk)
current_hunk = Hunk(
header=line,
old_start=int(hm.group(1)),
old_count=int(hm.group(2) or 1),
new_start=int(hm.group(3)),
new_count=int(hm.group(4) or 1),
)
continue
if current_hunk is None:
continue
# Hunk content
current_hunk.lines.append(line)
if line.startswith("+"):
current_hunk.new_lines += 1
current_file.added_lines += 1
elif line.startswith("-"):
current_hunk.old_lines += 1
current_file.deleted_lines += 1
elif line.startswith(" "):
current_file.context_lines += 1
# Finalize last hunk and file
if current_hunk:
self._classify_hunk(current_hunk)
if current_file:
current_file.hunks.append(current_hunk)
if current_file:
self._classify_file(current_file)
summary.file_changes.append(current_file)
# Aggregate
summary.files_changed = len(summary.file_changes)
for fc in summary.file_changes:
for file_diff in file_diffs:
fc = self._parse_file_diff(file_diff)
summary.files.append(fc)
summary.total_added += fc.added_lines
summary.total_deleted += fc.deleted_lines
summary.total_context += fc.context_lines
summary.total_hunks += len(fc.hunks)
if fc.is_new:
summary.files_added += 1
elif fc.is_deleted:
summary.files_deleted += 1
elif fc.is_renamed:
summary.files_renamed += 1
else:
summary.files_modified += 1
for h in fc.hunks:
if h.category == ChangeCategory.ADDED:
summary.hunks_added += 1
elif h.category == ChangeCategory.DELETED:
summary.hunks_deleted += 1
elif h.category == ChangeCategory.MODIFIED:
summary.hunks_modified += 1
elif h.category == ChangeCategory.MOVED:
summary.hunks_moved += 1
summary.new_files += 1
if fc.is_deleted:
summary.deleted_files += 1
if fc.is_renamed:
summary.renamed_files += 1
if fc.is_binary:
summary.binary_files += 1
summary.total_files_changed = len(summary.files)
return summary
def _classify_hunk(self, hunk: Hunk) -> None:
"""Classify a hunk based on its add/delete ratio."""
if hunk.new_lines > 0 and hunk.old_lines == 0:
def _split_files(self, diff_text: str) -> List[str]:
"""Split a multi-file diff into individual file diffs."""
lines = diff_text.split("\n")
chunks = []
current = []
for line in lines:
if line.startswith("diff --git ") and current:
chunks.append("\n".join(current))
current = [line]
else:
current.append(line)
if current:
chunks.append("\n".join(current))
return chunks
def _parse_file_diff(self, diff_text: str) -> FileChange:
"""Parse a single file's diff section."""
lines = diff_text.split("\n")
fc = FileChange(path="")
# Extract file paths
for line in lines:
m = self.DIFF_FILE_RE.match(line)
if m:
fc.path = m.group(2)
break
# Check for special states
for line in lines:
if self.NEW_FILE_RE.match(line):
fc.is_new = True
elif self.DELETED_FILE_RE.match(line):
fc.is_deleted = True
elif self.RENAME_RE.match(line):
fc.old_path = m.group(1) if (m := self.RENAME_RE.match(line)) else None
fc.is_renamed = True
elif self.BINARY_RE.match(line):
fc.is_binary = True
return fc # No hunks for binary
# Rename TO
for line in lines:
m = self.RENAME_TO_RE.match(line)
if m and fc.is_renamed:
fc.path = m.group(1)
# Parse hunks
current_hunk = None
for line in lines:
m = self.HUNK_HEADER_RE.match(line)
if m:
if current_hunk:
self._classify_hunk(current_hunk, fc)
fc.hunks.append(current_hunk)
current_hunk = Hunk(
header=m.group(5).strip(),
old_start=int(m.group(1)),
old_count=int(m.group(2) or 1),
new_start=int(m.group(3)),
new_count=int(m.group(4) or 1),
)
elif current_hunk and (line.startswith("+") or line.startswith("-") or line.startswith(" ")):
current_hunk.lines.append(line)
if current_hunk:
self._classify_hunk(current_hunk, fc)
fc.hunks.append(current_hunk)
return fc
def _classify_hunk(self, hunk: Hunk, fc: FileChange):
"""Classify a hunk and count lines."""
added = sum(1 for l in hunk.lines if l.startswith("+"))
deleted = sum(1 for l in hunk.lines if l.startswith("-"))
fc.added_lines += added
fc.deleted_lines += deleted
if added > 0 and deleted == 0:
hunk.category = ChangeCategory.ADDED
elif hunk.old_lines > 0 and hunk.new_lines == 0:
elif deleted > 0 and added == 0:
hunk.category = ChangeCategory.DELETED
elif hunk.new_lines > 0 and hunk.old_lines > 0:
elif added > 0 and deleted > 0:
hunk.category = ChangeCategory.MODIFIED
else:
hunk.category = ChangeCategory.CONTEXT
def _classify_file(self, fc: FileChange) -> None:
"""Final file classification (renames already detected via headers)."""
pass

View File

@@ -0,0 +1,189 @@
#!/usr/bin/env python3
"""Tests for scripts/diff_analyzer.py — 10 tests."""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__) or ".")
import importlib.util
spec = importlib.util.spec_from_file_location("da", os.path.join(os.path.dirname(__file__) or ".", "diff_analyzer.py"))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
DiffAnalyzer = mod.DiffAnalyzer
ChangeCategory = mod.ChangeCategory
SAMPLE_ADD = """diff --git a/new.py b/new.py
new file mode 100644
--- /dev/null
+++ b/new.py
@@ -0,0 +1,3 @@
+def hello():
+ print("world")
+ return True
"""
SAMPLE_DELETE = """diff --git a/old.py b/old.py
deleted file mode 100644
--- a/old.py
+++ /dev/null
@@ -1,2 +0,0 @@
-def goodbye():
- pass
"""
SAMPLE_MODIFY = """diff --git a/app.py b/app.py
--- a/app.py
+++ b/app.py
@@ -1,3 +1,4 @@
def main():
- print("old")
+ print("new")
+ print("extra")
return 0
"""
SAMPLE_RENAME = """diff --git a/old_name.py b/new_name.py
rename from old_name.py
rename to new_name.py
--- a/old_name.py
+++ b/new_name.py
@@ -1,1 +1,1 @@
-old content
+new content
"""
SAMPLE_MULTI = """diff --git a/a.py b/a.py
--- a/a.py
+++ b/a.py
@@ -1,1 +1,2 @@
existing
+added line
diff --git b/b.py b/b.py
new file mode 100644
--- /dev/null
+++ b/b.py
@@ -0,0 +1,1 @@
+new file
"""
SAMPLE_BINARY = """diff --git a/img.png b/img.png
Binary files a/img.png and b/img.png differ
"""
def test_empty():
a = DiffAnalyzer()
s = a.analyze("")
assert s.total_files_changed == 0
print("PASS: test_empty")
def test_addition():
a = DiffAnalyzer()
s = a.analyze(SAMPLE_ADD)
assert s.total_files_changed == 1
assert s.total_added == 3
assert s.total_deleted == 0
assert s.new_files == 1
assert s.files[0].hunks[0].category == ChangeCategory.ADDED
print("PASS: test_addition")
def test_deletion():
a = DiffAnalyzer()
s = a.analyze(SAMPLE_DELETE)
assert s.total_deleted == 2
assert s.deleted_files == 1
assert s.files[0].hunks[0].category == ChangeCategory.DELETED
print("PASS: test_deletion")
def test_modification():
a = DiffAnalyzer()
s = a.analyze(SAMPLE_MODIFY)
assert s.total_added == 2
assert s.total_deleted == 1
assert s.files[0].hunks[0].category == ChangeCategory.MODIFIED
print("PASS: test_modification")
def test_rename():
a = DiffAnalyzer()
s = a.analyze(SAMPLE_RENAME)
assert s.renamed_files == 1
assert s.files[0].old_path == "old_name.py"
assert s.files[0].path == "new_name.py"
assert s.files[0].is_renamed == True
print("PASS: test_rename")
def test_multiple_files():
a = DiffAnalyzer()
s = a.analyze(SAMPLE_MULTI)
assert s.total_files_changed == 2
assert s.new_files == 1
print("PASS: test_multiple_files")
def test_binary():
a = DiffAnalyzer()
s = a.analyze(SAMPLE_BINARY)
assert s.binary_files == 1
assert s.files[0].is_binary == True
assert len(s.files[0].hunks) == 0
print("PASS: test_binary")
def test_to_dict():
a = DiffAnalyzer()
s = a.analyze(SAMPLE_MODIFY)
d = s.to_dict()
assert "total_files_changed" in d
assert "files" in d
assert isinstance(d["files"], list)
print("PASS: test_to_dict")
def test_context_only():
diff = """diff --git a/f.py b/f.py
--- a/f.py
+++ b/f.py
@@ -1,3 +1,3 @@
line1
-old
+new
line3
"""
a = DiffAnalyzer()
s = a.analyze(diff)
# Has both added and deleted = MODIFIED
assert s.files[0].hunks[0].category == ChangeCategory.MODIFIED
print("PASS: test_context_only")
def test_multi_hunk():
diff = """diff --git a/f.py b/f.py
--- a/f.py
+++ b/f.py
@@ -1,1 +1,2 @@
existing
+first addition
@@ -10,1 +11,2 @@
more
+second addition
"""
a = DiffAnalyzer()
s = a.analyze(diff)
assert s.total_hunks == 2
assert s.total_added == 2
print("PASS: test_multi_hunk")
def run_all():
test_empty()
test_addition()
test_deletion()
test_modification()
test_rename()
test_multiple_files()
test_binary()
test_to_dict()
test_context_only()
test_multi_hunk()
print("\nAll 10 tests passed!")
if __name__ == "__main__":
run_all()

View File

@@ -1,224 +0,0 @@
"""Tests for diff_analyzer module."""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'scripts'))
from diff_analyzer import DiffAnalyzer, ChangeCategory
def test_parse_simple_addition():
diff = """diff --git a/foo.py b/foo.py
new file mode 100644
--- /dev/null
+++ b/foo.py
@@ -0,0 +1,3 @@
+def hello():
+ return "world"
+# end
"""
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff)
assert summary.files_changed == 1
assert summary.files_added == 1
assert summary.files_modified == 0
assert summary.total_added == 3
assert summary.total_deleted == 0
assert summary.hunks_added == 1
assert len(summary.file_changes) == 1
assert summary.file_changes[0].is_new is True
assert summary.file_changes[0].path == "foo.py"
def test_parse_simple_deletion():
diff = """diff --git a/old.py b/old.py
deleted file mode 100644
--- a/old.py
+++ /dev/null
@@ -1,2 +0,0 @@
-x = 1
-y = 2
"""
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff)
assert summary.files_changed == 1
assert summary.files_deleted == 1
assert summary.total_deleted == 2
assert summary.total_added == 0
assert summary.hunks_deleted == 1
assert summary.file_changes[0].is_deleted is True
def test_parse_modification():
diff = """diff --git a/bar.py b/bar.py
--- a/bar.py
+++ b/bar.py
@@ -10,3 +10,4 @@ def foo():
existing()
- old_call()
+ new_call()
+ extra_step()
return
"""
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff)
assert summary.files_changed == 1
assert summary.files_modified == 1
assert summary.total_added == 2 # +new_call(), +extra_step()
assert summary.total_deleted == 1 # -old_call()
assert summary.total_context == 2 # 2 context lines
assert summary.hunks_modified == 1
def test_parse_multiple_files():
diff = """diff --git a/a.py b/a.py
--- a/a.py
+++ b/a.py
@@ -1,1 +1,2 @@
existing
+added
diff --git a/b.py b/b.py
new file mode 100644
--- /dev/null
+++ b/b.py
@@ -0,0 +1,1 @@
+new file
diff --git a/c.py b/c.py
deleted file mode 100644
--- a/c.py
+++ /dev/null
@@ -1,1 +0,0 @@
-gone
"""
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff)
assert summary.files_changed == 3
assert summary.files_added == 1
assert summary.files_deleted == 1
assert summary.files_modified == 1
assert summary.total_added == 2
assert summary.total_deleted == 1
def test_parse_rename():
diff = """diff --git a/old_name.py b/new_name.py
rename from old_name.py
rename to new_name.py
--- a/old_name.py
+++ b/new_name.py
@@ -1,1 +1,1 @@
-old_func()
+new_func()
"""
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff)
assert summary.files_changed == 1
assert summary.files_renamed == 1
assert summary.file_changes[0].is_renamed is True
assert summary.file_changes[0].old_path == "old_name.py"
assert summary.file_changes[0].path == "new_name.py"
def test_parse_mixed_hunks():
"""A file with one add hunk and one delete hunk."""
diff = """diff --git a/mixed.py b/mixed.py
--- a/mixed.py
+++ b/mixed.py
@@ -5,0 +6,2 @@
+new_line_1
+new_line_2
@@ -20,2 +22,0 @@
-removed_1
-removed_2
"""
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff)
assert summary.files_changed == 1
assert summary.hunks_added == 1
assert summary.hunks_deleted == 1
assert summary.total_added == 2
assert summary.total_deleted == 2
def test_empty_diff():
analyzer = DiffAnalyzer()
summary = analyzer.analyze("")
assert summary.files_changed == 0
assert summary.total_added == 0
assert summary.total_deleted == 0
def test_to_dict():
diff = """diff --git a/test.py b/test.py
new file mode 100644
--- /dev/null
+++ b/test.py
@@ -0,0 +1,2 @@
+line1
+line2
"""
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff)
d = summary.to_dict()
assert d["files_changed"] == 1
assert d["files_added"] == 1
assert d["total_added"] == 2
assert d["total_deleted"] == 0
assert len(d["files"]) == 1
assert d["files"][0]["path"] == "test.py"
assert d["files"][0]["is_new"] is True
def test_context_only_hunk():
"""A hunk with only context lines (rare but possible)."""
diff = """diff --git a/noop.py b/noop.py
--- a/noop.py
+++ b/noop.py
@@ -5,3 +5,3 @@
context1
context2
context3
"""
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff)
assert summary.total_context == 3
assert summary.total_added == 0
assert summary.total_deleted == 0
def test_binary_files_skipped():
"""Binary file diffs have no content lines — just headers."""
diff = """diff --git a/image.png b/image.png
--- a/image.png
+++ b/image.png
Binary files a/image.png and b/image.png differ
"""
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff)
assert summary.files_changed == 1
assert summary.total_added == 0
assert summary.total_deleted == 0
if __name__ == "__main__":
test_parse_simple_addition()
test_parse_simple_deletion()
test_parse_modification()
test_parse_multiple_files()
test_parse_rename()
test_parse_mixed_hunks()
test_empty_diff()
test_to_dict()
test_context_only_hunk()
test_binary_files_skipped()
print("All 10 tests passed.")