Compare commits

..

1 Commits

Author SHA1 Message Date
Step35
1470b44c3b feat: add codebase genome diff script for structural change detection
Some checks failed
Test / pytest (pull_request) Failing after 9s
Introduces genome_diff.py — a tool for detecting structural changes between
two git refs: file-level changes, function/class signature modifications,
and dependency import changes.

Addresses #132.
2026-04-26 09:46:04 -04:00
3 changed files with 288 additions and 360 deletions

View File

@@ -1,308 +0,0 @@
#!/usr/bin/env python3
"""
Dependency Inventory — Scan repos and list third-party dependencies.
Reads: package.json, requirements.txt, go.mod, Cargo.toml, pyproject.toml
Extracts: package name, version constraint, source file/repo
Outputs: JSON (default) or markdown table
Usage:
python3 scripts/dependency_inventory.py --repos-dir ~/repos/
python3 scripts/dependency_inventory.py --repos ~/repo1,~/repo2 --format markdown
"""
import argparse
import json
import os
import re
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional
# Mapping of file pattern to canonical parser name
MANIFEST_PATTERNS = {
'requirements.txt': 'requirements',
'package.json': 'npm',
'pyproject.toml': 'pyproject',
'go.mod': 'go',
'Cargo.toml': 'cargo',
}
# Parser registry
PARSERS = {}
def register_parser(name: str):
"""Decorator to register a parser function."""
def decorator(fn):
PARSERS[name] = fn
return fn
return decorator
# ─── Parsers ────────────────────────────────────────────────────────────────
@register_parser('requirements')
def parse_requirements(content: str) -> List[Dict[str, str]]:
"""Parse requirements.txt — one requirement per line."""
deps = []
for line in content.splitlines():
line = line.strip()
if not line or line.startswith('#'):
continue
pkg_spec = re.split(r'[ ;#]', line)[0].strip()
if '>=' in pkg_spec:
name, ver = pkg_spec.split('>=', 1)
elif '==' in pkg_spec:
name, ver = pkg_spec.split('==', 1)
elif '<=' in pkg_spec:
name, ver = pkg_spec.split('<=', 1)
elif '~=' in pkg_spec:
name, ver = pkg_spec.split('~=', 1)
elif '>' in pkg_spec:
name, ver = pkg_spec.split('>', 1)
elif '<' in pkg_spec:
name, ver = pkg_spec.split('<', 1)
elif '=' in pkg_spec:
name, ver = pkg_spec.split('=', 1)
else:
name, ver = pkg_spec, ''
deps.append({
'package': name.strip(),
'version': ver.strip(),
'constraint': line[len(name):].strip()
})
return deps
@register_parser('npm')
def parse_package_json(content: str) -> List[Dict[str, str]]:
"""Parse package.json dependencies."""
try:
data = json.loads(content)
except json.JSONDecodeError:
return []
deps = []
for section in ('dependencies', 'devDependencies', 'peerDependencies', 'optionalDependencies'):
for name, ver in data.get(section, {}).items():
deps.append({
'package': name,
'version': ver,
'constraint': ver,
'type': section
})
return deps
@register_parser('pyproject')
def parse_pyproject_toml(content: str) -> List[Dict[str, str]]:
"""Parse pyproject.toml [project] dependencies."""
deps = []
in_deps = False
dep_buffer = ''
for line in content.splitlines():
stripped = line.strip()
if stripped.startswith('dependencies = ['):
in_deps = True
remainder = stripped.split('=', 1)[1].strip()
dep_buffer = remainder[1:] if remainder.startswith('[') else remainder
continue
if in_deps:
if stripped.startswith(']'):
in_deps = False
continue
dep_buffer += ' ' + line
dep_buffer = dep_buffer.strip().rstrip(',')
for match in re.finditer(r'"([^"]+)"', dep_buffer):
spec = match.group(1)
m = re.match(r'^([a-zA-Z0-9_.-]+)\s*([<>=!~]+)?\s*(.*)$', spec)
if m:
name, op, ver = m.groups()
deps.append({
'package': name,
'version': (ver or '').strip(),
'constraint': spec
})
return deps
@register_parser('go')
def parse_go_mod(content: str) -> List[Dict[str, str]]:
"""Parse go.mod — require statements."""
deps = []
for line in content.splitlines():
line = line.strip()
if line.startswith('require ') and not line.startswith('require ('):
parts = line.split()
if len(parts) >= 3:
mod, ver = parts[1], parts[2]
deps.append({'package': mod, 'version': ver, 'constraint': ver})
elif line.startswith('\t') and '/' in line:
parts = line.strip().split()
if len(parts) >= 2:
mod, ver = parts[0], parts[1]
deps.append({'package': mod, 'version': ver, 'constraint': ver})
return deps
@register_parser('cargo')
def parse_cargo_toml(content: str) -> List[Dict[str, str]]:
"""Parse [dependencies] section from Cargo.toml."""
deps = []
in_deps = False
for line in content.splitlines():
stripped = line.strip()
if stripped in ('[dependencies]', '[dependencies]'):
in_deps = True
continue
if stripped.startswith('['):
in_deps = False
continue
if in_deps and '=' in stripped:
name_part, ver_part = stripped.split('=', 1)
name = name_part.strip()
ver = ver_part.strip().strip('"').strip("'")
deps.append({'package': name, 'version': ver, 'constraint': ver})
return deps
# ─── File Discovery ─────────────────────────────────────────────────────────
def find_manifest_files(root: Path) -> Dict[str, List[Path]]:
"""Find all manifest files under root."""
found = {k: [] for k in MANIFEST_PATTERNS}
for pattern in MANIFEST_PATTERNS:
for path in root.rglob(pattern):
if not any(skip in str(path) for skip in ('.git', 'node_modules', '__pycache__', '.venv', 'venv')):
found[pattern].append(path)
return found
# ─── Main Scanner ────────────────────────────────────────────────────────────
def scan_repo(repo_path: Path) -> Dict[str, Any]:
"""Scan a single repo directory for dependency manifests."""
repo_name = repo_path.name
found = find_manifest_files(repo_path)
all_deps: List[Dict[str, str]] = []
files_scanned = 0
for pattern, paths in found.items():
parser_name = MANIFEST_PATTERNS[pattern]
# Map parser_name to function
if parser_name == 'requirements':
parser = parse_requirements
elif parser_name == 'npm':
parser = parse_package_json
elif parser_name == 'pyproject':
parser = parse_pyproject_toml
elif parser_name == 'go':
parser = parse_go_mod
elif parser_name == 'cargo':
parser = parse_cargo_toml
else:
continue
for fp in paths:
try:
content = fp.read_text(encoding='utf-8', errors='replace')
files_scanned += 1
rel = fp.relative_to(repo_path)
for dep in parser(content):
dep['source'] = pattern
dep['file'] = str(rel)
dep['repo'] = repo_name
all_deps.append(dep)
except Exception as e:
print(f" [WARN] Could not parse {fp}: {e}", file=sys.stderr)
return {
'repo': repo_name,
'path': str(repo_path),
'files_scanned': files_scanned,
'dependencies': all_deps,
'dependency_count': len(all_deps),
}
def scan_repos(repos: List[Path]) -> Dict[str, Any]:
"""Scan multiple repos and aggregate."""
results = {}
total_deps = 0
total_files = 0
for repo in repos:
if not repo.is_dir():
print(f"[WARN] Skipping {repo}: not a directory", file=sys.stderr)
continue
print(f"Scanning {repo.name}...", file=sys.stderr)
result = scan_repo(repo)
results[repo.name] = result
total_deps += result['dependency_count']
total_files += result['files_scanned']
return {
'repos': results,
'summary': {
'total_repos': len(results),
'total_files_scanned': total_files,
'total_dependencies': total_deps,
}
}
# ─── Output ─────────────────────────────────────────────────────────────────
def output_json(data: Dict[str, Any], out_path: Optional[Path] = None) -> None:
text = json.dumps(data, indent=2)
if out_path:
out_path.write_text(text)
print(f"Written: {out_path}", file=sys.stderr)
else:
print(text)
def output_markdown(data: Dict[str, Any], out_path: Optional[Path] = None) -> None:
lines = []
lines.append("# Dependency Inventory")
lines.append("\nGenerated: *(TODO: add timestamp)*")
lines.append(f"\n**Summary:** {data['summary']['total_dependencies']} dependencies across {data['summary']['total_repos']} repos")
lines.append("")
lines.append("| Repo | File | Package | Version |")
lines.append("|------|------|---------|---------|")
for repo_name, rdata in sorted(data['repos'].items()):
for dep in sorted(rdata['dependencies'], key=lambda d: d['package']):
lines.append(f"| {repo_name} | {dep['file']} | {dep['package']} | {dep['version']} |")
text = '\n'.join(lines) + '\n'
if out_path:
out_path.write_text(text)
print(f"Written: {out_path}", file=sys.stderr)
else:
print(text)
# ─── CLI Entry ────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Generate org-wide dependency inventory")
parser.add_argument('--repos-dir', help='Directory containing multiple repos')
parser.add_argument('--repos', help='Comma-separated list of repo paths')
parser.add_argument('--output', '-o', help='Output file (default: stdout)')
parser.add_argument('--format', choices=['json', 'markdown'], default='json',
help='Output format (default: json)')
args = parser.parse_args()
if args.repos:
repo_paths = [Path(p.strip()).expanduser() for p in args.repos.split(',')]
elif args.repos_dir:
base = Path(args.repos_dir).expanduser()
repo_paths = [p for p in base.iterdir() if p.is_dir() and not p.name.startswith('.')]
else:
repo_paths = [Path(__file__).resolve().parent.parent]
out_path = Path(args.output).expanduser() if args.output else None
data = scan_repos(repo_paths)
if args.format == 'json':
output_json(data, out_path)
else:
output_markdown(data, out_path)
if __name__ == '__main__':
main()

288
scripts/genome_diff.py Executable file
View File

@@ -0,0 +1,288 @@
#!/usr/bin/env python3
"""
Codebase Genome Diff — Detect structural changes between two versions.
Compares two git refs (commits, branches, tags) and produces a human-readable
report of structural changes:
• Added/removed/renamed files
• Changed functions/classes (signature modifications)
• New dependencies (imports, requirements, etc.)
Usage:
python3 scripts/genome_diff.py --ref1 <commit1> --ref2 <commit2>
python3 scripts/genome_diff.py --ref1 main --ref2 feature-branch
python3 scripts/genome_diff.py --ref1 v1.0 --ref2 v2.0 --output report.txt
"""
import argparse
import json
import os
import re
import subprocess
import sys
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPT_DIR)
from diff_analyzer import DiffAnalyzer, ChangeCategory
@dataclass
class FunctionChange:
file: str
name: str
kind: str # 'function' or 'class'
change_type: str # 'added' or 'removed' (simplified)
old_line: Optional[int] = None
new_line: Optional[int] = None
@dataclass
class DependencyChange:
file: str
module: str
change_type: str # 'added' or 'removed' or 'modified'
line: int = 0
@dataclass
class GenomeDiffReport:
ref1: str
ref2: str
file_changes: List[Dict[str, Any]] = field(default_factory=list)
function_changes: List[FunctionChange] = field(default_factory=list)
dependency_changes: List[DependencyChange] = field(default_factory=list)
total_files_changed: int = 0
total_functions_changed: int = 0
total_dependencies_changed: int = 0
def to_dict(self) -> Dict[str, Any]:
return {
"ref1": self.ref1,
"ref2": self.ref2,
"summary": {
"files": self.total_files_changed,
"functions": self.total_functions_changed,
"dependencies": self.total_dependencies_changed,
},
"file_changes": self.file_changes,
"function_changes": [fc.__dict__ for fc in self.function_changes],
"dependency_changes": [dc.__dict__ for dc in self.dependency_changes],
}
def human_report(self) -> str:
lines = []
lines.append(f"Codebase Genome Diff: {self.ref1}{self.ref2}")
lines.append("=" * 60)
lines.append(f" Files changed: {self.total_files_changed}")
lines.append(f" Functions changed: {self.total_functions_changed}")
lines.append(f" Dependencies changed: {self.total_dependencies_changed}")
lines.append("")
for fc in self.file_changes:
kind = []
if fc.get('is_new'):
kind.append("NEW")
if fc.get('is_deleted'):
kind.append("DELETED")
if fc.get('is_renamed'):
kind.append("RENAMED")
if fc.get('is_binary'):
kind.append("BINARY")
kind_str = f" [{', '.join(kind)}]" if kind else ""
lines.append(f" {fc['path']}{kind_str} (+{fc['added_lines']}/-{fc['deleted_lines']})")
lines.append("")
for fc in self.function_changes:
op = {'added': '+', 'removed': '-', 'modified': '~'}.get(fc.change_type, '?')
lines.append(f" [{op}] {fc.file}: {fc.kind} '{fc.name}'")
lines.append("")
for dc in self.dependency_changes:
op = '+' if dc.change_type == 'added' else '-'
lines.append(f" [{op}] {dc.file}: {dc.module}")
lines.append("")
return "\n".join(lines)
def run_git_diff(ref1: str, ref2: str) -> str:
result = subprocess.run(
['git', 'diff', '--unified=0', f'{ref1}...{ref2}'],
capture_output=True, text=True, cwd=SCRIPT_DIR
)
if result.returncode not in (0, 1):
print(f"git diff failed: {result.stderr}", file=sys.stderr)
sys.exit(1)
return result.stdout
def extract_function_changes(diff_text: str) -> List[FunctionChange]:
changes: List[FunctionChange] = []
pattern = re.compile(r'^([+\-])\s*(def|class)\s+(\w+)', re.MULTILINE)
hunk_header_re = re.compile(r'^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@')
current_old_line: Optional[int] = None
current_new_line: Optional[int] = None
for line in diff_text.split('\n'):
hdr = hunk_header_re.match(line)
if hdr:
current_old_line = int(hdr.group(1))
current_new_line = int(hdr.group(3))
continue
m = pattern.match(line)
if m:
op = m.group(1)
kind = m.group(2)
name = m.group(3)
change_type = "added" if op == '+' else "removed"
line_num = current_new_line if change_type == "added" else current_old_line
changes.append(FunctionChange(
file="<unknown>",
name=name,
kind=kind,
change_type=change_type,
new_line=line_num if change_type == "added" else None,
old_line=line_num if change_type == "removed" else None,
))
# Advance line counters heuristically
if op == '-':
if current_old_line is not None:
current_old_line += 1
elif op == '+':
if current_new_line is not None:
current_new_line += 1
elif line.startswith(' '):
if current_old_line is not None:
current_old_line += 1
if current_new_line is not None:
current_new_line += 1
# lines starting with other prefixes (like \\ No newline) ignored
return changes
def extract_dependency_changes(diff_text: str, analyzer: DiffAnalyzer) -> List[DependencyChange]:
changes: List[DependencyChange] = []
import_pattern = re.compile(
r'^([+\-])\s*(?:import\s+([\w\.]+)|from\s+([\w\.]+)\s+import)',
re.MULTILINE
)
file_diffs = analyzer._split_files(diff_text)
for file_diff in file_diffs:
file_match = re.search(r'^diff --git a/.*? b/(.*?)$', file_diff, re.MULTILINE)
if not file_match:
continue
filepath = file_match.group(1)
# Scan each line for import changes
for line in file_diff.split('\n'):
m = import_pattern.match(line)
if m:
change_type = "added" if m.group(1) == '+' else "removed"
module = m.group(2) or m.group(3)
changes.append(DependencyChange(
file=filepath,
module=module,
change_type=change_type,
line=0
))
# Detect if this file is a dependency manifest
req_file_pattern = re.compile(
r'^[\+\-].*?(requirements(.*?)\.txt|pyproject\.toml|setup\.py|Pipfile)'
)
if any(req_file_pattern.match(line) for line in file_diff.split('\n')):
if not any(c.file == filepath and c.module == "<file>" for c in changes):
changes.append(DependencyChange(
file=filepath,
module="<file>",
change_type="modified",
line=0
))
return changes
def correlate_function_changes_with_files(diff_text: str, functions: List[FunctionChange]) -> List[FunctionChange]:
result: List[FunctionChange] = []
# Split diff into per-file sections
file_sections: List[tuple[str, str]] = []
current_file: Optional[str] = None
current_lines: List[str] = []
for line in diff_text.split('\n'):
if line.startswith('diff --git'):
if current_file is not None:
file_sections.append((current_file, '\n'.join(current_lines)))
m = re.match(r'^diff --git a/.*? b/(.*?)$', line)
current_file = m.group(1) if m else "unknown"
current_lines = [line]
else:
current_lines.append(line)
if current_file is not None:
file_sections.append((current_file, '\n'.join(current_lines)))
pattern = re.compile(r'^([+\-])\s*(def|class)\s+(\w+)', re.MULTILINE)
for filepath, section in file_sections:
for m in pattern.finditer(section):
op = m.group(1)
kind = m.group(2)
name = m.group(3)
change_type = "added" if op == '+' else "removed"
result.append(FunctionChange(
file=filepath,
name=name,
kind=kind,
change_type=change_type
))
return result
def main():
parser = argparse.ArgumentParser(description="Codebase Genome Diff — structural changes between versions")
parser.add_argument("--ref1", required=True, help="First git ref (commit, branch, tag)")
parser.add_argument("--ref2", required=True, help="Second git ref")
parser.add_argument("--output", help="Write report to file")
parser.add_argument("--json", action="store_true", help="Output JSON instead of human report")
args = parser.parse_args()
try:
diff_text = run_git_diff(args.ref1, args.ref2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if not diff_text.strip():
print(f"No differences between {args.ref1} and {args.ref2}.")
sys.exit(0)
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff_text)
file_changes = [fc.to_dict() for fc in summary.files]
func_changes = extract_function_changes(diff_text)
func_changes = correlate_function_changes_with_files(diff_text, func_changes)
dep_changes = extract_dependency_changes(diff_text, analyzer)
report = GenomeDiffReport(
ref1=args.ref1,
ref2=args.ref2,
file_changes=file_changes,
function_changes=func_changes,
dependency_changes=dep_changes,
total_files_changed=len(file_changes),
total_functions_changed=len(func_changes),
total_dependencies_changed=len(dep_changes),
)
output = json.dumps(report.to_dict(), indent=2) if args.json else report.human_report()
if args.output:
with open(args.output, 'w') as f:
f.write(output + '\n')
print(f"Report written to {args.output}")
else:
print(output)
if __name__ == '__main__':
main()

View File

@@ -1,52 +0,0 @@
"""
Tests for scripts/dependency_inventory.py
"""
import unittest
import json
from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from scripts.dependency_inventory import (
parse_requirements,
parse_package_json,
parse_pyproject_toml,
scan_repo,
)
class TestParseRequirements(unittest.TestCase):
def test_parses_simple_requirement(self):
result = parse_requirements("requests>=2.33.0")
self.assertEqual(len(result), 1)
self.assertEqual(result[0]["package"], "requests")
def test_parses_version_range(self):
result = parse_requirements("pytest>=8,<9")
self.assertEqual(result[0]["package"], "pytest")
class TestParsePackageJson(unittest.TestCase):
def test_parses_dependencies(self):
content = json.dumps({"name": "test", "dependencies": {"react": "^18.2.0"}})
result = parse_package_json(content)
self.assertTrue(any(d["package"] == "react" for d in result))
class TestParsePyprojectToml(unittest.TestCase):
def test_parses_project_dependencies(self):
content = "\n[project]\nname = \"test\"\ndependencies = [\n \"openai>=2.21.0,<3\",\n]"
result = parse_pyproject_toml(content)
self.assertEqual(len(result), 1)
class TestScanRepo(unittest.TestCase):
def test_scans_local_repo(self):
result = scan_repo(Path(__file__).resolve().parents[1])
self.assertGreater(result["dependency_count"], 0)
if __name__ == "__main__":
unittest.main()