#!/usr/bin/env python3
"""deepdive_aggregator.py — Phase 1: Intelligence source aggregation. Issue #830."""

import argparse
import json
import urllib.request
import xml.etree.ElementTree as ET
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import List, Optional
|
@dataclass
class RawItem:
    """A single normalized item harvested from one upstream source.

    Field order is part of the public interface (positional construction
    by adapters); do not reorder.
    """

    source: str  # adapter's self.name, e.g. "arxiv_cs.AI"
    title: str  # headline text (adapters strip surrounding whitespace)
    url: str  # link to the original item
    content: str  # summary/body text; adapters may truncate (e.g. 2000 chars)
    published: str  # publication time as an ISO-8601 string (see _parse_date)
    authors: Optional[str] = None  # author credit, when the source provides one
    categories: Optional[List[str]] = None  # source category tags, e.g. ["cs.AI"]
|
|
class ArxivRSSAdapter:
    """Fetches one arXiv category RSS feed and parses it into RawItem records."""

    def __init__(self, category: str):
        """Create an adapter for a single arXiv category (e.g. "cs.AI")."""
        # BUG FIX: fetch() reads self.category, but it was never stored, so
        # every successful fetch raised AttributeError. Store it here.
        self.category = category
        self.name = f"arxiv_{category}"
        self.url = f"http://export.arxiv.org/rss/{category}"

    def fetch(self) -> List["RawItem"]:
        """Download and parse the feed.

        Returns:
            A list of RawItem records; empty on any network or XML error.
            Errors are printed, not raised — one failing source must not
            abort the whole aggregation run (best-effort by design).
        """
        try:
            with urllib.request.urlopen(self.url, timeout=30) as resp:
                xml_content = resp.read()
        except Exception as e:
            # Broad catch is deliberate: urlopen raises URLError, HTTPError,
            # socket.timeout, ... and each just means "skip this source".
            print(f"Error fetching {self.url}: {e}")
            return []

        items = []
        try:
            root = ET.fromstring(xml_content)
            channel = root.find("channel")
            if channel is None:
                # Not an RSS 2.0 document (no <channel>): nothing to parse.
                return items

            for item in channel.findall("item"):
                title = item.findtext("title", default="")
                link = item.findtext("link", default="")
                desc = item.findtext("description", default="")
                pub_date = item.findtext("pubDate", default="")

                items.append(RawItem(
                    source=self.name,
                    title=title.strip(),
                    url=link,
                    content=desc[:2000],  # cap stored abstract size
                    published=self._parse_date(pub_date),
                    categories=[self.category],
                ))
        except ET.ParseError as e:
            print(f"Parse error: {e}")

        return items

    def _parse_date(self, date_str: str) -> str:
        """Convert an RFC 2822 pubDate string to ISO-8601.

        Falls back to the current local time when the date is missing or
        unparseable, so the item is kept rather than dropped.
        """
        from email.utils import parsedate_to_datetime
        try:
            dt = parsedate_to_datetime(date_str)
            return dt.isoformat()
        except (TypeError, ValueError):
            # Narrowed from a bare `except:`: parsedate_to_datetime raises
            # TypeError (pre-3.10) or ValueError (3.10+) on bad input; a bare
            # except also swallowed KeyboardInterrupt/SystemExit.
            return datetime.now().isoformat()
|
|
|
def _arxiv_factory(category):
    """Return a zero-argument factory producing an ArxivRSSAdapter for *category*.

    A real function (not an inline lambda in the comprehension) so each
    factory captures its own category rather than a late-bound loop variable.
    """
    return lambda: ArxivRSSAdapter(category)


# Registry of known sources: CLI name -> zero-argument adapter factory.
# Key derivation: "cs.AI" -> "arxiv_cs_ai".
SOURCE_REGISTRY = {
    f"arxiv_{category.lower().replace('.', '_')}": _arxiv_factory(category)
    for category in ("cs.AI", "cs.CL", "cs.LG")
}
|
|
def main():
    """CLI entry point: fetch the requested sources and emit aggregated JSON."""
    parser = argparse.ArgumentParser(
        description="Aggregate intelligence sources into one JSON document."
    )
    parser.add_argument(
        "--sources",
        default="arxiv_cs_ai,arxiv_cs_cl",
        help="comma-separated source names from SOURCE_REGISTRY",
    )
    parser.add_argument("--output", help="path to write JSON to (default: stdout)")
    args = parser.parse_args()

    # Drop empty entries so a trailing comma ("a,b,") doesn't emit a
    # spurious unknown-source warning.
    sources = [s.strip() for s in args.sources.split(",") if s.strip()]
    all_items = []

    for source_name in sources:
        if source_name not in SOURCE_REGISTRY:
            print(f"[WARN] Unknown source: {source_name}")
            continue
        adapter = SOURCE_REGISTRY[source_name]()
        items = adapter.fetch()
        all_items.extend(items)
        print(f"[INFO] {source_name}: {len(items)} items")

    # Newest first, by lexicographic sort of the ISO-8601 `published` string.
    all_items.sort(key=lambda x: x.published, reverse=True)

    output = {
        "metadata": {
            "count": len(all_items),
            "sources": sources,
            "generated": datetime.now().isoformat(),
        },
        "items": [asdict(i) for i in all_items],
    }

    # ensure_ascii=False keeps non-ASCII titles/abstracts human-readable.
    payload = json.dumps(output, indent=2, ensure_ascii=False)
    if args.output:
        # Explicit UTF-8: write_text otherwise uses the platform's locale
        # encoding and can raise UnicodeEncodeError on non-ASCII content.
        Path(args.output).write_text(payload, encoding="utf-8")
    else:
        print(payload)


if __name__ == "__main__":
    main()