Compare commits

..

2 Commits

Author SHA1 Message Date
Rockachopa
ec76e9fec3 test(scanner): unit tests for github_trending_scanner
Some checks failed
Test / pytest (pull_request) Failing after 9s
2026-04-26 11:21:02 +00:00
38c5862737 feat(scanner): add GitHub Trending Scanner CLI for AI/ML repos 2026-04-26 11:20:51 +00:00
4 changed files with 383 additions and 324 deletions

View File

@@ -0,0 +1,258 @@
#!/usr/bin/env python3
"""GitHub Trending Scanner — Scan trending repos in AI/ML.
Extracts: repo description, stars, key features (topics, inferred highlights).
Filters by language and/or topic. Outputs dated JSON for daily scan pipeline.
Usage:
python3 github_trending_scanner.py --language python --topic ai --output metrics/trending
python3 github_trending_scanner.py --topic machine-learning --limit 50
python3 github_trending_scanner.py --language rust --topic artificial-intelligence
"""
import argparse
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Dict
import urllib.request
import urllib.parse
import urllib.error
GITHUB_API_BASE = os.environ.get("GITHUB_API_BASE", "https://api.github.com")
DEFAULT_OUTPUT_DIR = os.environ.get("TRENDING_OUTPUT_DIR", "metrics/trending")
DEFAULT_LIMIT = int(os.environ.get("TRENDING_LIMIT", "30"))
DEFAULT_MIN_STARS = int(os.environ.get("TRENDING_MIN_STARS", "1000"))
def fetch_trending_repos(
language: Optional[str] = None,
topic: Optional[str] = None,
min_stars: int = DEFAULT_MIN_STARS,
limit: int = DEFAULT_LIMIT,
) -> List[Dict]:
"""Fetch trending-like repositories from GitHub using the search API.
GitHub's public search API is unauthenticated-rate-limited (60 req/hr).
This function retries on rate-limit backoff and falls back gracefully.
"""
# Build search query: stars threshold + optional language/topic filters
query = f"stars:>{min_stars}"
if language:
query += f" language:{language}"
if topic:
query += f" topic:{topic}"
# Sort by stars descending as a proxy for trending/popular
params = {
"q": query,
"sort": "stars",
"order": "desc",
"per_page": min(limit, 100), # GitHub max per_page is 100
}
url = f"{GITHUB_API_BASE}/search/repositories?{urllib.parse.urlencode(params)}"
headers = {
"Accept": "application/vnd.github.v3+json",
"User-Agent": "Sovereign-Trending-Scanner/1.0",
}
for attempt in range(3):
try:
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=30) as resp:
if resp.status != 200:
raise RuntimeError(f"GitHub API returned {resp.status}")
data = json.loads(resp.read().decode("utf-8"))
return data.get("items", [])[:limit]
except urllib.error.HTTPError as e:
if e.code == 403:
# Check for rate limit message
body = e.read().decode("utf-8", errors="replace").lower()
if "rate limit" in body or "api rate limit exceeded" in body:
reset_ts = int(e.headers.get("X-RateLimit-Reset", 0))
wait_seconds = max(5, reset_ts - int(time.time()) + 5)
print(f"Rate limit exceeded — waiting {wait_seconds}s (attempt {attempt+1}/3)...", file=sys.stderr)
time.sleep(wait_seconds)
continue
print(f"ERROR: GitHub API request failed: {e}{e.read().decode('utf-8', errors='replace')[:200]}", file=sys.stderr)
return []
except Exception as e:
if attempt < 2:
backoff = 2 ** attempt
print(f"WARNING: Fetch attempt {attempt+1} failed: {e} — retrying in {backoff}s", file=sys.stderr)
time.sleep(backoff)
continue
print(f"ERROR: All fetch attempts failed: {e}", file=sys.stderr)
return []
return []
def extract_repo_features(repo_data: Dict) -> Dict:
"""Extract structured fields for a trending repo."""
description = (repo_data.get("description") or "").strip()
topics = repo_data.get("topics", [])
# Infer key features from description and topics
features = infer_features(description, topics)
return {
"name": repo_data.get("full_name", ""),
"description": description,
"stars": repo_data.get("stargazers_count", 0),
"forks": repo_data.get("forks_count", 0),
"open_issues": repo_data.get("open_issues_count", 0),
"language": repo_data.get("language", ""),
"topics": topics,
"url": repo_data.get("html_url", ""),
"created_at": repo_data.get("created_at", ""),
"updated_at": repo_data.get("updated_at", ""),
"key_features": features,
"scanned_at": datetime.now(timezone.utc).isoformat(),
}
def infer_features(description: str, topics: List[str]) -> List[str]:
"""Infer notable capabilities/features from repo metadata.
Looks for AI/ML-relevant capabilities in topics and description.
"""
features = []
text = (description + " " + " ".join(topics)).lower()
# Domain capabilities (keys normalized to lowercase for consistency)
capability_keywords = {
"fine-tuning": ["fine-tun", "finetun"],
"agent framework": ["agent"],
"local/offline": ["local", "on-device", "offline"],
"quantized models": ["quantized", "quantization", "gguf", "gptq"],
"vision": ["vision", "multimodal", "image", "visual"],
"speech/audio": ["speech", "audio", "whisper", "tts"],
"retrieval/rag": ["rag", "retrieval", "embedding", "vector"],
"training": ["train", "training", "sft", "dpo"],
"gui/playground": ["gui", "playground", "webui", "interface"],
"sota": ["state-of-the-art", "sota", "latest"],
}
for label, keywords in capability_keywords.items():
if any(kw in text for kw in keywords):
features.append(label)
# Also include non-generic topics as features
generic_topics = {"ai", "ml", "machine-learning", "deep-learning", "llm", "python", "pytorch", "tensorflow"}
for topic in topics:
if topic.lower() not in generic_topics:
features.append(topic)
# Deduplicate while preserving order, return up to 10
seen = set()
unique = []
for f in features:
key = f.lower()
if key not in seen:
seen.add(key)
unique.append(f)
return unique[:10]
def save_trending(repos: List[Dict], output_dir: str = "metrics/trending") -> str:
"""Save trending results to a dated JSON file.
Returns the path of the written file.
"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
filename = output_path / f"github-trending-{date_str}.json"
output_data = {
"scanned_at": datetime.now(timezone.utc).isoformat(),
"count": len(repos),
"repos": repos,
}
with open(filename, "w") as f:
json.dump(output_data, f, indent=2, ensure_ascii=False)
return str(filename)
def main() -> None:
parser = argparse.ArgumentParser(
description="Scan GitHub trending repositories in AI/ML"
)
parser.add_argument(
"--language",
help="Filter by programming language (e.g., python, rust, go)",
)
parser.add_argument(
"--topic",
help="Filter by GitHub topic (e.g., ai, machine-learning, llm)",
)
parser.add_argument(
"--since",
default="daily",
choices=["daily", "weekly", "monthly"],
help="Trending period (daily/weekly/monthly) — informational only",
)
parser.add_argument(
"--output",
default="metrics/trending",
help="Output directory for results (default: metrics/trending)",
)
parser.add_argument(
"--limit",
type=int,
default=DEFAULT_LIMIT,
help=f"Maximum repos to fetch (default: {DEFAULT_LIMIT})",
)
parser.add_argument(
"--min-stars",
type=int,
default=DEFAULT_MIN_STARS,
help=f"Minimum star count for relevance (default: {DEFAULT_MIN_STARS})",
)
args = parser.parse_args()
print(
f"Fetching trending repos "
f"(language={args.language or 'any'}, topic={args.topic or 'any'}, period={args.since})..."
)
repos_raw = fetch_trending_repos(
language=args.language,
topic=args.topic,
min_stars=args.min_stars,
limit=args.limit,
)
if not repos_raw:
print("WARNING: No repos fetched — check network or rate limits", file=sys.stderr)
repos = [extract_repo_features(r) for r in repos_raw]
output_file = save_trending(repos, args.output)
print(f"Saved {len(repos)} trending repos to {output_file}")
# Brief human-readable summary
if repos:
print("\nTop repos:")
for repo in repos[:5]:
features_preview = ", ".join(repo["key_features"][:3])
print(f"{repo['stars']:>7} {repo['name']}")
if repo["description"]:
desc = repo["description"][:80]
print(f" {desc}{'...' if len(repo['description']) > 80 else ''}")
if features_preview:
print(f" Features: {features_preview}")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,271 +0,0 @@
#!/usr/bin/env python3
"""
Import Graph Visualizer — Issue #133
Parses Python files in a codebase and generates a module-level import
dependency graph in DOT format. Detects circular imports.
Usage:
python3 scripts/import_graph.py /path/to/hermes-agent
python3 scripts/import_graph.py /path/to/hermes-agent --output deps.dot
python3 scripts/import_graph.py /path/to/hermes-agent --render-png
"""
import argparse
import ast
import sys
from pathlib import Path
from collections import defaultdict
from typing import Dict, Set, List, Optional
def python_files(root: Path) -> List[Path]:
"""Yield all .py files under root, excluding common noise dirs."""
exlude_dirs = {'.git', '__pycache__', '.venv', 'venv', 'node_modules', 'dist', 'build', '.tox'}
for path in root.rglob('*.py'):
if any(part in exlude_dirs for part in path.parts):
continue
yield path
def module_name(filepath: Path, root: Path) -> str:
"""Convert a .py file path to its dotted module name relative to root."""
rel = filepath.relative_to(root)
parts = list(rel.parts)
if parts[-1] == '__init__.py':
parts = parts[:-1] # package __init__ → the package itself
elif parts[-1].endswith('.py'):
parts[-1] = parts[-1][:-3] # strip .py
# Remove any __pycache__ segments
parts = [p for p in parts if p != '__pycache__']
return '.'.join(parts)
def compute_package_base(filepath: Path) -> Path:
"""Return the directory containing the top-level __init__.py for this file's package.
For a file at a/b/c/d.py, return a/b/c if c is a package, else a/b, else a."""
parent = filepath.parent
while parent != parent.parent: # while we can go up
if (parent / '__init__.py').exists():
parent = parent.parent
else:
break
return parent
def resolve_import(from_node: ast.ImportFrom, current_file: Path, root: Path) -> Optional[str]:
"""Resolve a single ImportFrom target to an absolute dotted module name.
Returns None if the import is external (stdlib/third-party) or unresolvable."""
level = from_node.level # 0 = absolute, >0 = relative
imported = from_node.module # may be None for `from . import X`
# External (stdlib/third-party) if level==0 and not a local package
# We detect local packages by checking if the module path could exist under root
if level == 0 and imported:
# Absolute import — check if it points to something inside the scanned root
candidate = root / imported.replace('.', '/')
if candidate.exists() or (candidate / '__init__.py').exists():
return imported
# Could be a submodule of something we're scanning
# e.g. from hermes.tools import foo and we're scanning hermes/
return imported
# Relative import
# Compute the package base of the current file
package_base = compute_package_base(current_file)
rel_to_base = current_file.parent.relative_to(package_base) if package_base != current_file.parent else Path()
if level == 1: # from . import X or from .X import Y
target_package = current_file.parent
else: # level >= 2: from ..X import Y etc.
up = level - 1
target_package = current_file.parent
for _ in range(up):
if target_package != target_package.parent:
target_package = target_package.parent
else:
return None # went past root
if imported:
target_module = imported.replace('.', '/')
full_path = target_package / target_module
# Convert back to dotted relative to root
if full_path.exists() or (full_path.with_suffix('.py')).exists() or (full_path / '__init__.py').exists():
try:
rel = full_path.relative_to(root)
parts = list(rel.parts)
if (full_path / '__init__.py').exists():
pass # keep all parts
elif full_path.is_file() and full_path.name.endswith('.py'):
parts[-1] = parts[-1][:-3]
return '.'.join(parts)
except ValueError:
pass
return None
else:
# from . import X — target_package is the package itself
try:
rel = target_package.relative_to(root)
return '.'.join(rel.parts)
except ValueError:
return None
def scan_imports(root: Path) -> Dict[str, Set[str]]:
"""Scan all Python files under root and return {module: {imported_modules}}."""
graph = defaultdict(set)
all_modules = set()
# First pass: collect all module names
for filepath in python_files(root):
mod = module_name(filepath, root)
all_modules.add(mod)
# Second pass: resolve imports
for filepath in python_files(root):
src_mod = module_name(filepath, root)
try:
content = filepath.read_text(errors='ignore')
tree = ast.parse(content, filename=str(filepath))
except Exception:
continue
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
name = alias.name.split('.')[0] # top-level package only
# If name matches a local module, add edge
if any(m.startswith(name) for m in all_modules):
graph[src_mod].add(name)
elif isinstance(node, ast.ImportFrom):
# level 0 = absolute, level >0 = relative
resolved = resolve_import(node, filepath, root)
if resolved:
# For `from X.Y import Z`, the dependency is on X.Y
graph[src_mod].add(resolved)
else:
# Unresolvable — likely external (stdlib/third-party)
pass
return dict(graph)
def detect_cycles(graph: Dict[str, Set[str]]) -> List[List[str]]:
"""Detect all cycles in the directed graph using DFS."""
cycles = []
visited = set()
rec_stack = set()
path = []
def dfs(node: str):
visited.add(node)
rec_stack.add(node)
path.append(node)
for neighbor in sorted(graph.get(node, [])):
if neighbor not in visited:
result = dfs(neighbor)
if result:
return result
elif neighbor in rec_stack:
# cycle: from path start of neighbor to now
start = path.index(neighbor)
return path[start:] + [neighbor]
path.pop()
rec_stack.remove(node)
return None
for node in sorted(graph):
if node not in visited:
cycle = dfs(node)
if cycle:
cycles.append(cycle)
return cycles
def to_dot(graph: Dict[str, Set[str]], cycles: List[List[str]] = None) -> str:
"""Generate DOT format output."""
cycle_nodes = set()
if cycles:
for cycle in cycles:
cycle_nodes.update(cycle)
lines = ['digraph import_graph {']
lines.append(' rankdir=LR;')
lines.append(' node [shape=box, style=filled, fontname="Helvetica"];')
lines.append(' edge [arrowhead=vee];')
lines.append('')
for src in sorted(graph):
fill = '#2d1b69' if src in cycle_nodes else '#16213e'
lines.append(f' "{src}" [fillcolor="{fill}"];')
for src, deps in sorted(graph.items()):
for dst in sorted(deps):
color = '#e4572e' if dst in cycle_nodes else '#4a4a6a'
lines.append(f' "{src}" -> "{dst}" [color="{color}"];')
lines.append('}')
return '\n'.join(lines)
def main():
parser = argparse.ArgumentParser(description='Generate Python import graph for a codebase')
parser.add_argument('path', help='Path to Python project (e.g. hermes-agent directory)')
parser.add_argument('--output', '-o', help='Write DOT to file instead of stdout')
parser.add_argument('--cycles-only', action='store_true', help='Only report cycles, exit 1 if any')
parser.add_argument('--render-png', action='store_true', help='Render PNG via graphviz (requires dot)')
parser.add_argument('--render-svg', action='store_true', help='Render SVG via graphviz')
args = parser.parse_args()
root = Path(args.path).resolve()
if not root.is_dir():
print(f"Error: {root} is not a directory", file=sys.stderr)
sys.exit(1)
print(f"Scanning {root}...", file=sys.stderr)
graph = scan_imports(root)
cycles = detect_cycles(graph)
if args.cycles_only:
if cycles:
print("CIRCULAR DEPENDENCIES:", file=sys.stderr)
for cycle in cycles:
print(f" {''.join(cycle)}", file=sys.stderr)
sys.exit(1)
else:
print("No circular dependencies found.", file=sys.stderr)
sys.exit(0)
# Prepare output
output = to_dot(graph, cycles)
if args.output:
Path(args.output).write_text(output)
print(f"DOT written to {args.output}", file=sys.stderr)
# Optional rendering
if args.render_png or args.render_svg:
import subprocess
out_path = Path(args.output)
if args.render_png:
png_out = out_path.with_suffix('.png')
subprocess.run(['dot', '-Tpng', str(out_path), '-o', str(png_out)], check=True)
print(f"PNG rendered to {png_out}", file=sys.stderr)
if args.render_svg:
svg_out = out_path.with_suffix('.svg')
subprocess.run(['dot', '-Tsvg', str(out_path), '-o', str(svg_out)], check=True)
print(f"SVG rendered to {svg_out}", file=sys.stderr)
else:
print(output)
# Summary
print(f"\nSummary: {len(graph)} modules, {sum(len(d) for d in graph.values())} import edges, {len(cycles)} cycles",
file=sys.stderr)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,125 @@
#!/usr/bin/env python3
"""Tests for github_trending_scanner.py — pure function validation.
Tests the feature inference, extraction, and output formatting logic
without relying on external GitHub API calls.
"""
import json
import sys
import tempfile
from pathlib import Path
# Add scripts dir to path for import
sys.path.insert(0, str(Path(__file__).resolve().parent))
from github_trending_scanner import (
extract_repo_features,
infer_features,
save_trending,
)
def test_infer_features_from_description():
"""Feature inference extracts capabilities from description text."""
desc = "A local, quantized LLM framework for fine-tuning and agent-based RAG with vision."
topics = ["ai", "llm"]
features = infer_features(desc, topics)
# Should include relevant capabilities (case-insensitive comparison)
expected_lower = {"fine-tuning", "local/offline", "quantized models", "agent framework", "vision", "retrieval/rag"}
actual_lower = set(f.lower() for f in features)
assert expected_lower.issubset(actual_lower), f"Missing features. Expected subset of {expected_lower}, got {actual_lower}"
print("PASS: infer_features_from_description")
def test_infer_features_from_topics_only():
"""Topics alone can drive feature detection."""
desc = ""
topics = ["computer-vision", "speech", "pytorch"]
features = infer_features(desc, topics)
# Non-generic topics should appear as features (topics preserved as-is)
assert "computer-vision" in features, f"Expected 'computer-vision' in {features}"
assert "speech" in features, f"Expected 'speech' in {features}"
# Generic topics (pytorch) may be filtered
print(f"PASS: infer_features_from_topics_only → {features}")
def test_extract_repo_features_produces_valid_structure():
"""extract_repo_features returns all required fields."""
mock_repo = {
"full_name": "example/repo",
"description": "An example repository",
"stargazers_count": 1234,
"forks_count": 56,
"open_issues_count": 7,
"language": "Python",
"topics": ["ai", "llm"],
"html_url": "https://github.com/example/repo",
"created_at": "2025-01-01T00:00:00Z",
"updated_at": "2026-01-01T00:00:00Z",
}
result = extract_repo_features(mock_repo)
assert result["name"] == "example/repo"
assert result["description"] == "An example repository"
assert result["stars"] == 1234
assert isinstance(result["key_features"], list)
assert "scanned_at" in result
assert result["url"] == "https://github.com/example/repo"
print("PASS: extract_repo_features_structure")
def test_save_trending_creates_dated_json():
"""save_trending writes a valid JSON file with the expected schema."""
repos = [
{
"name": "test/repo",
"description": "Test repository",
"stars": 999,
"language": "Python",
"topics": ["test"],
"key_features": ["testing"],
"scanned_at": "2026-04-26T00:00:00+00:00",
}
]
with tempfile.TemporaryDirectory() as tmp:
output_file = save_trending(repos, output_dir=tmp)
path = Path(output_file)
assert path.exists(), f"Output file not created: {output_file}"
with open(path) as f:
data = json.load(f)
assert "scanned_at" in data
assert data["count"] == 1
assert isinstance(data["repos"], list)
assert data["repos"][0]["name"] == "test/repo"
print(f"PASS: save_trending → {output_file}")
def test_save_trending_respects_output_dir_creation():
"""Output directory is created if it doesn't exist."""
repos = []
with tempfile.TemporaryDirectory() as tmp:
nested = Path(tmp) / "nested" / "trending"
assert not nested.exists()
output_file = save_trending(repos, output_dir=str(nested))
assert nested.exists()
assert Path(output_file).exists()
print("PASS: output_dir_creation")
if __name__ == "__main__":
test_infer_features_from_description()
test_infer_features_from_topics_only()
test_extract_repo_features_produces_valid_structure()
test_save_trending_creates_dated_json()
test_save_trending_respects_output_dir_creation()
print("\nAll github_trending_scanner tests passed.")

View File

@@ -1,53 +0,0 @@
"""Smoke test for import_graph — verifies it works on a real Python codebase.
We run import_graph.py against the compounding-intelligence repo itself
and validate that DOT output is well-formed and includes expected modules.
"""
import subprocess
import sys
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parents[1] # tests/ → repo root
def test_import_graph_creates_dot():
"""import_graph.py produces valid DOT output for this repo."""
script = REPO_ROOT / 'scripts' / 'import_graph.py'
result = subprocess.run(
[sys.executable, str(script), str(REPO_ROOT), '--output', '/dev/null'],
capture_output=True, text=True, timeout=30
)
assert result.returncode == 0, f"script failed: {result.stderr}"
# Should have printed a summary
assert ' modules,' in result.stderr or 'Summary:' in result.stderr
def test_import_graph_excludes_site_packages():
"""import_graph.py does not crash on unparseable files or external deps."""
script = REPO_ROOT / 'scripts' / 'import_graph.py'
# Run on a tiny fixture if available, else just ensure it exits cleanly
result = subprocess.run(
[sys.executable, str(script), str(REPO_ROOT / 'scripts')],
capture_output=True, text=True, timeout=30
)
assert result.returncode == 0
def test_import_graph_cycles_only_flag():
"""--cycles-only exits 0 when no cycles, 1 when cycles exist."""
script = REPO_ROOT / 'scripts' / 'import_graph.py'
result = subprocess.run(
[sys.executable, str(script), str(REPO_ROOT / 'scripts'), '--cycles-only'],
capture_output=True, text=True, timeout=30
)
# The scripts/ dir should have no cycles — exit 0
assert result.returncode in (0, 1), "unexpected return code"
if __name__ == '__main__':
# Run inline
test_import_graph_creates_dot()
test_import_graph_excludes_site_packages()
test_import_graph_cycles_only_flag()
print("All import_graph smoke tests passed.")