diff --git a/scripts/release_note_analyzer.py b/scripts/release_note_analyzer.py
new file mode 100755
index 0000000..8a3e565
--- /dev/null
+++ b/scripts/release_note_analyzer.py
@@ -0,0 +1,205 @@
+#!/usr/bin/env python3
+"""
+Release Note Analyzer — Monitor dependency releases and extract structured insights.
+
+Fetches GitHub releases for configured repositories, parses changelogs,
+categorizes changes, and flags breaking changes.
+
+Usage:
+    python3 scripts/release_note_analyzer.py --repos owner/repo1,owner/repo2
+    python3 scripts/release_note_analyzer.py --repos numpy/numpy --limit 5
+    python3 scripts/release_note_analyzer.py --repos owner/repo --output metrics/releases.json
+    python3 scripts/release_note_analyzer.py --repos owner/repo --token $GITHUB_TOKEN
+
+Output:
+    JSON with per-release structure: version, date, url, categories (features, fixes, breaking), raw_body
+"""
+
+import argparse
+import json
+import re
+import sys
+from datetime import datetime, timezone
+from typing import Dict, List, Any, Optional
+from dataclasses import dataclass, field, asdict
+import os
+
+
+@dataclass
+class ReleaseAnalysis:
+    # Structured analysis of a single GitHub release.
+    version: str
+    date: str
+    url: str
+    categories: Dict[str, List[str]] = field(default_factory=dict)
+    breaking_change_flags: List[str] = field(default_factory=list)
+    raw_body: str = ""
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Return a plain-dict representation suitable for JSON serialization."""
+        return asdict(self)
+
+
+def fetch_github_releases(repo: str, token: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]:
+    """Fetch latest releases from GitHub API.
+
+    Returns the parsed JSON list of release objects, or [] on any error
+    (errors are reported to stderr rather than raised — best-effort fetch).
+    """
+    import urllib.request
+    import urllib.error
+
+    url = f"https://api.github.com/repos/{repo}/releases?per_page={limit}"
+    headers = {"Accept": "application/vnd.github.v3+json"}
+    if token:
+        headers["Authorization"] = f"token {token}"
+
+    req = urllib.request.Request(url, headers=headers)
+    try:
+        with urllib.request.urlopen(req, timeout=30) as resp:
+            data = json.loads(resp.read())
+            return data
+    except urllib.error.HTTPError as e:
+        print(f"Error fetching releases for {repo}: HTTP {e.code}", file=sys.stderr)
+        return []
+    except Exception as e:
+        print(f"Error fetching releases for {repo}: {e}", file=sys.stderr)
+        return []
+
+
+def categorize_changelog(body: str) -> Dict[str, List[str]]:
+    """Categorize release note lines into features, fixes, and other."""
+    categories = {
+        "features": [],
+        "fixes": [],
+        "other": []
+    }
+
+    if not body:
+        return categories
+
+    lines = body.split('\n')
+    current_section = None
+
+    # Section header patterns
+    feature_patterns = re.compile(r'^(?:features?|new|add|enhancement)s?', re.IGNORECASE)
+    fix_patterns = re.compile(r'^(?:fix(?:es|ed)?|bug|patch|correction)', re.IGNORECASE)
+
+    for line in lines:
+        stripped = line.strip()
+        if not stripped:
+            continue
+
+        # Check for section headers (e.g., "### Features", "## Added")
+        header_match = re.match(r'^#{1,3}\s+(.+)$', stripped)
+        if header_match:
+            header = header_match.group(1).lower()
+            if feature_patterns.search(header):
+                current_section = "features"
+            elif fix_patterns.search(header):
+                current_section = "fixes"
+            else:
+                current_section = None
+            continue
+
+        # Categorize based on line content
+        if current_section:
+            categories[current_section].append(stripped)
+        else:
+            # Infer from keywords; strip markdown list markers ("- ", "* ", "+ ")
+            # first so the ^-anchored patterns can see the leading keyword.
+            content = re.sub(r'^[-*+]\s+', '', stripped)
+            if re.search(r'^(?:added|new|feature|introdu)', content, re.IGNORECASE):
+                categories["features"].append(stripped)
+            elif re.search(r'^(?:fix|bug|patch|resolved)', content, re.IGNORECASE):
+                categories["fixes"].append(stripped)
+            else:
+                categories["other"].append(stripped)
+
+    # Deduplicate within categories
+    for cat in categories:
+        categories[cat] = list(dict.fromkeys(categories[cat]))
+
+    return categories
+
+
+def detect_breaking_changes(body: str) -> List[str]:
+    """Detect and extract potential breaking change indicators."""
+    breaking_indicators = []
+    lines = body.split('\n')
+
+    # Keywords that suggest breaking changes
+    breaking_keywords = re.compile(
+        r'\b(?:BREAKING|breaking\s+change|backward\s+incompatible|'
+        r'removed\s+.*?API|deprecated.*?removed|'
+        r'major\s+version|'
+        r'not\s+backward\s+compatible)\b',
+        re.IGNORECASE
+    )
+
+    for line in lines:
+        if breaking_keywords.search(line):
+            breaking_indicators.append(line.strip())
+
+    return breaking_indicators
+
+
+def analyze_releases(repos: List[str], token: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]:
+    """Fetch and analyze releases for all configured repos."""
+    all_releases = []
+
+    for repo in repos:
+        repo = repo.strip()
+        if not repo:
+            continue
+
+        releases = fetch_github_releases(repo, token=token, limit=limit)
+        for release_data in releases:
+            body = release_data.get('body') or ""
+            tag = release_data.get('tag_name', 'unknown')
+            date = release_data.get('published_at', '')
+            url = release_data.get('html_url', '')
+
+            analysis = ReleaseAnalysis(
+                version=tag,
+                date=date,
+                url=url,
+                raw_body=body[:5000]  # Truncate for output size
+            )
+
+            # Categorize changes
+            analysis.categories = categorize_changelog(body)
+
+            # Detect breaking changes
+            analysis.breaking_change_flags = detect_breaking_changes(body)
+
+            all_releases.append(analysis.to_dict())
+
+    return all_releases
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Analyze GitHub release notes for changes and breaking changes")
+    parser.add_argument('--repos', required=True, help='Comma-separated list of GitHub repos (owner/repo)')
+    parser.add_argument('--token', help='GitHub API token (or set GITHUB_TOKEN env var)')
+    parser.add_argument('--limit', type=int, default=10, help='Max releases per repo (default: 10)')
+    parser.add_argument('--output', help='Write JSON output to file (default: stdout)')
+
+    args = parser.parse_args()
+
+    repos = [r.strip() for r in args.repos.split(',')]
+    token = args.token or os.environ.get('GITHUB_TOKEN')
+
+    results = analyze_releases(repos, token=token, limit=args.limit)
+
+    output = {
+        "generated_at": datetime.now(timezone.utc).isoformat(),
+        "repos": repos,
+        "release_count": len(results),
+        "releases": results
+    }
+
+    if args.output:
+        with open(args.output, 'w') as f:
+            json.dump(output, f, indent=2)
+        print(f"Wrote {len(results)} releases to {args.output}")
+    else:
+        print(json.dumps(output, indent=2))
+
+
+if __name__ == '__main__':
+    main()
diff --git a/tests/test_release_note_analyzer.py b/tests/test_release_note_analyzer.py
new file mode 100644
index 0000000..1ea82f2
--- /dev/null
+++ b/tests/test_release_note_analyzer.py
@@ -0,0 +1,107 @@
+#!/usr/bin/env python3
+"""Tests for scripts/release_note_analyzer.py"""
+
+import json
+import os
+import sys
+import tempfile
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__) or ".", ".."))
+import importlib.util
+spec = importlib.util.spec_from_file_location(
+    "release_note_analyzer",
+    os.path.join(os.path.dirname(__file__) or ".", "..", "scripts", "release_note_analyzer.py")
+)
+mod = importlib.util.module_from_spec(spec)
+spec.loader.exec_module(mod)
+
+categorize_changelog = mod.categorize_changelog
+detect_breaking_changes = mod.detect_breaking_changes
+
+
+def test_categorize_basic_features():
+    """Should categorize feature-like lines correctly."""
+    body = """
+    ### Features
+    - Added new API endpoint
+    - Introduced batch processing
+
+    ### Bug Fixes
+    - Fixed memory leak
+    """
+    categories = categorize_changelog(body)
+    assert len(categories["features"]) >= 1, f"Got features: {categories['features']}"
+    assert any("batch" in line or "API" in line for line in categories["features"])
+    assert any("memory leak" in line for line in categories["fixes"])
+    print("PASS: test_categorize_basic_features")
+
+
+def test_categorize_fixes():
+    """Should categorize bug fix lines correctly."""
+    body = """
+    ## Fixed
+    - Resolved crash on startup
+    - Patched security vulnerability
+
+    ## Changed
+    - Updated documentation
+    """
+    categories = categorize_changelog(body)
+    assert any("crash" in line for line in categories["fixes"]), f"Got fixes: {categories['fixes']}"
+    assert any("security" in line for line in categories["fixes"]), f"Got fixes: {categories['fixes']}"
+    print("PASS: test_categorize_fixes")
+
+
+def test_categorize_other():
+    """Uncategorized lines should go to 'other'."""
+    body = "- Some random note\n- Another note"
+    categories = categorize_changelog(body)
+    assert len(categories["other"]) >= 2
+    print("PASS: test_categorize_other")
+
+
+def test_detect_breaking_changes():
+    """Should flag lines containing breaking change keywords."""
+    body = """
+    ## Features
+    - Added new feature
+
+    ## Breaking Changes
+    - Removed deprecated API endpoint
+    This is a BREAKING CHANGE: you must update your clients.
+
+    We also removed support for Python 3.8.
+    """
+    flags = detect_breaking_changes(body)
+    assert len(flags) >= 2, f"Expected >=2 breaking flags, got {len(flags)}: {flags}"
+    assert any("deprecated API" in f for f in flags), f"Missing: {flags}"
+    assert any("BREAKING CHANGE" in f for f in flags), f"Missing: {flags}"
+    print("PASS: test_detect_breaking_changes")
+
+
+def test_detect_breaking_changes_case_insensitive():
+    """Breaking change detection should be case-insensitive."""
+    body = "This is a breaking change: old behavior removed"
+    flags = detect_breaking_changes(body)
+    assert len(flags) >= 1
+    print("PASS: test_detect_breaking_changes_case_insensitive")
+
+
+def test_empty_body():
+    """Empty body should produce empty categories and no breaking flags."""
+    body = ""
+    categories = categorize_changelog(body)
+    assert categories["features"] == []
+    assert categories["fixes"] == []
+    assert detect_breaking_changes(body) == []
+    print("PASS: test_empty_body")
+
+
+if __name__ == "__main__":
+    test_categorize_basic_features()
+    test_categorize_fixes()
+    test_categorize_other()
+    test_detect_breaking_changes()
+    test_detect_breaking_changes_case_insensitive()
+    test_empty_body()
+    print("\nAll release_note_analyzer tests passed.")