#!/usr/bin/env python3
"""deepdive_aggregator.py — Phase 1: Intelligence source aggregation. Issue #830."""
import argparse
import json
import urllib.request
import xml.etree.ElementTree as ET
from dataclasses import asdict, dataclass
from datetime import datetime
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import List, Optional


@dataclass
class RawItem:
    """One normalized item fetched from an intelligence source."""
    source: str  # registry name of the adapter that produced this item
    title: str
    url: str
    content: str  # description/body, truncated to 2000 chars by adapters
    published: str  # ISO-8601 timestamp string (sortable lexicographically)
    authors: Optional[str] = None
    categories: Optional[List[str]] = None


class ArxivRSSAdapter:
    """Fetch and parse the arXiv RSS feed for one category (e.g. "cs.AI")."""

    def __init__(self, category: str):
        # BUGFIX: fetch() reads self.category when building RawItem.categories,
        # but it was never assigned — every successful fetch raised AttributeError.
        self.category = category
        self.name = f"arxiv_{category}"
        self.url = f"http://export.arxiv.org/rss/{category}"

    def fetch(self) -> List[RawItem]:
        """Download the feed and return parsed RawItems.

        Returns an empty list on network or XML errors — sources are
        best-effort so one failing feed must not abort the aggregation run.
        """
        try:
            with urllib.request.urlopen(self.url, timeout=30) as resp:
                xml_content = resp.read()
        except Exception as e:
            # Broad by design at this I/O boundary: log and let other feeds run.
            print(f"Error fetching {self.url}: {e}")
            return []

        items: List[RawItem] = []
        try:
            root = ET.fromstring(xml_content)
            channel = root.find("channel")
            if channel is None:
                return items
            for item in channel.findall("item"):
                title = item.findtext("title", default="")
                link = item.findtext("link", default="")
                desc = item.findtext("description", default="")
                pub_date = item.findtext("pubDate", default="")
                items.append(RawItem(
                    source=self.name,
                    title=title.strip(),
                    url=link,
                    content=desc[:2000],  # cap stored description size
                    published=self._parse_date(pub_date),
                    categories=[self.category],
                ))
        except ET.ParseError as e:
            print(f"Parse error: {e}")
        return items

    def _parse_date(self, date_str: str) -> str:
        """Convert an RFC 2822 pubDate to ISO-8601; fall back to now() if unparseable."""
        try:
            return parsedate_to_datetime(date_str).isoformat()
        except (TypeError, ValueError):
            # Was a bare `except:`; parsedate_to_datetime raises ValueError on
            # malformed input (3.10+) and TypeError on older versions/None.
            return datetime.now().isoformat()


# Factory registry: source name -> zero-arg callable building a fresh adapter.
SOURCE_REGISTRY = {
    "arxiv_cs_ai": lambda: ArxivRSSAdapter("cs.AI"),
    "arxiv_cs_cl": lambda: ArxivRSSAdapter("cs.CL"),
    "arxiv_cs_lg": lambda: ArxivRSSAdapter("cs.LG"),
}


def main():
    """CLI entry point: fetch the requested sources and emit aggregated JSON."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--sources", default="arxiv_cs_ai,arxiv_cs_cl")
    parser.add_argument("--output")
    args = parser.parse_args()
    sources = [s.strip() for s in args.sources.split(",")]

    all_items: List[RawItem] = []
    for source_name in sources:
        if source_name not in SOURCE_REGISTRY:
            print(f"[WARN] Unknown source: {source_name}")
            continue
        adapter = SOURCE_REGISTRY[source_name]()
        items = adapter.fetch()
        all_items.extend(items)
        print(f"[INFO] {source_name}: {len(items)} items")

    # Newest first; ISO-8601 strings sort chronologically as text.
    all_items.sort(key=lambda x: x.published, reverse=True)
    output = {
        "metadata": {
            "count": len(all_items),
            "sources": sources,
            "generated": datetime.now().isoformat(),
        },
        "items": [asdict(i) for i in all_items],
    }
    if args.output:
        Path(args.output).write_text(json.dumps(output, indent=2))
    else:
        print(json.dumps(output, indent=2))


if __name__ == "__main__":
    main()