#!/usr/bin/env python3
"""
Deep Dive Phase 1: Source Aggregation Layer

Aggregates research sources from arXiv, lab blogs, and newsletters.

Usage: python phase1_aggregate.py [--date YYYY-MM-DD] [--output-dir DIR]

Issue: the-nexus#830
"""

import argparse
import asyncio
import hashlib
import json
import os
import xml.etree.ElementTree as ET
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import List, Optional
from urllib.parse import urljoin

import aiohttp
import feedparser


@dataclass
class SourceItem:
    """A single source item (paper, blog post, etc.)"""

    id: str
    title: str
    url: str
    source: str  # 'arxiv', 'openai', 'anthropic', 'deepmind', etc.
    published: str  # ISO format date
    summary: str
    authors: List[str]
    categories: List[str]
    raw_content: str = ""


class ArXIVAggregator:
    """Aggregate from arXiv RSS feeds for CS categories."""

    CATEGORIES = ['cs.AI', 'cs.CL', 'cs.LG']
    BASE_URL = "http://export.arxiv.org/rss/"

    async def fetch(self, session: aiohttp.ClientSession) -> List[SourceItem]:
        """Fetch and parse every configured category feed.

        Each category is fetched independently; a failure (network error,
        timeout, non-200 status) is logged and skipped so that one bad
        feed does not abort the whole aggregation run.

        Args:
            session: Shared aiohttp session owned by the caller.

        Returns:
            All items parsed from feeds that responded with HTTP 200.
        """
        items: List[SourceItem] = []
        for cat in self.CATEGORIES:
            url = f"{self.BASE_URL}{cat}"
            try:
                # Plain-number timeouts are deprecated in aiohttp; pass an
                # explicit ClientTimeout instead.
                timeout = aiohttp.ClientTimeout(total=30)
                async with session.get(url, timeout=timeout) as resp:
                    if resp.status == 200:
                        content = await resp.text()
                        items.extend(self._parse(content, cat))
            except Exception as e:
                print(f"[ERROR] arXiv {cat}: {e}")
        return items

    def _parse(self, content: str, category: str) -> List[SourceItem]:
        """Parse one RSS payload into SourceItems (best-effort).

        Missing fields fall back to empty strings/lists; the summary is
        truncated to 2000 chars. A parse failure logs and returns what
        was accumulated so far.
        """
        items: List[SourceItem] = []
        try:
            feed = feedparser.parse(content)
            for entry in feed.entries:
                items.append(SourceItem(
                    id=entry.get('id', entry.get('link', '')),
                    title=entry.get('title', ''),
                    url=entry.get('link', ''),
                    source=f'arxiv-{category}',
                    published=entry.get('published', entry.get('updated', '')),
                    summary=entry.get('summary', '')[:2000],
                    authors=[a.get('name', '') for a in entry.get('authors', [])],
                    categories=[t.get('term', '') for t in entry.get('tags', [])],
                    raw_content=entry.get('summary', ''),
                ))
        except Exception as e:
            print(f"[ERROR] Parse arXiv RSS: {e}")
        return items


class BlogAggregator:
    """Aggregate from major AI lab blogs via RSS/Atom."""

    SOURCES = {
        'openai': 'https://openai.com/blog/rss.xml',
        'anthropic': 'https://www.anthropic.com/news.atom',
        'deepmind': 'https://deepmind.google/blog/rss.xml',
        'google-research': 'https://research.google/blog/rss/',
    }

    async def fetch(self, session: aiohttp.ClientSession) -> List[SourceItem]:
        """Fetch and parse every configured blog feed.

        Failures are logged per-source and skipped, mirroring
        ArXIVAggregator.fetch.
        """
        items: List[SourceItem] = []
        for source, url in self.SOURCES.items():
            try:
                # Plain-number timeouts are deprecated in aiohttp; pass an
                # explicit ClientTimeout instead.
                timeout = aiohttp.ClientTimeout(total=30)
                async with session.get(url, timeout=timeout) as resp:
                    if resp.status == 200:
                        content = await resp.text()
                        items.extend(self._parse(content, source))
            except Exception as e:
                print(f"[ERROR] {source}: {e}")
        return items

    def _parse(self, content: str, source: str) -> List[SourceItem]:
        """Parse one blog feed payload into SourceItems (best-effort).

        Only the 10 most recent entries per source are kept; summaries
        are truncated to 2000 chars and raw content to 5000 chars.
        """
        items: List[SourceItem] = []
        try:
            feed = feedparser.parse(content)
            for entry in feed.entries[:10]:  # Limit to recent 10 per source
                items.append(SourceItem(
                    id=entry.get('id', entry.get('link', '')),
                    title=entry.get('title', ''),
                    url=entry.get('link', ''),
                    source=source,
                    published=entry.get('published', entry.get('updated', '')),
                    summary=entry.get('summary', '')[:2000],
                    authors=[a.get('name', '') for a in entry.get('authors', [])],
                    categories=[],
                    raw_content=entry.get('content', [{'value': ''}])[0].get('value', '')[:5000],
                ))
        except Exception as e:
            print(f"[ERROR] Parse {source}: {e}")
        return items


class SourceAggregator:
    """Main aggregation orchestrator."""

    def __init__(self, output_dir: Path, date: str):
        """Create the per-date output directory under <output_dir>/sources/."""
        self.output_dir = output_dir
        self.date = date
        self.sources_dir = output_dir / "sources" / date
        self.sources_dir.mkdir(parents=True, exist_ok=True)

    async def run(self) -> List[SourceItem]:
        """Run full aggregation pipeline.

        Fetches arXiv and blog sources concurrently, collects whatever
        succeeded (per-source exceptions are logged, not raised), saves
        the result to disk, and returns the aggregated items.
        """
        print(f"[Phase 1] Aggregating sources for {self.date}")
        all_items: List[SourceItem] = []
        async with aiohttp.ClientSession() as session:
            # Parallel fetch from all sources
            arxiv_agg = ArXIVAggregator()
            blog_agg = BlogAggregator()
            arxiv_task = arxiv_agg.fetch(session)
            blog_task = blog_agg.fetch(session)
            # return_exceptions=True: one failed aggregator must not
            # discard the other's results.
            results = await asyncio.gather(arxiv_task, blog_task, return_exceptions=True)
            for result in results:
                if isinstance(result, Exception):
                    print(f"[ERROR] Aggregation failed: {result}")
                else:
                    all_items.extend(result)
        print(f"[Phase 1] Total items aggregated: {len(all_items)}")
        # Save to disk
        self._save(all_items)
        return all_items

    def _save(self, items: List[SourceItem]):
        """Save aggregated items to JSON.

        Written as UTF-8 with ensure_ascii=False so non-ASCII titles and
        author names stay readable in the output file.
        """
        output_file = self.sources_dir / "aggregated.json"
        data = {
            'date': self.date,
            'generated_at': datetime.now().isoformat(),
            'count': len(items),
            'items': [asdict(item) for item in items],
        }
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        print(f"[Phase 1] Saved to {output_file}")


def main():
    """CLI entry point: parse arguments and run the aggregator."""
    parser = argparse.ArgumentParser(description='Deep Dive Phase 1: Source Aggregation')
    parser.add_argument('--date', default=datetime.now().strftime('%Y-%m-%d'),
                        help='Target date (YYYY-MM-DD)')
    parser.add_argument('--output-dir', type=Path, default=Path('../data'),
                        help='Output directory for data')
    args = parser.parse_args()

    aggregator = SourceAggregator(args.output_dir, args.date)
    asyncio.run(aggregator.run())


if __name__ == '__main__':
    main()