Compare commits
1 Commits
step35/132
...
step35/93-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
77e7e5daeb |
@@ -131,7 +131,10 @@ def detect_cycles(graph: dict) -> list:
|
||||
return result
|
||||
elif neighbor in rec_stack:
|
||||
cycle_start = path.index(neighbor)
|
||||
return path[cycle_start:] + [neighbor]
|
||||
cycle = path[cycle_start:]
|
||||
if cycle[-1] != neighbor:
|
||||
cycle.append(neighbor)
|
||||
return cycle
|
||||
|
||||
rec_stack.remove(node)
|
||||
return None
|
||||
|
||||
@@ -1,288 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Codebase Genome Diff — Detect structural changes between two versions.
|
||||
|
||||
Compares two git refs (commits, branches, tags) and produces a human-readable
|
||||
report of structural changes:
|
||||
• Added/removed/renamed files
|
||||
• Changed functions/classes (signature modifications)
|
||||
• New dependencies (imports, requirements, etc.)
|
||||
|
||||
Usage:
|
||||
python3 scripts/genome_diff.py --ref1 <commit1> --ref2 <commit2>
|
||||
python3 scripts/genome_diff.py --ref1 main --ref2 feature-branch
|
||||
python3 scripts/genome_diff.py --ref1 v1.0 --ref2 v2.0 --output report.txt
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Dict, Any, Optional
|
||||
|
||||
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
sys.path.insert(0, SCRIPT_DIR)
|
||||
from diff_analyzer import DiffAnalyzer, ChangeCategory
|
||||
|
||||
|
||||
@dataclass
|
||||
class FunctionChange:
|
||||
file: str
|
||||
name: str
|
||||
kind: str # 'function' or 'class'
|
||||
change_type: str # 'added' or 'removed' (simplified)
|
||||
old_line: Optional[int] = None
|
||||
new_line: Optional[int] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class DependencyChange:
|
||||
file: str
|
||||
module: str
|
||||
change_type: str # 'added' or 'removed' or 'modified'
|
||||
line: int = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class GenomeDiffReport:
|
||||
ref1: str
|
||||
ref2: str
|
||||
file_changes: List[Dict[str, Any]] = field(default_factory=list)
|
||||
function_changes: List[FunctionChange] = field(default_factory=list)
|
||||
dependency_changes: List[DependencyChange] = field(default_factory=list)
|
||||
total_files_changed: int = 0
|
||||
total_functions_changed: int = 0
|
||||
total_dependencies_changed: int = 0
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"ref1": self.ref1,
|
||||
"ref2": self.ref2,
|
||||
"summary": {
|
||||
"files": self.total_files_changed,
|
||||
"functions": self.total_functions_changed,
|
||||
"dependencies": self.total_dependencies_changed,
|
||||
},
|
||||
"file_changes": self.file_changes,
|
||||
"function_changes": [fc.__dict__ for fc in self.function_changes],
|
||||
"dependency_changes": [dc.__dict__ for dc in self.dependency_changes],
|
||||
}
|
||||
|
||||
def human_report(self) -> str:
|
||||
lines = []
|
||||
lines.append(f"Codebase Genome Diff: {self.ref1} → {self.ref2}")
|
||||
lines.append("=" * 60)
|
||||
lines.append(f" Files changed: {self.total_files_changed}")
|
||||
lines.append(f" Functions changed: {self.total_functions_changed}")
|
||||
lines.append(f" Dependencies changed: {self.total_dependencies_changed}")
|
||||
lines.append("")
|
||||
|
||||
for fc in self.file_changes:
|
||||
kind = []
|
||||
if fc.get('is_new'):
|
||||
kind.append("NEW")
|
||||
if fc.get('is_deleted'):
|
||||
kind.append("DELETED")
|
||||
if fc.get('is_renamed'):
|
||||
kind.append("RENAMED")
|
||||
if fc.get('is_binary'):
|
||||
kind.append("BINARY")
|
||||
kind_str = f" [{', '.join(kind)}]" if kind else ""
|
||||
lines.append(f" {fc['path']}{kind_str} (+{fc['added_lines']}/-{fc['deleted_lines']})")
|
||||
lines.append("")
|
||||
|
||||
for fc in self.function_changes:
|
||||
op = {'added': '+', 'removed': '-', 'modified': '~'}.get(fc.change_type, '?')
|
||||
lines.append(f" [{op}] {fc.file}: {fc.kind} '{fc.name}'")
|
||||
lines.append("")
|
||||
|
||||
for dc in self.dependency_changes:
|
||||
op = '+' if dc.change_type == 'added' else '-'
|
||||
lines.append(f" [{op}] {dc.file}: {dc.module}")
|
||||
lines.append("")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def run_git_diff(ref1: str, ref2: str) -> str:
|
||||
result = subprocess.run(
|
||||
['git', 'diff', '--unified=0', f'{ref1}...{ref2}'],
|
||||
capture_output=True, text=True, cwd=SCRIPT_DIR
|
||||
)
|
||||
if result.returncode not in (0, 1):
|
||||
print(f"git diff failed: {result.stderr}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
return result.stdout
|
||||
|
||||
|
||||
def extract_function_changes(diff_text: str) -> List[FunctionChange]:
|
||||
changes: List[FunctionChange] = []
|
||||
pattern = re.compile(r'^([+\-])\s*(def|class)\s+(\w+)', re.MULTILINE)
|
||||
hunk_header_re = re.compile(r'^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@')
|
||||
current_old_line: Optional[int] = None
|
||||
current_new_line: Optional[int] = None
|
||||
|
||||
for line in diff_text.split('\n'):
|
||||
hdr = hunk_header_re.match(line)
|
||||
if hdr:
|
||||
current_old_line = int(hdr.group(1))
|
||||
current_new_line = int(hdr.group(3))
|
||||
continue
|
||||
m = pattern.match(line)
|
||||
if m:
|
||||
op = m.group(1)
|
||||
kind = m.group(2)
|
||||
name = m.group(3)
|
||||
change_type = "added" if op == '+' else "removed"
|
||||
line_num = current_new_line if change_type == "added" else current_old_line
|
||||
changes.append(FunctionChange(
|
||||
file="<unknown>",
|
||||
name=name,
|
||||
kind=kind,
|
||||
change_type=change_type,
|
||||
new_line=line_num if change_type == "added" else None,
|
||||
old_line=line_num if change_type == "removed" else None,
|
||||
))
|
||||
# Advance line counters heuristically
|
||||
if op == '-':
|
||||
if current_old_line is not None:
|
||||
current_old_line += 1
|
||||
elif op == '+':
|
||||
if current_new_line is not None:
|
||||
current_new_line += 1
|
||||
elif line.startswith(' '):
|
||||
if current_old_line is not None:
|
||||
current_old_line += 1
|
||||
if current_new_line is not None:
|
||||
current_new_line += 1
|
||||
# lines starting with other prefixes (like \\ No newline) ignored
|
||||
return changes
|
||||
|
||||
|
||||
def extract_dependency_changes(diff_text: str, analyzer: DiffAnalyzer) -> List[DependencyChange]:
|
||||
changes: List[DependencyChange] = []
|
||||
import_pattern = re.compile(
|
||||
r'^([+\-])\s*(?:import\s+([\w\.]+)|from\s+([\w\.]+)\s+import)',
|
||||
re.MULTILINE
|
||||
)
|
||||
file_diffs = analyzer._split_files(diff_text)
|
||||
for file_diff in file_diffs:
|
||||
file_match = re.search(r'^diff --git a/.*? b/(.*?)$', file_diff, re.MULTILINE)
|
||||
if not file_match:
|
||||
continue
|
||||
filepath = file_match.group(1)
|
||||
|
||||
# Scan each line for import changes
|
||||
for line in file_diff.split('\n'):
|
||||
m = import_pattern.match(line)
|
||||
if m:
|
||||
change_type = "added" if m.group(1) == '+' else "removed"
|
||||
module = m.group(2) or m.group(3)
|
||||
changes.append(DependencyChange(
|
||||
file=filepath,
|
||||
module=module,
|
||||
change_type=change_type,
|
||||
line=0
|
||||
))
|
||||
|
||||
# Detect if this file is a dependency manifest
|
||||
req_file_pattern = re.compile(
|
||||
r'^[\+\-].*?(requirements(.*?)\.txt|pyproject\.toml|setup\.py|Pipfile)'
|
||||
)
|
||||
if any(req_file_pattern.match(line) for line in file_diff.split('\n')):
|
||||
if not any(c.file == filepath and c.module == "<file>" for c in changes):
|
||||
changes.append(DependencyChange(
|
||||
file=filepath,
|
||||
module="<file>",
|
||||
change_type="modified",
|
||||
line=0
|
||||
))
|
||||
return changes
|
||||
|
||||
|
||||
def correlate_function_changes_with_files(diff_text: str, functions: List[FunctionChange]) -> List[FunctionChange]:
|
||||
result: List[FunctionChange] = []
|
||||
# Split diff into per-file sections
|
||||
file_sections: List[tuple[str, str]] = []
|
||||
current_file: Optional[str] = None
|
||||
current_lines: List[str] = []
|
||||
for line in diff_text.split('\n'):
|
||||
if line.startswith('diff --git'):
|
||||
if current_file is not None:
|
||||
file_sections.append((current_file, '\n'.join(current_lines)))
|
||||
m = re.match(r'^diff --git a/.*? b/(.*?)$', line)
|
||||
current_file = m.group(1) if m else "unknown"
|
||||
current_lines = [line]
|
||||
else:
|
||||
current_lines.append(line)
|
||||
if current_file is not None:
|
||||
file_sections.append((current_file, '\n'.join(current_lines)))
|
||||
|
||||
pattern = re.compile(r'^([+\-])\s*(def|class)\s+(\w+)', re.MULTILINE)
|
||||
for filepath, section in file_sections:
|
||||
for m in pattern.finditer(section):
|
||||
op = m.group(1)
|
||||
kind = m.group(2)
|
||||
name = m.group(3)
|
||||
change_type = "added" if op == '+' else "removed"
|
||||
result.append(FunctionChange(
|
||||
file=filepath,
|
||||
name=name,
|
||||
kind=kind,
|
||||
change_type=change_type
|
||||
))
|
||||
return result
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Codebase Genome Diff — structural changes between versions")
|
||||
parser.add_argument("--ref1", required=True, help="First git ref (commit, branch, tag)")
|
||||
parser.add_argument("--ref2", required=True, help="Second git ref")
|
||||
parser.add_argument("--output", help="Write report to file")
|
||||
parser.add_argument("--json", action="store_true", help="Output JSON instead of human report")
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
diff_text = run_git_diff(args.ref1, args.ref2)
|
||||
except Exception as e:
|
||||
print(f"Error: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
if not diff_text.strip():
|
||||
print(f"No differences between {args.ref1} and {args.ref2}.")
|
||||
sys.exit(0)
|
||||
|
||||
analyzer = DiffAnalyzer()
|
||||
summary = analyzer.analyze(diff_text)
|
||||
|
||||
file_changes = [fc.to_dict() for fc in summary.files]
|
||||
func_changes = extract_function_changes(diff_text)
|
||||
func_changes = correlate_function_changes_with_files(diff_text, func_changes)
|
||||
dep_changes = extract_dependency_changes(diff_text, analyzer)
|
||||
|
||||
report = GenomeDiffReport(
|
||||
ref1=args.ref1,
|
||||
ref2=args.ref2,
|
||||
file_changes=file_changes,
|
||||
function_changes=func_changes,
|
||||
dependency_changes=dep_changes,
|
||||
total_files_changed=len(file_changes),
|
||||
total_functions_changed=len(func_changes),
|
||||
total_dependencies_changed=len(dep_changes),
|
||||
)
|
||||
|
||||
output = json.dumps(report.to_dict(), indent=2) if args.json else report.human_report()
|
||||
|
||||
if args.output:
|
||||
with open(args.output, 'w') as f:
|
||||
f.write(output + '\n')
|
||||
print(f"Report written to {args.output}")
|
||||
else:
|
||||
print(output)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
0
tests/__init__.py
Normal file
0
tests/__init__.py
Normal file
184
tests/test_dependency_graph.py
Normal file
184
tests/test_dependency_graph.py
Normal file
@@ -0,0 +1,184 @@
|
||||
"""
|
||||
Tests for dependency_graph — cross-repo dependency analysis.
|
||||
"""
|
||||
|
||||
import json
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
import sys
|
||||
|
||||
# Make scripts importable
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
|
||||
|
||||
from dependency_graph import (
|
||||
normalize_repo_name,
|
||||
scan_file_for_deps,
|
||||
detect_cycles,
|
||||
scan_repo,
|
||||
)
|
||||
|
||||
|
||||
class TestNormalizeRepoName(unittest.TestCase):
|
||||
def test_lowercase(self):
|
||||
self.assertEqual(normalize_repo_name("Hermes-Agent"), "hermes-agent")
|
||||
|
||||
def test_underscore_to_hyphen(self):
|
||||
self.assertEqual(normalize_repo_name("timmy_config"), "timmy-config")
|
||||
|
||||
def test_strip_git(self):
|
||||
self.assertEqual(normalize_repo_name("the-nexus.git"), "the-nexus")
|
||||
|
||||
|
||||
class TestScanFileForDeps(unittest.TestCase):
|
||||
def test_python_import(self):
|
||||
content = "from hermes_agent.tools import something"
|
||||
deps = scan_file_for_deps("dummy.py", content, "my-repo")
|
||||
self.assertIn("hermes-agent", deps)
|
||||
|
||||
def test_python_import_from(self):
|
||||
content = "import timmy_config.helpers"
|
||||
deps = scan_file_for_deps("dummy.py", content, "my-repo")
|
||||
self.assertIn("timmy-config", deps)
|
||||
|
||||
def test_js_require(self):
|
||||
content = 'const utils = require("the-nexus/utils");'
|
||||
deps = scan_file_for_deps("dummy.js", content, "my-repo")
|
||||
self.assertIn("the-nexus", deps)
|
||||
|
||||
def test_es6_import(self):
|
||||
content = 'import {something} from "fleet-ops";'
|
||||
deps = scan_file_for_deps("dummy.ts", content, "my-repo")
|
||||
self.assertIn("fleet-ops", deps)
|
||||
|
||||
def test_go_import(self):
|
||||
content = 'import "github.com/Timmy_Foundation/the-door/pkg"'
|
||||
deps = scan_file_for_deps("dummy.go", content, "my-repo")
|
||||
self.assertIn("the-door", deps)
|
||||
|
||||
def test_ansible_include_role(self):
|
||||
content = "- include_role:\n name: timmy-home.nginx"
|
||||
deps = scan_file_for_deps("dummy.yml", content, "my-repo")
|
||||
self.assertIn("timmy-home", deps)
|
||||
|
||||
def test_yaml_repo_reference(self):
|
||||
content = "repo: the-beacon\\ntrigger: push"
|
||||
deps = scan_file_for_deps("dummy.yaml", content, "my-repo")
|
||||
self.assertIn("the-beacon", deps)
|
||||
|
||||
def test_exclude_self_reference(self):
|
||||
content = "import hermes_agent"
|
||||
deps = scan_file_for_deps("dummy.py", content, "hermes-agent")
|
||||
self.assertNotIn("hermes-agent", deps)
|
||||
|
||||
def test_no_deps(self):
|
||||
content = "# no imports here"
|
||||
deps = scan_file_for_deps("dummy.py", content, "my-repo")
|
||||
self.assertEqual(len(deps), 0)
|
||||
|
||||
|
||||
class TestDetectCycles(unittest.TestCase):
|
||||
def test_no_cycles(self):
|
||||
graph = {
|
||||
"A": {"dependencies": ["B"]},
|
||||
"B": {"dependencies": ["C"]},
|
||||
"C": {"dependencies": []},
|
||||
}
|
||||
cycles = detect_cycles(graph)
|
||||
self.assertEqual(cycles, [])
|
||||
|
||||
def test_simple_cycle(self):
|
||||
graph = {
|
||||
"A": {"dependencies": ["B"]},
|
||||
"B": {"dependencies": ["A"]},
|
||||
}
|
||||
cycles = detect_cycles(graph)
|
||||
self.assertEqual(len(cycles), 1)
|
||||
self.assertIn("A", cycles[0])
|
||||
self.assertIn("B", cycles[0])
|
||||
|
||||
def test_three_node_cycle(self):
|
||||
graph = {
|
||||
"A": {"dependencies": ["B"]},
|
||||
"B": {"dependencies": ["C"]},
|
||||
"C": {"dependencies": ["A"]},
|
||||
}
|
||||
cycles = detect_cycles(graph)
|
||||
self.assertEqual(len(cycles), 1)
|
||||
self.assertEqual(set(cycles[0]), {"A", "B", "C"})
|
||||
|
||||
def test_multiple_independent_cycles(self):
|
||||
graph = {
|
||||
"A": {"dependencies": ["B"]},
|
||||
"B": {"dependencies": ["A"]},
|
||||
"C": {"dependencies": ["D"]},
|
||||
"D": {"dependencies": ["C"]},
|
||||
}
|
||||
cycles = detect_cycles(graph)
|
||||
self.assertEqual(len(cycles), 2)
|
||||
|
||||
def test_self_cycle(self):
|
||||
graph = {
|
||||
"A": {"dependencies": ["A"]},
|
||||
}
|
||||
cycles = detect_cycles(graph)
|
||||
self.assertEqual(len(cycles), 1)
|
||||
self.assertEqual(cycles[0], ["A"])
|
||||
|
||||
|
||||
class TestScanRepo(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.tmpdir = tempfile.mkdtemp()
|
||||
self.repo_path = Path(self.tmpdir) / "test-repo"
|
||||
self.repo_path.mkdir()
|
||||
|
||||
def tearDown(self):
|
||||
import shutil
|
||||
shutil.rmtree(self.tmpdir)
|
||||
|
||||
def test_scan_simple_import(self):
|
||||
# Create a Python file that imports another known repo
|
||||
(self.repo_path / "main.py").write_text("from hermes_agent import tools")
|
||||
result = scan_repo(str(self.repo_path), "test-repo")
|
||||
self.assertIn("dependencies", result)
|
||||
self.assertIn("hermes-agent", result["dependencies"])
|
||||
self.assertEqual(result["files_scanned"], 1)
|
||||
|
||||
def test_scan_multiple_files(self):
|
||||
(self.repo_path / "a.py").write_text("import timmy_config")
|
||||
(self.repo_path / "b.js").write_text('require("the-nexus")')
|
||||
(self.repo_path / "c.yml").write_text("include_role: fleet-ops")
|
||||
result = scan_repo(str(self.repo_path), "test-repo")
|
||||
deps = set(result["dependencies"])
|
||||
self.assertIn("timmy-config", deps)
|
||||
self.assertIn("the-nexus", deps)
|
||||
self.assertIn("fleet-ops", deps)
|
||||
self.assertEqual(result["files_scanned"], 3)
|
||||
|
||||
|
||||
class TestIntegration(unittest.TestCase):
|
||||
"""Integration test using real test_repos directory."""
|
||||
|
||||
def test_scan_timmy_config(self):
|
||||
repo_root = Path(__file__).parent.parent
|
||||
test_repos_dir = repo_root / "test_repos"
|
||||
if not test_repos_dir.exists():
|
||||
self.skipTest("test_repos directory not found (needs cloning)")
|
||||
|
||||
# The timmy-config repo fixture exists
|
||||
tc = test_repos_dir / "timmy-config"
|
||||
if not tc.exists():
|
||||
self.skipTest("timmy-config not cloned")
|
||||
|
||||
result = scan_repo(str(tc), "timmy-config")
|
||||
deps = set(result.get("dependencies", []))
|
||||
# timmy-config dependencies based on known codebase
|
||||
expected = {"hermes-agent", "the-nexus", "fleet-ops"}
|
||||
# At least some deps should be found
|
||||
self.assertGreater(len(deps), 0, "Should find at least one dependency")
|
||||
# Cross-check: hermes-agent definitely used
|
||||
self.assertIn("hermes-agent", deps)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user