Compare commits
1 Commits
step35/132
...
step35/111
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
832b23286b |
@@ -180,6 +180,89 @@ def to_mermaid(graph: dict) -> str:
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
|
||||
def transitive_closure(graph: dict) -> dict:
|
||||
"""Compute transitive closure for each node (all indirect deps)."""
|
||||
closure = {}
|
||||
# Build adjacency list
|
||||
adj = {node: set(data.get("dependencies", [])) for node, data in graph.items()}
|
||||
all_nodes = set(adj.keys()) | set().union(*adj.values())
|
||||
|
||||
for node in all_nodes:
|
||||
visited = set()
|
||||
stack = list(adj.get(node, set()))
|
||||
while stack:
|
||||
current = stack.pop()
|
||||
if current not in visited:
|
||||
visited.add(current)
|
||||
stack.extend(adj.get(current, set()))
|
||||
# Remove self-reference: a node's transitive deps should not include itself
|
||||
visited.discard(node)
|
||||
closure[node] = visited
|
||||
|
||||
return closure
|
||||
|
||||
|
||||
def find_deep_chains(graph: dict) -> list[list[str]]:
|
||||
"""Find the longest simple paths in the dependency graph (ignoring cycles)."""
|
||||
from collections import defaultdict
|
||||
|
||||
adj = {node: list(data.get("dependencies", [])) for node, data in graph.items()}
|
||||
deepest = []
|
||||
max_len = 0
|
||||
|
||||
def dfs(node: str, path: list, visited: set):
|
||||
nonlocal deepest, max_len
|
||||
# Stop if we hit a cycle (node already in path)
|
||||
if node in path:
|
||||
return
|
||||
new_path = path + [node]
|
||||
if node not in adj or not adj[node]:
|
||||
# leaf
|
||||
if len(new_path) > max_len:
|
||||
max_len = len(new_path)
|
||||
deepest = [new_path.copy()]
|
||||
elif len(new_path) == max_len:
|
||||
deepest.append(new_path.copy())
|
||||
else:
|
||||
for neighbor in adj[node]:
|
||||
dfs(neighbor, new_path.copy(), visited | {node})
|
||||
|
||||
for start in graph:
|
||||
dfs(start, [], set())
|
||||
|
||||
return deepest
|
||||
|
||||
|
||||
def format_transitive_markdown(closure: dict) -> str:
|
||||
"""Render transitive closure as a markdown table."""
|
||||
lines = ["# Transitive Dependencies\n\n"]
|
||||
lines.append("| Node | Transitive Dependencies | Count |\n")
|
||||
lines.append("|------|------------------------|-------|\n")
|
||||
for node in sorted(closure.keys()):
|
||||
deps = closure[node]
|
||||
deps_str = ", ".join(sorted(deps)) if deps else "(none)"
|
||||
lines.append(f"| {node} | {deps_str} | {len(deps)} |\n")
|
||||
return "".join(lines)
|
||||
|
||||
|
||||
def format_deep_chains_markdown(chains: list[list[str]]) -> str:
|
||||
"""Render longest dependency chains as a markdown list."""
|
||||
lines = ["# Deepest Dependency Chains\n\n"]
|
||||
if not chains:
|
||||
lines.append("No chains found.\n")
|
||||
return "".join(lines)
|
||||
max_len = max(len(c) for c in chains)
|
||||
lines.append(f"*Longest chain length:* {max_len}\n\n")
|
||||
for i, chain in enumerate(sorted(chains, key=lambda c: (-len(c), " -> ".join(c))), 1):
|
||||
lines.append(f"**Chain {i}** ({len(chain)} nodes)\n\n")
|
||||
indent = " "
|
||||
for j, node in enumerate(chain):
|
||||
arrow = " → " if j < len(chain)-1 else " • "
|
||||
lines.append(f"{indent}{arrow}{node}\n")
|
||||
lines.append("\n")
|
||||
return "".join(lines)
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Build cross-repo dependency graph")
|
||||
parser.add_argument("repos_dir", nargs="?", help="Directory containing repos")
|
||||
@@ -228,13 +311,20 @@ def main():
|
||||
elif args.format == "mermaid":
|
||||
output = to_mermaid(results)
|
||||
else:
|
||||
# Compute transitive and deep chains
|
||||
closure = transitive_closure(results)
|
||||
deep_chains = find_deep_chains(results)
|
||||
output = json.dumps({
|
||||
"repos": results,
|
||||
"cycles": cycles,
|
||||
"transitive": {node: sorted(deps) for node, deps in closure.items()},
|
||||
"deep_chains": [chain for chain in deep_chains if len(chain) > 1],
|
||||
"summary": {
|
||||
"total_repos": len(results),
|
||||
"total_deps": sum(len(r["dependencies"]) for r in results.values()),
|
||||
"cycles_found": len(cycles),
|
||||
"transitive_pairs": sum(len(deps) for deps in closure.values()),
|
||||
"longest_chain_length": max((len(c) for c in deep_chains), default=0),
|
||||
}
|
||||
}, indent=2)
|
||||
|
||||
|
||||
@@ -1,288 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Codebase Genome Diff — Detect structural changes between two versions.
|
||||
|
||||
Compares two git refs (commits, branches, tags) and produces a human-readable
|
||||
report of structural changes:
|
||||
• Added/removed/renamed files
|
||||
• Changed functions/classes (signature modifications)
|
||||
• New dependencies (imports, requirements, etc.)
|
||||
|
||||
Usage:
|
||||
python3 scripts/genome_diff.py --ref1 <commit1> --ref2 <commit2>
|
||||
python3 scripts/genome_diff.py --ref1 main --ref2 feature-branch
|
||||
python3 scripts/genome_diff.py --ref1 v1.0 --ref2 v2.0 --output report.txt
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Dict, Any, Optional
|
||||
|
||||
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
sys.path.insert(0, SCRIPT_DIR)
|
||||
from diff_analyzer import DiffAnalyzer, ChangeCategory
|
||||
|
||||
|
||||
@dataclass
|
||||
class FunctionChange:
|
||||
file: str
|
||||
name: str
|
||||
kind: str # 'function' or 'class'
|
||||
change_type: str # 'added' or 'removed' (simplified)
|
||||
old_line: Optional[int] = None
|
||||
new_line: Optional[int] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class DependencyChange:
|
||||
file: str
|
||||
module: str
|
||||
change_type: str # 'added' or 'removed' or 'modified'
|
||||
line: int = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class GenomeDiffReport:
|
||||
ref1: str
|
||||
ref2: str
|
||||
file_changes: List[Dict[str, Any]] = field(default_factory=list)
|
||||
function_changes: List[FunctionChange] = field(default_factory=list)
|
||||
dependency_changes: List[DependencyChange] = field(default_factory=list)
|
||||
total_files_changed: int = 0
|
||||
total_functions_changed: int = 0
|
||||
total_dependencies_changed: int = 0
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"ref1": self.ref1,
|
||||
"ref2": self.ref2,
|
||||
"summary": {
|
||||
"files": self.total_files_changed,
|
||||
"functions": self.total_functions_changed,
|
||||
"dependencies": self.total_dependencies_changed,
|
||||
},
|
||||
"file_changes": self.file_changes,
|
||||
"function_changes": [fc.__dict__ for fc in self.function_changes],
|
||||
"dependency_changes": [dc.__dict__ for dc in self.dependency_changes],
|
||||
}
|
||||
|
||||
def human_report(self) -> str:
|
||||
lines = []
|
||||
lines.append(f"Codebase Genome Diff: {self.ref1} → {self.ref2}")
|
||||
lines.append("=" * 60)
|
||||
lines.append(f" Files changed: {self.total_files_changed}")
|
||||
lines.append(f" Functions changed: {self.total_functions_changed}")
|
||||
lines.append(f" Dependencies changed: {self.total_dependencies_changed}")
|
||||
lines.append("")
|
||||
|
||||
for fc in self.file_changes:
|
||||
kind = []
|
||||
if fc.get('is_new'):
|
||||
kind.append("NEW")
|
||||
if fc.get('is_deleted'):
|
||||
kind.append("DELETED")
|
||||
if fc.get('is_renamed'):
|
||||
kind.append("RENAMED")
|
||||
if fc.get('is_binary'):
|
||||
kind.append("BINARY")
|
||||
kind_str = f" [{', '.join(kind)}]" if kind else ""
|
||||
lines.append(f" {fc['path']}{kind_str} (+{fc['added_lines']}/-{fc['deleted_lines']})")
|
||||
lines.append("")
|
||||
|
||||
for fc in self.function_changes:
|
||||
op = {'added': '+', 'removed': '-', 'modified': '~'}.get(fc.change_type, '?')
|
||||
lines.append(f" [{op}] {fc.file}: {fc.kind} '{fc.name}'")
|
||||
lines.append("")
|
||||
|
||||
for dc in self.dependency_changes:
|
||||
op = '+' if dc.change_type == 'added' else '-'
|
||||
lines.append(f" [{op}] {dc.file}: {dc.module}")
|
||||
lines.append("")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def run_git_diff(ref1: str, ref2: str) -> str:
|
||||
result = subprocess.run(
|
||||
['git', 'diff', '--unified=0', f'{ref1}...{ref2}'],
|
||||
capture_output=True, text=True, cwd=SCRIPT_DIR
|
||||
)
|
||||
if result.returncode not in (0, 1):
|
||||
print(f"git diff failed: {result.stderr}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
return result.stdout
|
||||
|
||||
|
||||
def extract_function_changes(diff_text: str) -> List[FunctionChange]:
|
||||
changes: List[FunctionChange] = []
|
||||
pattern = re.compile(r'^([+\-])\s*(def|class)\s+(\w+)', re.MULTILINE)
|
||||
hunk_header_re = re.compile(r'^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@')
|
||||
current_old_line: Optional[int] = None
|
||||
current_new_line: Optional[int] = None
|
||||
|
||||
for line in diff_text.split('\n'):
|
||||
hdr = hunk_header_re.match(line)
|
||||
if hdr:
|
||||
current_old_line = int(hdr.group(1))
|
||||
current_new_line = int(hdr.group(3))
|
||||
continue
|
||||
m = pattern.match(line)
|
||||
if m:
|
||||
op = m.group(1)
|
||||
kind = m.group(2)
|
||||
name = m.group(3)
|
||||
change_type = "added" if op == '+' else "removed"
|
||||
line_num = current_new_line if change_type == "added" else current_old_line
|
||||
changes.append(FunctionChange(
|
||||
file="<unknown>",
|
||||
name=name,
|
||||
kind=kind,
|
||||
change_type=change_type,
|
||||
new_line=line_num if change_type == "added" else None,
|
||||
old_line=line_num if change_type == "removed" else None,
|
||||
))
|
||||
# Advance line counters heuristically
|
||||
if op == '-':
|
||||
if current_old_line is not None:
|
||||
current_old_line += 1
|
||||
elif op == '+':
|
||||
if current_new_line is not None:
|
||||
current_new_line += 1
|
||||
elif line.startswith(' '):
|
||||
if current_old_line is not None:
|
||||
current_old_line += 1
|
||||
if current_new_line is not None:
|
||||
current_new_line += 1
|
||||
# lines starting with other prefixes (like \\ No newline) ignored
|
||||
return changes
|
||||
|
||||
|
||||
def extract_dependency_changes(diff_text: str, analyzer: DiffAnalyzer) -> List[DependencyChange]:
|
||||
changes: List[DependencyChange] = []
|
||||
import_pattern = re.compile(
|
||||
r'^([+\-])\s*(?:import\s+([\w\.]+)|from\s+([\w\.]+)\s+import)',
|
||||
re.MULTILINE
|
||||
)
|
||||
file_diffs = analyzer._split_files(diff_text)
|
||||
for file_diff in file_diffs:
|
||||
file_match = re.search(r'^diff --git a/.*? b/(.*?)$', file_diff, re.MULTILINE)
|
||||
if not file_match:
|
||||
continue
|
||||
filepath = file_match.group(1)
|
||||
|
||||
# Scan each line for import changes
|
||||
for line in file_diff.split('\n'):
|
||||
m = import_pattern.match(line)
|
||||
if m:
|
||||
change_type = "added" if m.group(1) == '+' else "removed"
|
||||
module = m.group(2) or m.group(3)
|
||||
changes.append(DependencyChange(
|
||||
file=filepath,
|
||||
module=module,
|
||||
change_type=change_type,
|
||||
line=0
|
||||
))
|
||||
|
||||
# Detect if this file is a dependency manifest
|
||||
req_file_pattern = re.compile(
|
||||
r'^[\+\-].*?(requirements(.*?)\.txt|pyproject\.toml|setup\.py|Pipfile)'
|
||||
)
|
||||
if any(req_file_pattern.match(line) for line in file_diff.split('\n')):
|
||||
if not any(c.file == filepath and c.module == "<file>" for c in changes):
|
||||
changes.append(DependencyChange(
|
||||
file=filepath,
|
||||
module="<file>",
|
||||
change_type="modified",
|
||||
line=0
|
||||
))
|
||||
return changes
|
||||
|
||||
|
||||
def correlate_function_changes_with_files(diff_text: str, functions: List[FunctionChange]) -> List[FunctionChange]:
|
||||
result: List[FunctionChange] = []
|
||||
# Split diff into per-file sections
|
||||
file_sections: List[tuple[str, str]] = []
|
||||
current_file: Optional[str] = None
|
||||
current_lines: List[str] = []
|
||||
for line in diff_text.split('\n'):
|
||||
if line.startswith('diff --git'):
|
||||
if current_file is not None:
|
||||
file_sections.append((current_file, '\n'.join(current_lines)))
|
||||
m = re.match(r'^diff --git a/.*? b/(.*?)$', line)
|
||||
current_file = m.group(1) if m else "unknown"
|
||||
current_lines = [line]
|
||||
else:
|
||||
current_lines.append(line)
|
||||
if current_file is not None:
|
||||
file_sections.append((current_file, '\n'.join(current_lines)))
|
||||
|
||||
pattern = re.compile(r'^([+\-])\s*(def|class)\s+(\w+)', re.MULTILINE)
|
||||
for filepath, section in file_sections:
|
||||
for m in pattern.finditer(section):
|
||||
op = m.group(1)
|
||||
kind = m.group(2)
|
||||
name = m.group(3)
|
||||
change_type = "added" if op == '+' else "removed"
|
||||
result.append(FunctionChange(
|
||||
file=filepath,
|
||||
name=name,
|
||||
kind=kind,
|
||||
change_type=change_type
|
||||
))
|
||||
return result
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Codebase Genome Diff — structural changes between versions")
|
||||
parser.add_argument("--ref1", required=True, help="First git ref (commit, branch, tag)")
|
||||
parser.add_argument("--ref2", required=True, help="Second git ref")
|
||||
parser.add_argument("--output", help="Write report to file")
|
||||
parser.add_argument("--json", action="store_true", help="Output JSON instead of human report")
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
diff_text = run_git_diff(args.ref1, args.ref2)
|
||||
except Exception as e:
|
||||
print(f"Error: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
if not diff_text.strip():
|
||||
print(f"No differences between {args.ref1} and {args.ref2}.")
|
||||
sys.exit(0)
|
||||
|
||||
analyzer = DiffAnalyzer()
|
||||
summary = analyzer.analyze(diff_text)
|
||||
|
||||
file_changes = [fc.to_dict() for fc in summary.files]
|
||||
func_changes = extract_function_changes(diff_text)
|
||||
func_changes = correlate_function_changes_with_files(diff_text, func_changes)
|
||||
dep_changes = extract_dependency_changes(diff_text, analyzer)
|
||||
|
||||
report = GenomeDiffReport(
|
||||
ref1=args.ref1,
|
||||
ref2=args.ref2,
|
||||
file_changes=file_changes,
|
||||
function_changes=func_changes,
|
||||
dependency_changes=dep_changes,
|
||||
total_files_changed=len(file_changes),
|
||||
total_functions_changed=len(func_changes),
|
||||
total_dependencies_changed=len(dep_changes),
|
||||
)
|
||||
|
||||
output = json.dumps(report.to_dict(), indent=2) if args.json else report.human_report()
|
||||
|
||||
if args.output:
|
||||
with open(args.output, 'w') as f:
|
||||
f.write(output + '\n')
|
||||
print(f"Report written to {args.output}")
|
||||
else:
|
||||
print(output)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
155
scripts/test_dependency_graph.py
Normal file
155
scripts/test_dependency_graph.py
Normal file
@@ -0,0 +1,155 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Tests for dependency_graph.py — transitive closure and deep chain detection."""
|
||||
|
||||
import json
|
||||
import sys
|
||||
import os
|
||||
import tempfile
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, os.path.dirname(__file__) or ".")
|
||||
|
||||
import importlib.util
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
"dg", os.path.join(os.path.dirname(__file__) or ".", "dependency_graph.py")
|
||||
)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
|
||||
transitive_closure = mod.transitive_closure
|
||||
find_deep_chains = mod.find_deep_chains
|
||||
detect_cycles = mod.detect_cycles
|
||||
|
||||
|
||||
def make_graph(edges: dict[str, list[str]]) -> dict:
|
||||
"""Build graph dict in expected format: {repo: {"dependencies": [...]}}."""
|
||||
return {
|
||||
node: {"dependencies": sorted(deps), "files_scanned": 1}
|
||||
for node, deps in edges.items()
|
||||
}
|
||||
|
||||
|
||||
def test_transitive_closure_simple_chain():
|
||||
graph = make_graph({
|
||||
"A": ["B"],
|
||||
"B": ["C"],
|
||||
"C": [],
|
||||
})
|
||||
closure = transitive_closure(graph)
|
||||
assert closure["A"] == {"B", "C"}
|
||||
assert closure["B"] == {"C"}
|
||||
assert closure["C"] == set()
|
||||
print("✅ Simple chain transitive closure")
|
||||
|
||||
|
||||
def test_transitive_closure_diamond():
|
||||
graph = make_graph({
|
||||
"A": ["B", "C"],
|
||||
"B": ["D"],
|
||||
"C": ["D"],
|
||||
"D": [],
|
||||
})
|
||||
closure = transitive_closure(graph)
|
||||
assert closure["A"] == {"B", "C", "D"}
|
||||
assert closure["B"] == {"D"}
|
||||
assert closure["C"] == {"D"}
|
||||
assert closure["D"] == set()
|
||||
print("✅ Diamond closure")
|
||||
|
||||
|
||||
def test_transitive_closure_with_cycle():
|
||||
graph = make_graph({
|
||||
"A": ["B"],
|
||||
"B": ["C"],
|
||||
"C": ["A"], # cycle
|
||||
})
|
||||
closure = transitive_closure(graph)
|
||||
assert closure["A"] == {"B", "C"}
|
||||
assert closure["B"] == {"C", "A"}
|
||||
assert closure["C"] == {"A", "B"}
|
||||
print("✅ Cycle in transitive closure")
|
||||
|
||||
|
||||
def test_find_deep_chains_simple():
|
||||
graph = make_graph({
|
||||
"A": ["B"],
|
||||
"B": ["C"],
|
||||
"C": [],
|
||||
})
|
||||
chains = find_deep_chains(graph)
|
||||
chains_sorted = sorted(chains, key=len, reverse=True)
|
||||
assert len(chains_sorted) == 1
|
||||
assert chains_sorted[0] == ["A", "B", "C"]
|
||||
print("✅ Simple deep chain")
|
||||
|
||||
|
||||
def test_find_deep_chains_multiple():
|
||||
graph = make_graph({
|
||||
"A": ["B", "C"],
|
||||
"B": ["D"],
|
||||
"C": ["E"],
|
||||
"D": [],
|
||||
"E": [],
|
||||
})
|
||||
chains = find_deep_chains(graph)
|
||||
lengths = [len(c) for c in chains]
|
||||
assert max(lengths) == 3
|
||||
print("✅ Multiple chains detected")
|
||||
|
||||
|
||||
def test_find_deep_chains_with_cycle_does_not_infinite_loop():
|
||||
graph = make_graph({
|
||||
"A": ["B"],
|
||||
"B": ["C"],
|
||||
"C": ["A"],
|
||||
})
|
||||
chains = find_deep_chains(graph)
|
||||
print(f"✅ Cycle handled: found {len(chains)} chains")
|
||||
|
||||
|
||||
def test_empty_graph():
|
||||
graph = {}
|
||||
assert transitive_closure(graph) == {}
|
||||
assert find_deep_chains(graph) == []
|
||||
print("✅ Empty graph handled")
|
||||
|
||||
|
||||
def test_detect_cycles_shorthand():
|
||||
graph = make_graph({
|
||||
"A": ["B"],
|
||||
"B": ["C"],
|
||||
"C": ["A"],
|
||||
})
|
||||
cycles = detect_cycles(graph)
|
||||
assert len(cycles) == 1
|
||||
assert set(cycles[0]) == {"A", "B", "C"}
|
||||
print("✅ Cycle detection works")
|
||||
|
||||
|
||||
def test_chain_length_reporting():
|
||||
graph = make_graph({
|
||||
"root": ["a", "b"],
|
||||
"a": ["c"],
|
||||
"b": ["d"],
|
||||
"c": ["e"],
|
||||
"d": [],
|
||||
"e": [],
|
||||
})
|
||||
chains = find_deep_chains(graph)
|
||||
max_len = max(len(c) for c in chains)
|
||||
assert max_len == 4
|
||||
print(f"✅ Longest chain length: {max_len}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_transitive_closure_simple_chain()
|
||||
test_transitive_closure_diamond()
|
||||
test_transitive_closure_with_cycle()
|
||||
test_find_deep_chains_simple()
|
||||
test_find_deep_chains_multiple()
|
||||
test_find_deep_chains_with_cycle_does_not_infinite_loop()
|
||||
test_empty_graph()
|
||||
test_detect_cycles_shorthand()
|
||||
test_chain_length_reporting()
|
||||
print("\n✅ All dependency graph tests passed")
|
||||
Reference in New Issue
Block a user