#!/usr/bin/env python3
"""deepdive_aggregator.py — Phase 1: Intelligence source aggregation. Issue #830."""

import argparse
import json
import urllib.request
import xml.etree.ElementTree as ET
from dataclasses import asdict, dataclass
from datetime import datetime
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import List, Optional


@dataclass
class RawItem:
    """One normalized item pulled from an intelligence source feed."""
    source: str                              # adapter name, e.g. "arxiv_cs.AI"
    title: str                               # stripped item title
    url: str                                 # link to the original item
    content: str                             # description/abstract, capped at 2000 chars
    published: str                           # ISO-8601 timestamp string
    authors: Optional[str] = None            # not populated by the RSS adapter yet
    categories: Optional[List[str]] = None   # feed categories, e.g. ["cs.AI"]


class ArxivRSSAdapter:
    """Fetch and parse the arXiv RSS feed for a single category (e.g. "cs.AI")."""

    def __init__(self, category: str):
        # BUG FIX: fetch() references self.category when building RawItem.categories,
        # but the original __init__ never stored it — every non-empty fetch raised
        # AttributeError and returned no items.
        self.category = category
        self.name = f"arxiv_{category}"
        self.url = f"http://export.arxiv.org/rss/{category}"

    def fetch(self) -> List[RawItem]:
        """Download the feed and return its items.

        Best-effort: any network failure or XML parse error is logged and an
        empty (or partial) list is returned rather than raising.
        """
        try:
            with urllib.request.urlopen(self.url, timeout=30) as resp:
                xml_content = resp.read()
        except Exception as e:
            # Network boundary: degrade to "no items" so one dead source
            # doesn't abort the whole aggregation run.
            print(f"Error fetching {self.url}: {e}")
            return []

        items: List[RawItem] = []
        try:
            root = ET.fromstring(xml_content)
            channel = root.find("channel")
            if channel is None:
                # Not an RSS 2.0 document we recognize.
                return items

            for item in channel.findall("item"):
                title = item.findtext("title", default="")
                link = item.findtext("link", default="")
                desc = item.findtext("description", default="")
                pub_date = item.findtext("pubDate", default="")

                items.append(RawItem(
                    source=self.name,
                    title=title.strip(),
                    url=link,
                    content=desc[:2000],  # cap abstract size to keep output bounded
                    published=self._parse_date(pub_date),
                    categories=[self.category],
                ))
        except ET.ParseError as e:
            print(f"Parse error: {e}")

        return items

    def _parse_date(self, date_str: str) -> str:
        """Convert an RFC-2822 pubDate to ISO-8601; fall back to 'now' on bad input."""
        try:
            dt = parsedate_to_datetime(date_str)
            return dt.isoformat()
        except (TypeError, ValueError):
            # Was a bare `except:` — narrowed so SystemExit/KeyboardInterrupt
            # are no longer swallowed. parsedate_to_datetime raises ValueError
            # (TypeError on older Pythons) for malformed dates.
            return datetime.now().isoformat()


# Registry of available sources; values are zero-arg factories so adapters
# are only constructed for the sources actually requested.
SOURCE_REGISTRY = {
    "arxiv_cs_ai": lambda: ArxivRSSAdapter("cs.AI"),
    "arxiv_cs_cl": lambda: ArxivRSSAdapter("cs.CL"),
    "arxiv_cs_lg": lambda: ArxivRSSAdapter("cs.LG"),
}


def main():
    """Aggregate the requested sources into one JSON document.

    Writes to --output if given, otherwise prints to stdout. Unknown source
    names are warned about and skipped.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--sources", default="arxiv_cs_ai,arxiv_cs_cl")
    parser.add_argument("--output")
    args = parser.parse_args()

    sources = [s.strip() for s in args.sources.split(",")]
    all_items: List[RawItem] = []

    for source_name in sources:
        if source_name not in SOURCE_REGISTRY:
            print(f"[WARN] Unknown source: {source_name}")
            continue
        adapter = SOURCE_REGISTRY[source_name]()
        items = adapter.fetch()
        all_items.extend(items)
        print(f"[INFO] {source_name}: {len(items)} items")

    # ISO-8601 strings sort lexicographically, so this is newest-first.
    all_items.sort(key=lambda x: x.published, reverse=True)

    output = {
        "metadata": {
            "count": len(all_items),
            "sources": sources,
            "generated": datetime.now().isoformat()
        },
        "items": [asdict(i) for i in all_items]
    }

    if args.output:
        Path(args.output).write_text(json.dumps(output, indent=2))
    else:
        print(json.dumps(output, indent=2))


if __name__ == "__main__":
    main()