Compare commits

..

1 Commits

Author SHA1 Message Date
TERRA
fe4a04145a feat: add diff analyzer for PR change categorization (closes #118) 2026-04-14 23:25:15 -04:00
3 changed files with 463 additions and 282 deletions

View File

@@ -1,282 +0,0 @@
#!/usr/bin/env python3
"""
Dead Code Detector for Python Codebases
AST-based analysis to find defined but never-called functions and classes.
Excludes entry points, plugin hooks, __init__ exports.
Usage:
python3 scripts/dead_code_detector.py /path/to/repo/
python3 scripts/dead_code_detector.py hermes-agent/ --format json
python3 scripts/dead_code_detector.py . --exclude tests/,venv/
Output: file:line, function/class name, last git author (if available)
"""
import argparse
import ast
import json
import os
import subprocess
import sys
from collections import defaultdict
from pathlib import Path
from typing import Optional
# Names that are expected to be unused (entry points, protocol methods, etc.)
SAFE_UNUSED_PATTERNS = {
# Python dunders
"__init__", "__str__", "__repr__", "__eq__", "__hash__", "__len__",
"__getitem__", "__setitem__", "__contains__", "__iter__", "__next__",
"__enter__", "__exit__", "__call__", "__bool__", "__del__",
"__post_init__", "__class_getitem__",
# Common entry points
"main", "app", "handler", "setup", "teardown", "fixture",
# pytest
"conftest", "test_", "pytest_", # prefix patterns
# Protocols / abstract
"abstractmethod", "abc_",
}
def is_safe_unused(name: str, filepath: str) -> bool:
"""Check if an unused name is expected to be unused."""
# Test files are exempt
if "test" in filepath.lower():
return True
# Known patterns
for pattern in SAFE_UNUSED_PATTERNS:
if name.startswith(pattern) or name == pattern:
return True
# __init__.py exports are often unused internally
if filepath.endswith("__init__.py"):
return True
return False
def get_git_blame(filepath: str, lineno: int) -> Optional[str]:
"""Get last author of a line via git blame."""
try:
result = subprocess.run(
["git", "blame", "-L", f"{lineno},{lineno}", "--porcelain", filepath],
capture_output=True, text=True, timeout=5
)
for line in result.stdout.split("\n"):
if line.startswith("author "):
return line[7:]
except:
pass
return None
class DefinitionCollector(ast.NodeVisitor):
"""Collect all function and class definitions."""
def __init__(self):
self.definitions = [] # (name, type, lineno, filepath)
def visit_FunctionDef(self, node):
self.definitions.append((node.name, "function", node.lineno))
self.generic_visit(node)
def visit_AsyncFunctionDef(self, node):
self.definitions.append((node.name, "async_function", node.lineno))
self.generic_visit(node)
def visit_ClassDef(self, node):
self.definitions.append((node.name, "class", node.lineno))
self.generic_visit(node)
class NameUsageCollector(ast.NodeVisitor):
"""Collect all name references (calls, imports, attribute access)."""
def __init__(self):
self.names = set()
self.calls = set()
self.imports = set()
def visit_Name(self, node):
self.names.add(node.id)
self.generic_visit(node)
def visit_Attribute(self, node):
if isinstance(node.value, ast.Name):
self.names.add(node.value.id)
self.generic_visit(node)
def visit_Call(self, node):
if isinstance(node.func, ast.Name):
self.calls.add(node.func.id)
elif isinstance(node.func, ast.Attribute):
if isinstance(node.func.value, ast.Name):
self.names.add(node.func.value.id)
self.calls.add(node.func.attr)
self.generic_visit(node)
def visit_Import(self, node):
for alias in node.names:
self.imports.add(alias.asname or alias.name)
self.generic_visit(node)
def visit_ImportFrom(self, node):
for alias in node.names:
self.imports.add(alias.asname or alias.name)
self.generic_visit(node)
def analyze_file(filepath: str) -> dict:
"""Analyze a single Python file for dead code."""
path = Path(filepath)
try:
content = path.read_text()
tree = ast.parse(content, filename=str(filepath))
except (SyntaxError, UnicodeDecodeError):
return {"error": f"Could not parse {filepath}"}
# Collect definitions
def_collector = DefinitionCollector()
def_collector.visit(tree)
definitions = def_collector.definitions
# Collect usage
usage_collector = NameUsageCollector()
usage_collector.visit(tree)
used_names = usage_collector.names | usage_collector.calls | usage_collector.imports
# Also scan the entire repo for references to this file's definitions
# (this is done at the repo level, not file level)
dead = []
for name, def_type, lineno in definitions:
if name.startswith("_") and not name.startswith("__"):
# Private functions — might be used externally, less likely dead
pass
if name not in used_names:
if not is_safe_unused(name, filepath):
dead.append({
"name": name,
"type": def_type,
"file": filepath,
"line": lineno,
})
return {"definitions": len(definitions), "dead": dead}
def scan_repo(repo_path: str, exclude_patterns: list = None) -> dict:
"""Scan an entire repo for dead code."""
path = Path(repo_path)
exclude = exclude_patterns or ["venv", ".venv", "node_modules", "__pycache__",
".git", "dist", "build", ".tox", "vendor"]
all_definitions = {} # name -> [{file, line, type}]
all_files = []
dead_code = []
# First pass: collect all definitions across repo
for fpath in path.rglob("*.py"):
parts = fpath.parts
if any(ex in parts for ex in exclude):
continue
if fpath.name.startswith("."):
continue
try:
content = fpath.read_text(errors="ignore")
tree = ast.parse(content, filename=str(fpath))
except:
continue
all_files.append(str(fpath))
collector = DefinitionCollector()
collector.visit(tree)
for name, def_type, lineno in collector.definitions:
rel_path = str(fpath.relative_to(path))
if name not in all_definitions:
all_definitions[name] = []
all_definitions[name].append({
"file": rel_path,
"line": lineno,
"type": def_type,
})
# Second pass: check each name for usage across entire repo
all_used_names = set()
for fpath_str in all_files:
try:
content = Path(fpath_str).read_text(errors="ignore")
tree = ast.parse(content)
except:
continue
usage = NameUsageCollector()
usage.visit(tree)
all_used_names.update(usage.names)
all_used_names.update(usage.calls)
all_used_names.update(usage.imports)
# Find dead code
for name, locations in all_definitions.items():
if name not in all_used_names:
for loc in locations:
if not is_safe_unused(name, loc["file"]):
dead_code.append({
"name": name,
"type": loc["type"],
"file": loc["file"],
"line": loc["line"],
})
return {
"repo": path.name,
"files_scanned": len(all_files),
"total_definitions": sum(len(v) for v in all_definitions.values()),
"dead_code_count": len(dead_code),
"dead_code": sorted(dead_code, key=lambda x: (x["file"], x["line"])),
}
def main():
parser = argparse.ArgumentParser(description="Find dead code in Python codebases")
parser.add_argument("repo", help="Repository path to scan")
parser.add_argument("--format", choices=["text", "json"], default="text")
parser.add_argument("--exclude", help="Comma-separated patterns to exclude")
parser.add_argument("--git-blame", action="store_true", help="Include git blame info")
args = parser.parse_args()
exclude = args.exclude.split(",") if args.exclude else None
result = scan_repo(args.repo, exclude)
if args.format == "json":
print(json.dumps(result, indent=2))
else:
print(f"Dead Code Report: {result['repo']}")
print(f"Files scanned: {result['files_scanned']}")
print(f"Total definitions: {result['total_definitions']}")
print(f"Dead code found: {result['dead_code_count']}")
print()
if result["dead_code"]:
print(f"{'File':<45} {'Line':>4} {'Type':<10} {'Name'}")
print("-" * 85)
for item in result["dead_code"]:
author = ""
if args.git_blame:
author = get_git_blame(
os.path.join(args.repo, item["file"]),
item["line"]
) or ""
author = f" ({author})" if author else ""
print(f"{item['file']:<45} {item['line']:>4} {item['type']:<10} {item['name']}{author}")
else:
print("No dead code detected!")
if __name__ == "__main__":
main()

239
scripts/diff_analyzer.py Normal file
View File

@@ -0,0 +1,239 @@
"""
Diff Analyzer — Pipeline 6.1
Reads PR diffs and categorizes changes: new code, deleted code, modified code, moved code.
Produces a change summary with line counts per category.
Usage:
from diff_analyzer import DiffAnalyzer
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff_text)
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional
class ChangeCategory(Enum):
"""Categories of code changes in a diff hunk."""
ADDED = "added"
DELETED = "deleted"
MODIFIED = "modified"
MOVED = "moved"
CONTEXT = "context"
@dataclass
class Hunk:
"""A single diff hunk with metadata."""
header: str
old_start: int
old_count: int
new_start: int
new_count: int
lines: List[str] = field(default_factory=list)
category: ChangeCategory = ChangeCategory.CONTEXT
old_lines: int = 0
new_lines: int = 0
@dataclass
class FileChange:
"""Changes within a single file."""
path: str
old_path: Optional[str] = None # For renames
is_new: bool = False
is_deleted: bool = False
is_renamed: bool = False
hunks: List[Hunk] = field(default_factory=list)
added_lines: int = 0
deleted_lines: int = 0
context_lines: int = 0
@dataclass
class ChangeSummary:
"""Summary of all changes in a diff."""
files_changed: int = 0
files_added: int = 0
files_deleted: int = 0
files_renamed: int = 0
files_modified: int = 0
total_added: int = 0
total_deleted: int = 0
total_context: int = 0
hunks_added: int = 0
hunks_deleted: int = 0
hunks_modified: int = 0
hunks_moved: int = 0
file_changes: List[FileChange] = field(default_factory=list)
def to_dict(self) -> dict:
"""Serialize to dict for JSON output."""
return {
"files_changed": self.files_changed,
"files_added": self.files_added,
"files_deleted": self.files_deleted,
"files_renamed": self.files_renamed,
"files_modified": self.files_modified,
"total_added": self.total_added,
"total_deleted": self.total_deleted,
"total_context": self.total_context,
"hunks_added": self.hunks_added,
"hunks_deleted": self.hunks_deleted,
"hunks_modified": self.hunks_modified,
"hunks_moved": self.hunks_moved,
"files": [
{
"path": fc.path,
"old_path": fc.old_path,
"is_new": fc.is_new,
"is_deleted": fc.is_deleted,
"is_renamed": fc.is_renamed,
"added": fc.added_lines,
"deleted": fc.deleted_lines,
"context": fc.context_lines,
}
for fc in self.file_changes
],
}
# Regex for unified diff headers
_HUNK_RE = re.compile(
r"^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@(.*)$"
)
_FILE_HEADER_RE = re.compile(r"^diff --git a/(.*) b/(.*)$")
_RENAME_RE = re.compile(r"^rename from (.+)$|^rename to (.+)$")
class DiffAnalyzer:
"""Parses unified diffs and categorizes changes."""
def analyze(self, diff_text: str) -> ChangeSummary:
"""Analyze a unified diff string and return a ChangeSummary."""
summary = ChangeSummary()
lines = diff_text.splitlines(keepends=False)
current_file: Optional[FileChange] = None
current_hunk: Optional[Hunk] = None
old_path: Optional[str] = None
new_path: Optional[str] = None
for line in lines:
# File header
m = _FILE_HEADER_RE.match(line)
if m:
# Save previous file
if current_file:
self._classify_file(current_file)
summary.file_changes.append(current_file)
old_path = m.group(1)
new_path = m.group(2)
current_file = FileChange(path=new_path, old_path=old_path)
current_hunk = None
continue
if current_file is None:
continue
# Detect new/deleted file markers
if line.startswith("new file mode"):
current_file.is_new = True
continue
if line.startswith("deleted file mode"):
current_file.is_deleted = True
continue
# Detect renames
rm = _RENAME_RE.match(line)
if rm:
current_file.is_renamed = True
continue
# Hunk header
hm = _HUNK_RE.match(line)
if hm:
if current_hunk:
self._classify_hunk(current_hunk)
current_file.hunks.append(current_hunk)
current_hunk = Hunk(
header=line,
old_start=int(hm.group(1)),
old_count=int(hm.group(2) or 1),
new_start=int(hm.group(3)),
new_count=int(hm.group(4) or 1),
)
continue
if current_hunk is None:
continue
# Hunk content
current_hunk.lines.append(line)
if line.startswith("+"):
current_hunk.new_lines += 1
current_file.added_lines += 1
elif line.startswith("-"):
current_hunk.old_lines += 1
current_file.deleted_lines += 1
elif line.startswith(" "):
current_file.context_lines += 1
# Finalize last hunk and file
if current_hunk:
self._classify_hunk(current_hunk)
if current_file:
current_file.hunks.append(current_hunk)
if current_file:
self._classify_file(current_file)
summary.file_changes.append(current_file)
# Aggregate
summary.files_changed = len(summary.file_changes)
for fc in summary.file_changes:
summary.total_added += fc.added_lines
summary.total_deleted += fc.deleted_lines
summary.total_context += fc.context_lines
if fc.is_new:
summary.files_added += 1
elif fc.is_deleted:
summary.files_deleted += 1
elif fc.is_renamed:
summary.files_renamed += 1
else:
summary.files_modified += 1
for h in fc.hunks:
if h.category == ChangeCategory.ADDED:
summary.hunks_added += 1
elif h.category == ChangeCategory.DELETED:
summary.hunks_deleted += 1
elif h.category == ChangeCategory.MODIFIED:
summary.hunks_modified += 1
elif h.category == ChangeCategory.MOVED:
summary.hunks_moved += 1
return summary
def _classify_hunk(self, hunk: Hunk) -> None:
"""Classify a hunk based on its add/delete ratio."""
if hunk.new_lines > 0 and hunk.old_lines == 0:
hunk.category = ChangeCategory.ADDED
elif hunk.old_lines > 0 and hunk.new_lines == 0:
hunk.category = ChangeCategory.DELETED
elif hunk.new_lines > 0 and hunk.old_lines > 0:
hunk.category = ChangeCategory.MODIFIED
else:
hunk.category = ChangeCategory.CONTEXT
def _classify_file(self, fc: FileChange) -> None:
"""Final file classification (renames already detected via headers)."""
pass

224
tests/test_diff_analyzer.py Normal file
View File

@@ -0,0 +1,224 @@
"""Tests for diff_analyzer module."""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'scripts'))
from diff_analyzer import DiffAnalyzer, ChangeCategory
def test_parse_simple_addition():
diff = """diff --git a/foo.py b/foo.py
new file mode 100644
--- /dev/null
+++ b/foo.py
@@ -0,0 +1,3 @@
+def hello():
+ return "world"
+# end
"""
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff)
assert summary.files_changed == 1
assert summary.files_added == 1
assert summary.files_modified == 0
assert summary.total_added == 3
assert summary.total_deleted == 0
assert summary.hunks_added == 1
assert len(summary.file_changes) == 1
assert summary.file_changes[0].is_new is True
assert summary.file_changes[0].path == "foo.py"
def test_parse_simple_deletion():
diff = """diff --git a/old.py b/old.py
deleted file mode 100644
--- a/old.py
+++ /dev/null
@@ -1,2 +0,0 @@
-x = 1
-y = 2
"""
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff)
assert summary.files_changed == 1
assert summary.files_deleted == 1
assert summary.total_deleted == 2
assert summary.total_added == 0
assert summary.hunks_deleted == 1
assert summary.file_changes[0].is_deleted is True
def test_parse_modification():
diff = """diff --git a/bar.py b/bar.py
--- a/bar.py
+++ b/bar.py
@@ -10,3 +10,4 @@ def foo():
existing()
- old_call()
+ new_call()
+ extra_step()
return
"""
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff)
assert summary.files_changed == 1
assert summary.files_modified == 1
assert summary.total_added == 2 # +new_call(), +extra_step()
assert summary.total_deleted == 1 # -old_call()
assert summary.total_context == 2 # 2 context lines
assert summary.hunks_modified == 1
def test_parse_multiple_files():
diff = """diff --git a/a.py b/a.py
--- a/a.py
+++ b/a.py
@@ -1,1 +1,2 @@
existing
+added
diff --git a/b.py b/b.py
new file mode 100644
--- /dev/null
+++ b/b.py
@@ -0,0 +1,1 @@
+new file
diff --git a/c.py b/c.py
deleted file mode 100644
--- a/c.py
+++ /dev/null
@@ -1,1 +0,0 @@
-gone
"""
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff)
assert summary.files_changed == 3
assert summary.files_added == 1
assert summary.files_deleted == 1
assert summary.files_modified == 1
assert summary.total_added == 2
assert summary.total_deleted == 1
def test_parse_rename():
diff = """diff --git a/old_name.py b/new_name.py
rename from old_name.py
rename to new_name.py
--- a/old_name.py
+++ b/new_name.py
@@ -1,1 +1,1 @@
-old_func()
+new_func()
"""
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff)
assert summary.files_changed == 1
assert summary.files_renamed == 1
assert summary.file_changes[0].is_renamed is True
assert summary.file_changes[0].old_path == "old_name.py"
assert summary.file_changes[0].path == "new_name.py"
def test_parse_mixed_hunks():
"""A file with one add hunk and one delete hunk."""
diff = """diff --git a/mixed.py b/mixed.py
--- a/mixed.py
+++ b/mixed.py
@@ -5,0 +6,2 @@
+new_line_1
+new_line_2
@@ -20,2 +22,0 @@
-removed_1
-removed_2
"""
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff)
assert summary.files_changed == 1
assert summary.hunks_added == 1
assert summary.hunks_deleted == 1
assert summary.total_added == 2
assert summary.total_deleted == 2
def test_empty_diff():
analyzer = DiffAnalyzer()
summary = analyzer.analyze("")
assert summary.files_changed == 0
assert summary.total_added == 0
assert summary.total_deleted == 0
def test_to_dict():
diff = """diff --git a/test.py b/test.py
new file mode 100644
--- /dev/null
+++ b/test.py
@@ -0,0 +1,2 @@
+line1
+line2
"""
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff)
d = summary.to_dict()
assert d["files_changed"] == 1
assert d["files_added"] == 1
assert d["total_added"] == 2
assert d["total_deleted"] == 0
assert len(d["files"]) == 1
assert d["files"][0]["path"] == "test.py"
assert d["files"][0]["is_new"] is True
def test_context_only_hunk():
"""A hunk with only context lines (rare but possible)."""
diff = """diff --git a/noop.py b/noop.py
--- a/noop.py
+++ b/noop.py
@@ -5,3 +5,3 @@
context1
context2
context3
"""
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff)
assert summary.total_context == 3
assert summary.total_added == 0
assert summary.total_deleted == 0
def test_binary_files_skipped():
"""Binary file diffs have no content lines — just headers."""
diff = """diff --git a/image.png b/image.png
--- a/image.png
+++ b/image.png
Binary files a/image.png and b/image.png differ
"""
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff)
assert summary.files_changed == 1
assert summary.total_added == 0
assert summary.total_deleted == 0
if __name__ == "__main__":
test_parse_simple_addition()
test_parse_simple_deletion()
test_parse_modification()
test_parse_multiple_files()
test_parse_rename()
test_parse_mixed_hunks()
test_empty_diff()
test_to_dict()
test_context_only_hunk()
test_binary_files_skipped()
print("All 10 tests passed.")