Compare commits

..

1 Commits

Author SHA1 Message Date
Step35
1470b44c3b feat: add codebase genome diff script for structural change detection
Some checks failed
Test / pytest (pull_request) Failing after 9s
Introduces genome_diff.py — a tool for detecting structural changes between
two git refs: file-level changes, function/class signature modifications,
and dependency import changes.

Addresses #132.
2026-04-26 09:46:04 -04:00
3 changed files with 288 additions and 383 deletions

288
scripts/genome_diff.py Executable file
View File

@@ -0,0 +1,288 @@
#!/usr/bin/env python3
"""
Codebase Genome Diff — Detect structural changes between two versions.
Compares two git refs (commits, branches, tags) and produces a human-readable
report of structural changes:
• Added/removed/renamed files
• Changed functions/classes (signature modifications)
• New dependencies (imports, requirements, etc.)
Usage:
python3 scripts/genome_diff.py --ref1 <commit1> --ref2 <commit2>
python3 scripts/genome_diff.py --ref1 main --ref2 feature-branch
python3 scripts/genome_diff.py --ref1 v1.0 --ref2 v2.0 --output report.txt
"""
import argparse
import json
import os
import re
import subprocess
import sys
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPT_DIR)
from diff_analyzer import DiffAnalyzer, ChangeCategory
@dataclass
class FunctionChange:
file: str
name: str
kind: str # 'function' or 'class'
change_type: str # 'added' or 'removed' (simplified)
old_line: Optional[int] = None
new_line: Optional[int] = None
@dataclass
class DependencyChange:
file: str
module: str
change_type: str # 'added' or 'removed' or 'modified'
line: int = 0
@dataclass
class GenomeDiffReport:
ref1: str
ref2: str
file_changes: List[Dict[str, Any]] = field(default_factory=list)
function_changes: List[FunctionChange] = field(default_factory=list)
dependency_changes: List[DependencyChange] = field(default_factory=list)
total_files_changed: int = 0
total_functions_changed: int = 0
total_dependencies_changed: int = 0
def to_dict(self) -> Dict[str, Any]:
return {
"ref1": self.ref1,
"ref2": self.ref2,
"summary": {
"files": self.total_files_changed,
"functions": self.total_functions_changed,
"dependencies": self.total_dependencies_changed,
},
"file_changes": self.file_changes,
"function_changes": [fc.__dict__ for fc in self.function_changes],
"dependency_changes": [dc.__dict__ for dc in self.dependency_changes],
}
def human_report(self) -> str:
lines = []
lines.append(f"Codebase Genome Diff: {self.ref1}{self.ref2}")
lines.append("=" * 60)
lines.append(f" Files changed: {self.total_files_changed}")
lines.append(f" Functions changed: {self.total_functions_changed}")
lines.append(f" Dependencies changed: {self.total_dependencies_changed}")
lines.append("")
for fc in self.file_changes:
kind = []
if fc.get('is_new'):
kind.append("NEW")
if fc.get('is_deleted'):
kind.append("DELETED")
if fc.get('is_renamed'):
kind.append("RENAMED")
if fc.get('is_binary'):
kind.append("BINARY")
kind_str = f" [{', '.join(kind)}]" if kind else ""
lines.append(f" {fc['path']}{kind_str} (+{fc['added_lines']}/-{fc['deleted_lines']})")
lines.append("")
for fc in self.function_changes:
op = {'added': '+', 'removed': '-', 'modified': '~'}.get(fc.change_type, '?')
lines.append(f" [{op}] {fc.file}: {fc.kind} '{fc.name}'")
lines.append("")
for dc in self.dependency_changes:
op = '+' if dc.change_type == 'added' else '-'
lines.append(f" [{op}] {dc.file}: {dc.module}")
lines.append("")
return "\n".join(lines)
def run_git_diff(ref1: str, ref2: str) -> str:
result = subprocess.run(
['git', 'diff', '--unified=0', f'{ref1}...{ref2}'],
capture_output=True, text=True, cwd=SCRIPT_DIR
)
if result.returncode not in (0, 1):
print(f"git diff failed: {result.stderr}", file=sys.stderr)
sys.exit(1)
return result.stdout
def extract_function_changes(diff_text: str) -> List[FunctionChange]:
changes: List[FunctionChange] = []
pattern = re.compile(r'^([+\-])\s*(def|class)\s+(\w+)', re.MULTILINE)
hunk_header_re = re.compile(r'^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@')
current_old_line: Optional[int] = None
current_new_line: Optional[int] = None
for line in diff_text.split('\n'):
hdr = hunk_header_re.match(line)
if hdr:
current_old_line = int(hdr.group(1))
current_new_line = int(hdr.group(3))
continue
m = pattern.match(line)
if m:
op = m.group(1)
kind = m.group(2)
name = m.group(3)
change_type = "added" if op == '+' else "removed"
line_num = current_new_line if change_type == "added" else current_old_line
changes.append(FunctionChange(
file="<unknown>",
name=name,
kind=kind,
change_type=change_type,
new_line=line_num if change_type == "added" else None,
old_line=line_num if change_type == "removed" else None,
))
# Advance line counters heuristically
if op == '-':
if current_old_line is not None:
current_old_line += 1
elif op == '+':
if current_new_line is not None:
current_new_line += 1
elif line.startswith(' '):
if current_old_line is not None:
current_old_line += 1
if current_new_line is not None:
current_new_line += 1
# lines starting with other prefixes (like \\ No newline) ignored
return changes
def extract_dependency_changes(diff_text: str, analyzer: DiffAnalyzer) -> List[DependencyChange]:
changes: List[DependencyChange] = []
import_pattern = re.compile(
r'^([+\-])\s*(?:import\s+([\w\.]+)|from\s+([\w\.]+)\s+import)',
re.MULTILINE
)
file_diffs = analyzer._split_files(diff_text)
for file_diff in file_diffs:
file_match = re.search(r'^diff --git a/.*? b/(.*?)$', file_diff, re.MULTILINE)
if not file_match:
continue
filepath = file_match.group(1)
# Scan each line for import changes
for line in file_diff.split('\n'):
m = import_pattern.match(line)
if m:
change_type = "added" if m.group(1) == '+' else "removed"
module = m.group(2) or m.group(3)
changes.append(DependencyChange(
file=filepath,
module=module,
change_type=change_type,
line=0
))
# Detect if this file is a dependency manifest
req_file_pattern = re.compile(
r'^[\+\-].*?(requirements(.*?)\.txt|pyproject\.toml|setup\.py|Pipfile)'
)
if any(req_file_pattern.match(line) for line in file_diff.split('\n')):
if not any(c.file == filepath and c.module == "<file>" for c in changes):
changes.append(DependencyChange(
file=filepath,
module="<file>",
change_type="modified",
line=0
))
return changes
def correlate_function_changes_with_files(diff_text: str, functions: List[FunctionChange]) -> List[FunctionChange]:
result: List[FunctionChange] = []
# Split diff into per-file sections
file_sections: List[tuple[str, str]] = []
current_file: Optional[str] = None
current_lines: List[str] = []
for line in diff_text.split('\n'):
if line.startswith('diff --git'):
if current_file is not None:
file_sections.append((current_file, '\n'.join(current_lines)))
m = re.match(r'^diff --git a/.*? b/(.*?)$', line)
current_file = m.group(1) if m else "unknown"
current_lines = [line]
else:
current_lines.append(line)
if current_file is not None:
file_sections.append((current_file, '\n'.join(current_lines)))
pattern = re.compile(r'^([+\-])\s*(def|class)\s+(\w+)', re.MULTILINE)
for filepath, section in file_sections:
for m in pattern.finditer(section):
op = m.group(1)
kind = m.group(2)
name = m.group(3)
change_type = "added" if op == '+' else "removed"
result.append(FunctionChange(
file=filepath,
name=name,
kind=kind,
change_type=change_type
))
return result
def main():
parser = argparse.ArgumentParser(description="Codebase Genome Diff — structural changes between versions")
parser.add_argument("--ref1", required=True, help="First git ref (commit, branch, tag)")
parser.add_argument("--ref2", required=True, help="Second git ref")
parser.add_argument("--output", help="Write report to file")
parser.add_argument("--json", action="store_true", help="Output JSON instead of human report")
args = parser.parse_args()
try:
diff_text = run_git_diff(args.ref1, args.ref2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if not diff_text.strip():
print(f"No differences between {args.ref1} and {args.ref2}.")
sys.exit(0)
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff_text)
file_changes = [fc.to_dict() for fc in summary.files]
func_changes = extract_function_changes(diff_text)
func_changes = correlate_function_changes_with_files(diff_text, func_changes)
dep_changes = extract_dependency_changes(diff_text, analyzer)
report = GenomeDiffReport(
ref1=args.ref1,
ref2=args.ref2,
file_changes=file_changes,
function_changes=func_changes,
dependency_changes=dep_changes,
total_files_changed=len(file_changes),
total_functions_changed=len(func_changes),
total_dependencies_changed=len(dep_changes),
)
output = json.dumps(report.to_dict(), indent=2) if args.json else report.human_report()
if args.output:
with open(args.output, 'w') as f:
f.write(output + '\n')
print(f"Report written to {args.output}")
else:
print(output)
if __name__ == '__main__':
main()

View File

@@ -1,258 +0,0 @@
#!/usr/bin/env python3
"""GitHub Trending Scanner — Scan trending repos in AI/ML.
Extracts: repo description, stars, key features (topics, inferred highlights).
Filters by language and/or topic. Outputs dated JSON for daily scan pipeline.
Usage:
python3 github_trending_scanner.py --language python --topic ai --output metrics/trending
python3 github_trending_scanner.py --topic machine-learning --limit 50
python3 github_trending_scanner.py --language rust --topic artificial-intelligence
"""
import argparse
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Dict
import urllib.request
import urllib.parse
import urllib.error
GITHUB_API_BASE = os.environ.get("GITHUB_API_BASE", "https://api.github.com")
DEFAULT_OUTPUT_DIR = os.environ.get("TRENDING_OUTPUT_DIR", "metrics/trending")
DEFAULT_LIMIT = int(os.environ.get("TRENDING_LIMIT", "30"))
DEFAULT_MIN_STARS = int(os.environ.get("TRENDING_MIN_STARS", "1000"))
def fetch_trending_repos(
language: Optional[str] = None,
topic: Optional[str] = None,
min_stars: int = DEFAULT_MIN_STARS,
limit: int = DEFAULT_LIMIT,
) -> List[Dict]:
"""Fetch trending-like repositories from GitHub using the search API.
GitHub's public search API is unauthenticated-rate-limited (60 req/hr).
This function retries on rate-limit backoff and falls back gracefully.
"""
# Build search query: stars threshold + optional language/topic filters
query = f"stars:>{min_stars}"
if language:
query += f" language:{language}"
if topic:
query += f" topic:{topic}"
# Sort by stars descending as a proxy for trending/popular
params = {
"q": query,
"sort": "stars",
"order": "desc",
"per_page": min(limit, 100), # GitHub max per_page is 100
}
url = f"{GITHUB_API_BASE}/search/repositories?{urllib.parse.urlencode(params)}"
headers = {
"Accept": "application/vnd.github.v3+json",
"User-Agent": "Sovereign-Trending-Scanner/1.0",
}
for attempt in range(3):
try:
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=30) as resp:
if resp.status != 200:
raise RuntimeError(f"GitHub API returned {resp.status}")
data = json.loads(resp.read().decode("utf-8"))
return data.get("items", [])[:limit]
except urllib.error.HTTPError as e:
if e.code == 403:
# Check for rate limit message
body = e.read().decode("utf-8", errors="replace").lower()
if "rate limit" in body or "api rate limit exceeded" in body:
reset_ts = int(e.headers.get("X-RateLimit-Reset", 0))
wait_seconds = max(5, reset_ts - int(time.time()) + 5)
print(f"Rate limit exceeded — waiting {wait_seconds}s (attempt {attempt+1}/3)...", file=sys.stderr)
time.sleep(wait_seconds)
continue
print(f"ERROR: GitHub API request failed: {e}{e.read().decode('utf-8', errors='replace')[:200]}", file=sys.stderr)
return []
except Exception as e:
if attempt < 2:
backoff = 2 ** attempt
print(f"WARNING: Fetch attempt {attempt+1} failed: {e} — retrying in {backoff}s", file=sys.stderr)
time.sleep(backoff)
continue
print(f"ERROR: All fetch attempts failed: {e}", file=sys.stderr)
return []
return []
def extract_repo_features(repo_data: Dict) -> Dict:
"""Extract structured fields for a trending repo."""
description = (repo_data.get("description") or "").strip()
topics = repo_data.get("topics", [])
# Infer key features from description and topics
features = infer_features(description, topics)
return {
"name": repo_data.get("full_name", ""),
"description": description,
"stars": repo_data.get("stargazers_count", 0),
"forks": repo_data.get("forks_count", 0),
"open_issues": repo_data.get("open_issues_count", 0),
"language": repo_data.get("language", ""),
"topics": topics,
"url": repo_data.get("html_url", ""),
"created_at": repo_data.get("created_at", ""),
"updated_at": repo_data.get("updated_at", ""),
"key_features": features,
"scanned_at": datetime.now(timezone.utc).isoformat(),
}
def infer_features(description: str, topics: List[str]) -> List[str]:
"""Infer notable capabilities/features from repo metadata.
Looks for AI/ML-relevant capabilities in topics and description.
"""
features = []
text = (description + " " + " ".join(topics)).lower()
# Domain capabilities (keys normalized to lowercase for consistency)
capability_keywords = {
"fine-tuning": ["fine-tun", "finetun"],
"agent framework": ["agent"],
"local/offline": ["local", "on-device", "offline"],
"quantized models": ["quantized", "quantization", "gguf", "gptq"],
"vision": ["vision", "multimodal", "image", "visual"],
"speech/audio": ["speech", "audio", "whisper", "tts"],
"retrieval/rag": ["rag", "retrieval", "embedding", "vector"],
"training": ["train", "training", "sft", "dpo"],
"gui/playground": ["gui", "playground", "webui", "interface"],
"sota": ["state-of-the-art", "sota", "latest"],
}
for label, keywords in capability_keywords.items():
if any(kw in text for kw in keywords):
features.append(label)
# Also include non-generic topics as features
generic_topics = {"ai", "ml", "machine-learning", "deep-learning", "llm", "python", "pytorch", "tensorflow"}
for topic in topics:
if topic.lower() not in generic_topics:
features.append(topic)
# Deduplicate while preserving order, return up to 10
seen = set()
unique = []
for f in features:
key = f.lower()
if key not in seen:
seen.add(key)
unique.append(f)
return unique[:10]
def save_trending(repos: List[Dict], output_dir: str = "metrics/trending") -> str:
"""Save trending results to a dated JSON file.
Returns the path of the written file.
"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
filename = output_path / f"github-trending-{date_str}.json"
output_data = {
"scanned_at": datetime.now(timezone.utc).isoformat(),
"count": len(repos),
"repos": repos,
}
with open(filename, "w") as f:
json.dump(output_data, f, indent=2, ensure_ascii=False)
return str(filename)
def main() -> None:
parser = argparse.ArgumentParser(
description="Scan GitHub trending repositories in AI/ML"
)
parser.add_argument(
"--language",
help="Filter by programming language (e.g., python, rust, go)",
)
parser.add_argument(
"--topic",
help="Filter by GitHub topic (e.g., ai, machine-learning, llm)",
)
parser.add_argument(
"--since",
default="daily",
choices=["daily", "weekly", "monthly"],
help="Trending period (daily/weekly/monthly) — informational only",
)
parser.add_argument(
"--output",
default="metrics/trending",
help="Output directory for results (default: metrics/trending)",
)
parser.add_argument(
"--limit",
type=int,
default=DEFAULT_LIMIT,
help=f"Maximum repos to fetch (default: {DEFAULT_LIMIT})",
)
parser.add_argument(
"--min-stars",
type=int,
default=DEFAULT_MIN_STARS,
help=f"Minimum star count for relevance (default: {DEFAULT_MIN_STARS})",
)
args = parser.parse_args()
print(
f"Fetching trending repos "
f"(language={args.language or 'any'}, topic={args.topic or 'any'}, period={args.since})..."
)
repos_raw = fetch_trending_repos(
language=args.language,
topic=args.topic,
min_stars=args.min_stars,
limit=args.limit,
)
if not repos_raw:
print("WARNING: No repos fetched — check network or rate limits", file=sys.stderr)
repos = [extract_repo_features(r) for r in repos_raw]
output_file = save_trending(repos, args.output)
print(f"Saved {len(repos)} trending repos to {output_file}")
# Brief human-readable summary
if repos:
print("\nTop repos:")
for repo in repos[:5]:
features_preview = ", ".join(repo["key_features"][:3])
print(f"{repo['stars']:>7} {repo['name']}")
if repo["description"]:
desc = repo["description"][:80]
print(f" {desc}{'...' if len(repo['description']) > 80 else ''}")
if features_preview:
print(f" Features: {features_preview}")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,125 +0,0 @@
#!/usr/bin/env python3
"""Tests for github_trending_scanner.py — pure function validation.
Tests the feature inference, extraction, and output formatting logic
without relying on external GitHub API calls.
"""
import json
import sys
import tempfile
from pathlib import Path
# Add scripts dir to path for import
sys.path.insert(0, str(Path(__file__).resolve().parent))
from github_trending_scanner import (
extract_repo_features,
infer_features,
save_trending,
)
def test_infer_features_from_description():
"""Feature inference extracts capabilities from description text."""
desc = "A local, quantized LLM framework for fine-tuning and agent-based RAG with vision."
topics = ["ai", "llm"]
features = infer_features(desc, topics)
# Should include relevant capabilities (case-insensitive comparison)
expected_lower = {"fine-tuning", "local/offline", "quantized models", "agent framework", "vision", "retrieval/rag"}
actual_lower = set(f.lower() for f in features)
assert expected_lower.issubset(actual_lower), f"Missing features. Expected subset of {expected_lower}, got {actual_lower}"
print("PASS: infer_features_from_description")
def test_infer_features_from_topics_only():
"""Topics alone can drive feature detection."""
desc = ""
topics = ["computer-vision", "speech", "pytorch"]
features = infer_features(desc, topics)
# Non-generic topics should appear as features (topics preserved as-is)
assert "computer-vision" in features, f"Expected 'computer-vision' in {features}"
assert "speech" in features, f"Expected 'speech' in {features}"
# Generic topics (pytorch) may be filtered
print(f"PASS: infer_features_from_topics_only → {features}")
def test_extract_repo_features_produces_valid_structure():
"""extract_repo_features returns all required fields."""
mock_repo = {
"full_name": "example/repo",
"description": "An example repository",
"stargazers_count": 1234,
"forks_count": 56,
"open_issues_count": 7,
"language": "Python",
"topics": ["ai", "llm"],
"html_url": "https://github.com/example/repo",
"created_at": "2025-01-01T00:00:00Z",
"updated_at": "2026-01-01T00:00:00Z",
}
result = extract_repo_features(mock_repo)
assert result["name"] == "example/repo"
assert result["description"] == "An example repository"
assert result["stars"] == 1234
assert isinstance(result["key_features"], list)
assert "scanned_at" in result
assert result["url"] == "https://github.com/example/repo"
print("PASS: extract_repo_features_structure")
def test_save_trending_creates_dated_json():
"""save_trending writes a valid JSON file with the expected schema."""
repos = [
{
"name": "test/repo",
"description": "Test repository",
"stars": 999,
"language": "Python",
"topics": ["test"],
"key_features": ["testing"],
"scanned_at": "2026-04-26T00:00:00+00:00",
}
]
with tempfile.TemporaryDirectory() as tmp:
output_file = save_trending(repos, output_dir=tmp)
path = Path(output_file)
assert path.exists(), f"Output file not created: {output_file}"
with open(path) as f:
data = json.load(f)
assert "scanned_at" in data
assert data["count"] == 1
assert isinstance(data["repos"], list)
assert data["repos"][0]["name"] == "test/repo"
print(f"PASS: save_trending → {output_file}")
def test_save_trending_respects_output_dir_creation():
"""Output directory is created if it doesn't exist."""
repos = []
with tempfile.TemporaryDirectory() as tmp:
nested = Path(tmp) / "nested" / "trending"
assert not nested.exists()
output_file = save_trending(repos, output_dir=str(nested))
assert nested.exists()
assert Path(output_file).exists()
print("PASS: output_dir_creation")
if __name__ == "__main__":
test_infer_features_from_description()
test_infer_features_from_topics_only()
test_extract_repo_features_produces_valid_structure()
test_save_trending_creates_dated_json()
test_save_trending_respects_output_dir_creation()
print("\nAll github_trending_scanner tests passed.")