Compare commits

..

2 Commits

Author SHA1 Message Date
Rockachopa
ec76e9fec3 test(scanner): unit tests for github_trending_scanner
Some checks failed
Test / pytest (pull_request) Failing after 9s
2026-04-26 11:21:02 +00:00
38c5862737 feat(scanner): add GitHub Trending Scanner CLI for AI/ML repos 2026-04-26 11:20:51 +00:00
4 changed files with 383 additions and 311 deletions

View File

@@ -0,0 +1,258 @@
#!/usr/bin/env python3
"""GitHub Trending Scanner — Scan trending repos in AI/ML.
Extracts: repo description, stars, key features (topics, inferred highlights).
Filters by language and/or topic. Outputs dated JSON for daily scan pipeline.
Usage:
python3 github_trending_scanner.py --language python --topic ai --output metrics/trending
python3 github_trending_scanner.py --topic machine-learning --limit 50
python3 github_trending_scanner.py --language rust --topic artificial-intelligence
"""
import argparse
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Dict
import urllib.request
import urllib.parse
import urllib.error
# Runtime configuration; each value can be overridden through the environment.
# Base URL that the /search/repositories endpoint path is appended to.
GITHUB_API_BASE = os.environ.get("GITHUB_API_BASE", "https://api.github.com")
# Default output directory name for scan results (TRENDING_OUTPUT_DIR).
DEFAULT_OUTPUT_DIR = os.environ.get("TRENDING_OUTPUT_DIR", "metrics/trending")
# NOTE: int() raises ValueError at import time if either variable below is
# set to something that is not an integer literal.
DEFAULT_LIMIT = int(os.environ.get("TRENDING_LIMIT", "30"))
DEFAULT_MIN_STARS = int(os.environ.get("TRENDING_MIN_STARS", "1000"))
def fetch_trending_repos(
    language: Optional[str] = None,
    topic: Optional[str] = None,
    min_stars: int = DEFAULT_MIN_STARS,
    limit: int = DEFAULT_LIMIT,
) -> List[Dict]:
    """Fetch trending-like repositories from GitHub using the search API.

    The search is ``stars:>min_stars`` plus optional ``language:`` and
    ``topic:`` qualifiers, sorted by stars descending as a proxy for
    trending/popular.

    Args:
        language: Optional programming-language qualifier.
        topic: Optional GitHub topic qualifier.
        min_stars: Only repositories with more stars than this are returned.
        limit: Maximum number of repositories to return. A single page is
            requested, so at most 100 items come back even for larger values.

    Returns:
        The raw ``items`` dicts from the API response, or ``[]`` when the
        request fails (errors are reported on stderr, never raised).

    Notes:
        Up to three attempts are made. A 403 rate-limit response sleeps
        until the advertised reset time (this can be long); other transient
        failures back off exponentially.
    """
    if limit <= 0:
        # Nothing was asked for; avoid sending per_page<=0 to the API.
        return []

    max_attempts = 3

    # Build search query: stars threshold + optional language/topic filters
    query = f"stars:>{min_stars}"
    if language:
        query += f" language:{language}"
    if topic:
        query += f" topic:{topic}"

    params = {
        "q": query,
        "sort": "stars",
        "order": "desc",
        "per_page": min(limit, 100),  # GitHub max per_page is 100
    }
    url = f"{GITHUB_API_BASE}/search/repositories?{urllib.parse.urlencode(params)}"
    headers = {
        "Accept": "application/vnd.github.v3+json",
        "User-Agent": "Sovereign-Trending-Scanner/1.0",
    }

    for attempt in range(max_attempts):
        is_last_attempt = attempt == max_attempts - 1
        try:
            req = urllib.request.Request(url, headers=headers)
            with urllib.request.urlopen(req, timeout=30) as resp:
                if resp.status != 200:
                    raise RuntimeError(f"GitHub API returned {resp.status}")
                data = json.loads(resp.read().decode("utf-8"))
                return data.get("items", [])[:limit]
        except urllib.error.HTTPError as e:
            # The error body is a stream: read it exactly once and reuse it,
            # otherwise a second read() yields an empty string.
            body = e.read().decode("utf-8", errors="replace")
            if e.code == 403 and "rate limit" in body.lower():
                if is_last_attempt:
                    # Sleeping here would only delay the inevitable return.
                    print(
                        f"ERROR: Rate limit still exceeded after {max_attempts} attempts",
                        file=sys.stderr,
                    )
                    return []
                try:
                    reset_ts = int(e.headers.get("X-RateLimit-Reset", 0))
                except (TypeError, ValueError):
                    reset_ts = 0  # header missing/malformed: use the minimum wait
                wait_seconds = max(5, reset_ts - int(time.time()) + 5)
                print(f"Rate limit exceeded — waiting {wait_seconds}s (attempt {attempt+1}/3)...", file=sys.stderr)
                time.sleep(wait_seconds)
                continue
            print(f"ERROR: GitHub API request failed: {e}: {body[:200]}", file=sys.stderr)
            return []
        except Exception as e:
            # Network errors, bad JSON, unexpected status: retry with backoff.
            if not is_last_attempt:
                backoff = 2 ** attempt
                print(f"WARNING: Fetch attempt {attempt+1} failed: {e} — retrying in {backoff}s", file=sys.stderr)
                time.sleep(backoff)
                continue
            print(f"ERROR: All fetch attempts failed: {e}", file=sys.stderr)
            return []
    return []
def extract_repo_features(repo_data: Dict) -> Dict:
    """Extract structured fields for a trending repo.

    Args:
        repo_data: One repository object as returned by the GitHub search API.

    Returns:
        A flat dict with name, description, counters, language, topics, URL,
        timestamps, inferred ``key_features`` and a UTC ``scanned_at`` stamp.
        Fields the API reports as ``null`` are normalised to ``""`` / ``[]``
        so the output schema is stable.
    """
    description = (repo_data.get("description") or "").strip()
    # The key can be present with a null value; .get()'s default would not
    # apply then, and the feature inference joins the topics as strings.
    topics = repo_data.get("topics") or []
    # Infer key features from description and topics
    features = infer_features(description, topics)
    return {
        "name": repo_data.get("full_name", ""),
        "description": description,
        "stars": repo_data.get("stargazers_count", 0),
        "forks": repo_data.get("forks_count", 0),
        "open_issues": repo_data.get("open_issues_count", 0),
        # GitHub reports null when no language was detected.
        "language": repo_data.get("language") or "",
        "topics": topics,
        "url": repo_data.get("html_url", ""),
        "created_at": repo_data.get("created_at", ""),
        "updated_at": repo_data.get("updated_at", ""),
        "key_features": features,
        "scanned_at": datetime.now(timezone.utc).isoformat(),
    }
def infer_features(description: str, topics: List[str]) -> List[str]:
    """Infer notable capabilities/features from repo metadata.

    Looks for AI/ML-relevant keywords in the description and topics, then
    appends every non-generic topic verbatim.

    Keywords are matched at the start of a word rather than as raw
    substrings, so "storage" no longer counts as "rag" and "constraint" no
    longer counts as "train". Keywords of four characters or fewer (rag,
    sft, dpo, gui, tts, ...) must match a whole word, which keeps "endpoint"
    from matching "dpo" and "guide" from matching "gui".

    Args:
        description: Repository description (``None`` is treated as empty).
        topics: Repository topics (``None`` is treated as empty).

    Returns:
        Up to 10 feature labels, de-duplicated case-insensitively, in
        first-seen order (capability labels first, then topics).
    """
    # Local import: the module's import block does not pull in ``re``.
    import re

    topics = list(topics or [])
    text = ((description or "") + " " + " ".join(topics)).lower()

    def mentions(keyword: str) -> bool:
        # Word-start boundary for every keyword; short acronyms additionally
        # need a word-end boundary, longer stems may continue ("train"+"ing").
        pattern = r"(?<![a-z0-9])" + re.escape(keyword)
        if len(keyword) <= 4:
            pattern += r"(?![a-z0-9])"
        return re.search(pattern, text) is not None

    # Domain capabilities (keys normalized to lowercase for consistency)
    capability_keywords = {
        "fine-tuning": ["fine-tun", "finetun"],
        "agent framework": ["agent"],
        "local/offline": ["local", "on-device", "offline"],
        "quantized models": ["quantized", "quantization", "gguf", "gptq"],
        "vision": ["vision", "multimodal", "image", "visual"],
        "speech/audio": ["speech", "audio", "whisper", "tts"],
        "retrieval/rag": ["rag", "retrieval", "embedding", "vector"],
        "training": ["train", "training", "sft", "dpo"],
        "gui/playground": ["gui", "playground", "webui", "interface"],
        "sota": ["state-of-the-art", "sota", "latest"],
    }
    features = [
        label
        for label, keywords in capability_keywords.items()
        if any(mentions(kw) for kw in keywords)
    ]

    # Also include non-generic topics as features
    generic_topics = {"ai", "ml", "machine-learning", "deep-learning", "llm", "python", "pytorch", "tensorflow"}
    features.extend(t for t in topics if t.lower() not in generic_topics)

    # Deduplicate while preserving order, return up to 10
    seen = set()
    unique = []
    for feature in features:
        key = feature.lower()
        if key not in seen:
            seen.add(key)
            unique.append(feature)
    return unique[:10]
def save_trending(repos: List[Dict], output_dir: str = "metrics/trending") -> str:
    """Save trending results to a dated JSON file.

    The file is named ``github-trending-YYYY-MM-DD.json`` (UTC date) and is
    overwritten if it already exists. ``output_dir`` is created when missing.

    Args:
        repos: Already-extracted repo dicts to store under ``"repos"``.
        output_dir: Directory to write into.

    Returns:
        The path of the written file.
    """
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # One timestamp for both the filename and the payload so the two can
    # never disagree when the call straddles midnight UTC.
    now = datetime.now(timezone.utc)
    filename = output_path / f"github-trending-{now.strftime('%Y-%m-%d')}.json"
    output_data = {
        "scanned_at": now.isoformat(),
        "count": len(repos),
        "repos": repos,
    }
    # ensure_ascii=False emits raw non-ASCII text (emoji, CJK descriptions),
    # so the encoding must be pinned instead of left to the platform default.
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(output_data, f, indent=2, ensure_ascii=False)
    return str(filename)
def main() -> int:
    """Command-line entry point: fetch, extract, save, and summarise.

    Returns:
        The process exit status (always 0; an empty fetch is reported as a
        warning on stderr and still produces an output file).
    """
    parser = argparse.ArgumentParser(
        description="Scan GitHub trending repositories in AI/ML"
    )
    parser.add_argument(
        "--language",
        help="Filter by programming language (e.g., python, rust, go)",
    )
    parser.add_argument(
        "--topic",
        help="Filter by GitHub topic (e.g., ai, machine-learning, llm)",
    )
    parser.add_argument(
        "--since",
        default="daily",
        choices=["daily", "weekly", "monthly"],
        help="Trending period (daily/weekly/monthly) — informational only",
    )
    parser.add_argument(
        "--output",
        # Honour TRENDING_OUTPUT_DIR instead of a second hard-coded literal.
        default=DEFAULT_OUTPUT_DIR,
        help=f"Output directory for results (default: {DEFAULT_OUTPUT_DIR})",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=DEFAULT_LIMIT,
        help=f"Maximum repos to fetch (default: {DEFAULT_LIMIT})",
    )
    parser.add_argument(
        "--min-stars",
        type=int,
        default=DEFAULT_MIN_STARS,
        help=f"Minimum star count for relevance (default: {DEFAULT_MIN_STARS})",
    )
    args = parser.parse_args()

    print(
        f"Fetching trending repos "
        f"(language={args.language or 'any'}, topic={args.topic or 'any'}, period={args.since})..."
    )
    repos_raw = fetch_trending_repos(
        language=args.language,
        topic=args.topic,
        min_stars=args.min_stars,
        limit=args.limit,
    )
    if not repos_raw:
        print("WARNING: No repos fetched — check network or rate limits", file=sys.stderr)

    repos = [extract_repo_features(r) for r in repos_raw]
    output_file = save_trending(repos, args.output)
    print(f"Saved {len(repos)} trending repos to {output_file}")

    # Brief human-readable summary
    if repos:
        print("\nTop repos:")
        for repo in repos[:5]:
            features_preview = ", ".join(repo["key_features"][:3])
            print(f"{repo['stars']:>7} {repo['name']}")
            if repo["description"]:
                desc = repo["description"][:80]
                print(f" {desc}{'...' if len(repo['description']) > 80 else ''}")
            if features_preview:
                print(f" Features: {features_preview}")
    return 0
# Script entry point: run the CLI and hand main()'s return value to
# sys.exit() as the process exit status.
if __name__ == "__main__":
    sys.exit(main())

View File

@@ -1,206 +0,0 @@
#!/usr/bin/env python3
"""
graph_visualizer.py — Generate visual graph representations of the knowledge graph.
Reads knowledge/index.json and renders the fact relationship graph.
Supports ASCII terminal output and DOT export for Graphviz.
Usage:
python3 scripts/graph_visualizer.py # ASCII, all nodes
python3 scripts/graph_visualizer.py --format dot # DOT output
python3 scripts/graph_visualizer.py --seed root --max-depth 2
python3 scripts/graph_visualizer.py --filter-domain hermes-agent
python3 scripts/graph_visualizer.py --filter-category pitfall
Acceptance: [x] Subgraph extraction [x] ASCII rendering [x] DOT export [x] Configurable depth/filter
"""
import argparse
import json
import sys
from collections import defaultdict, deque
from pathlib import Path
from typing import Optional
def load_index(index_path: Path):
    """Load and return the parsed JSON document stored at *index_path*.

    The file is decoded as UTF-8 explicitly so that non-ASCII fact text
    loads the same way regardless of the platform's default encoding.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    with open(index_path, encoding="utf-8") as f:
        return json.load(f)
def build_adjacency(facts):
    """Map each fact id to the list of known fact ids it is related to.

    Relations pointing at ids that do not occur in *facts* are dropped, and
    facts without any surviving relation get no entry at all. Facts whose
    id is missing or falsy are skipped as sources.
    """
    known_ids = {entry['id'] for entry in facts if 'id' in entry}
    edges = {}
    for entry in facts:
        source = entry.get('id')
        if not source:
            continue
        targets = [ref for ref in entry.get('related', []) if ref in known_ids]
        if targets:
            edges.setdefault(source, []).extend(targets)
    return edges
def build_reverse_adjacency(adj):
    """Invert *adj*: map every target id to the list of ids pointing at it."""
    incoming = {}
    for source, targets in adj.items():
        for target in targets:
            incoming.setdefault(target, []).append(source)
    return incoming
def extract_subgraph(
    facts,
    adj,
    rev_adj,
    seeds=None,
    max_depth=None,
    filter_domain=None,
    filter_category=None,
):
    """Select the set of fact ids to render.

    Args:
        facts: Fact dicts; entries without a truthy ``id`` are ignored.
        adj: Forward adjacency (id -> list of related ids).
        rev_adj: Reverse adjacency (id -> list of ids pointing at it).
        seeds: Optional iterable of starting ids. When given, the result is
            what is reachable from the seeds following edges in either
            direction, restricted to nodes that pass the filters.
        max_depth: Optional maximum hop count from a seed (0 = seeds only).
        filter_domain: Keep only facts whose ``domain`` equals this value.
        filter_category: Keep only facts whose ``category`` equals this value.

    Returns:
        A set of fact ids. It is empty when a filter matches nothing or no
        seed passes the filters, so callers can report "no match" instead of
        silently getting the whole graph.
    """
    filtered_nodes = set()
    for f in facts:
        fid = f.get('id')
        if not fid:
            continue
        if filter_domain and f.get('domain') != filter_domain:
            continue
        if filter_category and f.get('category') != filter_category:
            continue
        filtered_nodes.add(fid)

    if seeds is None:
        if filtered_nodes or filter_domain or filter_category:
            # An active filter that matched nothing must yield nothing; the
            # all-ids fallback below is only for the unfiltered case.
            return filtered_nodes
        return {f['id'] for f in facts if 'id' in f}

    valid_seeds = [s for s in seeds if s in filtered_nodes]
    if not valid_seeds:
        return set()

    # Breadth-first walk over both edge directions, bounded by max_depth.
    visited = set()
    queue = deque((s, 0) for s in valid_seeds)
    while queue:
        node, depth = queue.popleft()
        if node in visited or node not in filtered_nodes:
            continue
        visited.add(node)
        if max_depth is not None and depth >= max_depth:
            continue  # keep the node itself but do not expand it further
        for neighbor in adj.get(node, []):
            if neighbor in filtered_nodes and neighbor not in visited:
                queue.append((neighbor, depth + 1))
        for neighbor in rev_adj.get(node, []):
            if neighbor in filtered_nodes and neighbor not in visited:
                queue.append((neighbor, depth + 1))
    return visited
def build_fact_map(facts):
    """Index facts by id, keeping only entries that have both 'id' and 'fact'.

    When an id occurs more than once, the last such entry wins.
    """
    by_id = {}
    for entry in facts:
        if 'id' in entry and 'fact' in entry:
            by_id[entry['id']] = entry
    return by_id
def render_ascii(subgraph_ids, adj, fact_map):
    """Render the subgraph as an indented ASCII tree, one node per line.

    Roots are the nodes with no inbound edge inside the subgraph (or every
    node, sorted, when there is none). Nodes are emitted depth-first so each
    node's children appear directly beneath it; a node reachable through
    several paths is printed once, at its first occurrence. Nodes that are
    not reachable from any root (e.g. an isolated cycle) are listed in a
    trailing "Disconnected nodes" section.

    Args:
        subgraph_ids: Set of node ids to render.
        adj: Forward adjacency (id -> list of related ids).
        fact_map: id -> fact dict providing 'fact', 'category' and 'domain'.

    Returns:
        The rendering as a single newline-joined string.
    """
    # Count inbound edges, considering only edges that stay in the subgraph.
    inbound = defaultdict(int)
    for src in subgraph_ids:
        for tgt in adj.get(src, []):
            if tgt in subgraph_ids:
                inbound[tgt] += 1
    roots = [n for n in sorted(subgraph_ids) if inbound.get(n, 0) == 0]
    if not roots:
        roots = sorted(subgraph_ids)

    lines = []
    visited = set()
    # Explicit stack; items are pushed reversed so they pop in listed order.
    stack = [(root, 0, None) for root in reversed(roots)]
    while stack:
        node, depth, parent = stack.pop()
        if node in visited:
            continue
        visited.add(node)
        fact = fact_map.get(node, {})
        label = fact.get('fact', str(node))[:80]
        category = fact.get('category', 'fact')
        domain = fact.get('domain', 'global')
        node_label = domain + '/' + category + ': ' + label
        if parent is None:
            lines.append(f"{' ' * depth}┌─ {node_label}")
        else:
            lines.append(f"{' ' * depth}├─ {node_label}")
        children = [c for c in adj.get(node, []) if c in subgraph_ids]
        for child in reversed(children):
            stack.append((child, depth + 1, node))

    if len(visited) < len(subgraph_ids):
        lines.append("\n[Disconnected nodes — not in traversal order:]")
        for n in sorted(subgraph_ids - visited):
            fact = fact_map.get(n, {})
            # str() so a non-string id can still be sliced as a fallback label.
            label = fact.get('fact', str(n))[:60]
            lines.append(f" {n}: {label}")
    return "\n".join(lines)
def _dot_escape(value):
    """Escape *value* for use inside a double-quoted DOT string."""
    return (
        str(value)
        .replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\r", "")
        .replace("\n", "\\n")
    )


def render_dot(subgraph_ids, adj, fact_map):
    """Render the subgraph as a Graphviz DOT digraph.

    Each node becomes a filled box labelled ``domain / category / fact`` and
    coloured by category; each edge inside the subgraph becomes ``src -> tgt``.
    Fact text is truncated to 80 characters *before* escaping, so an escape
    sequence is never cut in half, and backslashes, quotes and newlines are
    escaped so the output stays valid DOT.

    Args:
        subgraph_ids: Set of node ids to render.
        adj: Forward adjacency (id -> list of related ids).
        fact_map: id -> fact dict providing 'fact', 'category' and 'domain'.

    Returns:
        The DOT source as a single newline-joined string.
    """
    lines = ["digraph knowledge_graph {", " rankdir=LR;"]
    cat_colors = {
        'fact': '#3498db',
        'pitfall': '#e74c3c',
        'pattern': '#2ecc71',
        'tool-quirk': '#f39c12',
        'question': '#9b59b6',
    }
    for nid in sorted(subgraph_ids):
        fact = fact_map.get(nid, {})
        category = fact.get('category', 'fact')
        domain = fact.get('domain', 'global')
        label = _dot_escape(str(fact.get('fact', nid))[:80])
        fillcolor = cat_colors.get(category, '#666666')
        lines.append(
            f' "{_dot_escape(nid)}" '
            f'[label="{_dot_escape(domain)}\\n{_dot_escape(category)}\\n{label}", '
            f'fillcolor="{fillcolor}", style=filled, shape=box];'
        )
    lines.append("")
    for src in sorted(subgraph_ids):
        for tgt in adj.get(src, []):
            if tgt in subgraph_ids:
                lines.append(f' "{_dot_escape(src)}" -> "{_dot_escape(tgt)}";')
    lines.append("}")
    return "\n".join(lines)
def main():
    """Command-line entry point: load the index, pick a subgraph, render it.

    Exits with status 1 (message on stderr) when no node matches the
    requested seeds/filters. Output goes to ``--output`` when given,
    otherwise to stdout.
    """
    parser = argparse.ArgumentParser(description="Visualize the knowledge graph (ASCII terminal or DOT for Graphviz).")
    parser.add_argument("--index", type=Path, default=Path(__file__).parent.parent / "knowledge" / "index.json",
                        help="Path to knowledge/index.json")
    parser.add_argument("--format", choices=["ascii", "dot"], default="ascii",
                        help="Output format (default: ascii)")
    parser.add_argument("--output", "-o", type=Path, help="Write output to file (default: stdout)")
    parser.add_argument("--seed", help="Starting fact ID (comma-sep). Omit to render full graph.")
    parser.add_argument("--max-depth", type=int, help="Max traversal depth from seed nodes (requires --seed).")
    parser.add_argument("--filter-domain", help="Only include facts from this domain.")
    parser.add_argument("--filter-category", help="Only include facts of this category.")
    args = parser.parse_args()

    index = load_index(args.index)
    facts = index.get('facts', [])
    adj = build_adjacency(facts)
    rev_adj = build_reverse_adjacency(adj)
    fact_map = build_fact_map(facts)

    # Tolerate "a, b" and trailing commas: strip each id and drop empties.
    seeds = None
    if args.seed:
        seeds = [part.strip() for part in args.seed.split(',') if part.strip()]

    subgraph_ids = extract_subgraph(facts=facts, adj=adj, rev_adj=rev_adj, seeds=seeds,
                                    max_depth=args.max_depth,
                                    filter_domain=args.filter_domain,
                                    filter_category=args.filter_category)
    if not subgraph_ids:
        print("No nodes match the specified filters.", file=sys.stderr)
        sys.exit(1)

    if args.format == "ascii":
        output = render_ascii(subgraph_ids, adj, fact_map)
    else:
        output = render_dot(subgraph_ids, adj, fact_map)

    if args.output:
        # The ASCII rendering contains box-drawing characters, so the file
        # encoding must not depend on the platform default.
        args.output.write_text(output, encoding="utf-8")
        print(f"Written: {args.output}", file=sys.stderr)
    else:
        print(output)
# Script entry point: run the visualizer CLI when executed directly.
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,125 @@
#!/usr/bin/env python3
"""Tests for github_trending_scanner.py — pure function validation.
Tests the feature inference, extraction, and output formatting logic
without relying on external GitHub API calls.
"""
import json
import sys
import tempfile
from pathlib import Path
# Add scripts dir to path for import
sys.path.insert(0, str(Path(__file__).resolve().parent))
from github_trending_scanner import (
extract_repo_features,
infer_features,
save_trending,
)
def test_infer_features_from_description():
    """Capabilities named in the description text are all detected."""
    detected = infer_features(
        "A local, quantized LLM framework for fine-tuning and agent-based RAG with vision.",
        ["ai", "llm"],
    )
    # Compare case-insensitively: only the set of labels matters here.
    wanted = {"fine-tuning", "local/offline", "quantized models", "agent framework", "vision", "retrieval/rag"}
    got = {label.lower() for label in detected}
    assert wanted.issubset(got), f"Missing features. Expected subset of {wanted}, got {got}"
    print("PASS: infer_features_from_description")
def test_infer_features_from_topics_only():
    """With an empty description, topics alone drive feature detection."""
    features = infer_features("", ["computer-vision", "speech", "pytorch"])
    # Non-generic topics are expected verbatim; the generic "pytorch" topic
    # may be filtered out, so it is deliberately not asserted on.
    for required in ("computer-vision", "speech"):
        assert required in features, f"Expected '{required}' in {features}"
    print(f"PASS: infer_features_from_topics_only → {features}")
def test_extract_repo_features_produces_valid_structure():
    """extract_repo_features maps the API fields onto the output schema."""
    api_payload = dict(
        full_name="example/repo",
        description="An example repository",
        stargazers_count=1234,
        forks_count=56,
        open_issues_count=7,
        language="Python",
        topics=["ai", "llm"],
        html_url="https://github.com/example/repo",
        created_at="2025-01-01T00:00:00Z",
        updated_at="2026-01-01T00:00:00Z",
    )
    extracted = extract_repo_features(api_payload)

    assert extracted["name"] == "example/repo"
    assert extracted["description"] == "An example repository"
    assert extracted["stars"] == 1234
    assert isinstance(extracted["key_features"], list)
    assert "scanned_at" in extracted
    assert extracted["url"] == "https://github.com/example/repo"
    print("PASS: extract_repo_features_structure")
def test_save_trending_creates_dated_json():
    """save_trending writes a JSON file that follows the expected schema."""
    sample = {
        "name": "test/repo",
        "description": "Test repository",
        "stars": 999,
        "language": "Python",
        "topics": ["test"],
        "key_features": ["testing"],
        "scanned_at": "2026-04-26T00:00:00+00:00",
    }
    with tempfile.TemporaryDirectory() as workdir:
        output_file = save_trending([sample], output_dir=workdir)
        written = Path(output_file)
        assert written.exists(), f"Output file not created: {output_file}"

        # Read the file back while the temporary directory still exists.
        with open(written) as handle:
            payload = json.load(handle)
        assert "scanned_at" in payload
        assert payload["count"] == 1
        assert isinstance(payload["repos"], list)
        assert payload["repos"][0]["name"] == "test/repo"
        print(f"PASS: save_trending → {output_file}")
def test_save_trending_respects_output_dir_creation():
    """A missing (nested) output directory is created on demand."""
    with tempfile.TemporaryDirectory() as workdir:
        target_dir = Path(workdir, "nested", "trending")
        assert not target_dir.exists()

        written = save_trending([], output_dir=str(target_dir))

        assert target_dir.exists()
        assert Path(written).exists()
        print("PASS: output_dir_creation")
# Allow running this file directly: call each test in turn. An assertion
# failure propagates and stops the run before the final success message.
if __name__ == "__main__":
    test_infer_features_from_description()
    test_infer_features_from_topics_only()
    test_extract_repo_features_produces_valid_structure()
    test_save_trending_creates_dated_json()
    test_save_trending_respects_output_dir_creation()
    print("\nAll github_trending_scanner tests passed.")

View File

@@ -1,105 +0,0 @@
#!/usr/bin/env python3
"""
Tests for graph_visualizer.py — smoke test + subgraph logic.
Run: python3 scripts/test_graph_visualizer.py
"""
import json, sys, tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent))
import graph_visualizer as gv
def make_index(facts, tmp_dir):
    """Write *facts* as an ``index.json`` document in *tmp_dir*; return its path."""
    document = {"version": 1, "total_facts": len(facts), "facts": facts}
    index_file = tmp_dir / "index.json"
    index_file.write_text(json.dumps(document, indent=2))
    return index_file
def test_build_adjacency_simple():
    """Known relations are kept; a fact with no relations gets no entry."""
    related_by_id = {"a": ["b", "c"], "b": ["c"], "c": []}
    facts = [{"id": fid, "related": rel} for fid, rel in related_by_id.items()]
    assert gv.build_adjacency(facts) == {"a": ["b", "c"], "b": ["c"]}
    print(" PASS: build_adjacency simple")
def test_build_adjacency_unknown_nodes():
    """Relations pointing at ids absent from the fact list are dropped."""
    source = {"id": "a", "related": ["x", "b"]}  # "x" is not a known fact
    target = {"id": "b", "related": []}
    result = gv.build_adjacency([source, target])
    assert result == {"a": ["b"]}
    print(" PASS: build_adjacency filters unknown nodes")
def test_extract_subgraph_seed_only():
    """Without a depth limit, a seed pulls in everything reachable from it."""
    facts = [{"id": fid, "domain": "t", "category": "f"} for fid in ("a", "b", "c")]
    forward = {"a": ["b"], "b": ["c"], "c": []}
    backward = gv.build_reverse_adjacency(forward)
    reached = gv.extract_subgraph(facts, forward, backward, seeds=["a"])
    assert reached == {"a", "b", "c"}, f"got {reached}"
    print(" PASS: extract_subgraph with seed returns full reachable set")
def test_extract_subgraph_with_depth():
    """max_depth=2 on the chain a->b->c->d stops before reaching d."""
    facts = [{"id": fid, "domain": "t", "category": "f"} for fid in ("a", "b", "c", "d")]
    forward = {"a": ["b"], "b": ["c"], "c": ["d"], "d": []}
    backward = gv.build_reverse_adjacency(forward)
    reached = gv.extract_subgraph(facts, forward, backward, seeds=["a"], max_depth=2)
    assert reached == {"a", "b", "c"}
    print(" PASS: extract_subgraph depth=2 includes up to depth 2")
def test_extract_subgraph_filter_domain():
    """Only facts of the requested domain survive the domain filter."""
    domain_by_id = {"a": "alpha", "b": "beta", "c": "alpha"}
    facts = [{"id": fid, "domain": dom, "category": "f"} for fid, dom in domain_by_id.items()]
    selected = gv.extract_subgraph(facts, {}, {}, filter_domain="alpha")
    assert selected == {"a", "c"}
    print(" PASS: filter_domain works")
def test_extract_subgraph_filter_category():
    """Only facts of the requested category survive the category filter."""
    category_by_id = {"a": "pitfall", "b": "fact", "c": "pitfall"}
    facts = [{"id": fid, "domain": "g", "category": cat} for fid, cat in category_by_id.items()]
    selected = gv.extract_subgraph(facts, {}, {}, filter_category="pitfall")
    assert selected == {"a", "c"}
    print(" PASS: filter_category works")
def test_render_ascii_simple_chain():
    """Every fact label of a three-node chain shows up in the ASCII output."""
    facts = [
        {"id": fid, "fact": fid.upper(), "domain": "t", "category": "f"}
        for fid in ("a", "b", "c")
    ]
    fact_map = gv.build_fact_map(facts)
    rendered = gv.render_ascii({"a", "b", "c"}, {"a": ["b"], "b": ["c"]}, fact_map)
    assert "A" in rendered and "B" in rendered and "C" in rendered
    print(" PASS: render_ascii simple chain")
def test_render_dot_simple():
    """DOT output has the graph header, both nodes, an edge and both colours."""
    node_x = {"id": "x", "fact": "node x", "domain": "d1", "category": "fact"}
    node_y = {"id": "y", "fact": "node y", "domain": "d2", "category": "pitfall"}
    fact_map = gv.build_fact_map([node_x, node_y])
    dot = gv.render_dot({"x", "y"}, {"x": ["y"]}, fact_map)
    assert 'digraph knowledge_graph' in dot and '"x"' in dot and '"y"' in dot and '->' in dot
    # '#3498db' is the colour for 'fact', '#e74c3c' the one for 'pitfall'.
    assert '#3498db' in dot and '#e74c3c' in dot
    print(" PASS: render_dot basic structure and colors")
def main():
    """Run every test, print a pass/fail summary, and report overall success.

    Assertion failures are reported as FAIL and any other exception as
    ERROR; neither stops the remaining tests from running.

    Returns:
        True when no test failed, False otherwise.
    """
    print("\n=== graph_visualizer test suite ===\n")
    passed = failed = 0
    tests = [test_build_adjacency_simple, test_build_adjacency_unknown_nodes, test_extract_subgraph_seed_only, test_extract_subgraph_with_depth,
             test_extract_subgraph_filter_domain, test_extract_subgraph_filter_category,
             test_render_ascii_simple_chain, test_render_dot_simple]
    for test in tests:
        try:
            test()
            passed += 1
        except AssertionError as e:
            # Separate the test name from the message so the report is readable.
            print(f" FAIL: {test.__name__}: {e}")
            failed += 1
        except Exception as e:
            print(f" ERROR: {test.__name__}: {e}")
            failed += 1
    print(f"\n=== Results: {passed}/{passed+failed} passed, {failed} failed ===")
    return failed == 0
# Script entry point: exit status 0 when main() reports that every test
# passed, 1 otherwise.
if __name__ == "__main__":
    sys.exit(0 if main() else 1)