[ezra] Deep Dive scaffold for #830: deepdive_aggregator.py
This commit is contained in:
116
bin/deepdive_aggregator.py
Normal file
116
bin/deepdive_aggregator.py
Normal file
@@ -0,0 +1,116 @@
|
||||
#!/usr/bin/env python3
|
||||
"""deepdive_aggregator.py — Phase 1: Intelligence source aggregation. Issue #830."""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import xml.etree.ElementTree as ET
|
||||
from dataclasses import dataclass, asdict
|
||||
from datetime import datetime
|
||||
from typing import List, Optional
|
||||
from pathlib import Path
|
||||
import urllib.request
|
||||
|
||||
|
||||
@dataclass
|
||||
class RawItem:
|
||||
source: str
|
||||
title: str
|
||||
url: str
|
||||
content: str
|
||||
published: str
|
||||
authors: Optional[str] = None
|
||||
categories: Optional[List[str]] = None
|
||||
|
||||
|
||||
class ArxivRSSAdapter:
|
||||
def __init__(self, category: str):
|
||||
self.name = f"arxiv_{category}"
|
||||
self.url = f"http://export.arxiv.org/rss/{category}"
|
||||
|
||||
def fetch(self) -> List[RawItem]:
|
||||
try:
|
||||
with urllib.request.urlopen(self.url, timeout=30) as resp:
|
||||
xml_content = resp.read()
|
||||
except Exception as e:
|
||||
print(f"Error fetching {self.url}: {e}")
|
||||
return []
|
||||
|
||||
items = []
|
||||
try:
|
||||
root = ET.fromstring(xml_content)
|
||||
channel = root.find("channel")
|
||||
if channel is None:
|
||||
return items
|
||||
|
||||
for item in channel.findall("item"):
|
||||
title = item.findtext("title", default="")
|
||||
link = item.findtext("link", default="")
|
||||
desc = item.findtext("description", default="")
|
||||
pub_date = item.findtext("pubDate", default="")
|
||||
|
||||
items.append(RawItem(
|
||||
source=self.name,
|
||||
title=title.strip(),
|
||||
url=link,
|
||||
content=desc[:2000],
|
||||
published=self._parse_date(pub_date),
|
||||
categories=[self.category]
|
||||
))
|
||||
except ET.ParseError as e:
|
||||
print(f"Parse error: {e}")
|
||||
|
||||
return items
|
||||
|
||||
def _parse_date(self, date_str: str) -> str:
|
||||
from email.utils import parsedate_to_datetime
|
||||
try:
|
||||
dt = parsedate_to_datetime(date_str)
|
||||
return dt.isoformat()
|
||||
except:
|
||||
return datetime.now().isoformat()
|
||||
|
||||
|
||||
SOURCE_REGISTRY = {
|
||||
"arxiv_cs_ai": lambda: ArxivRSSAdapter("cs.AI"),
|
||||
"arxiv_cs_cl": lambda: ArxivRSSAdapter("cs.CL"),
|
||||
"arxiv_cs_lg": lambda: ArxivRSSAdapter("cs.LG"),
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--sources", default="arxiv_cs_ai,arxiv_cs_cl")
|
||||
parser.add_argument("--output")
|
||||
args = parser.parse_args()
|
||||
|
||||
sources = [s.strip() for s in args.sources.split(",")]
|
||||
all_items = []
|
||||
|
||||
for source_name in sources:
|
||||
if source_name not in SOURCE_REGISTRY:
|
||||
print(f"[WARN] Unknown source: {source_name}")
|
||||
continue
|
||||
adapter = SOURCE_REGISTRY[source_name]()
|
||||
items = adapter.fetch()
|
||||
all_items.extend(items)
|
||||
print(f"[INFO] {source_name}: {len(items)} items")
|
||||
|
||||
all_items.sort(key=lambda x: x.published, reverse=True)
|
||||
|
||||
output = {
|
||||
"metadata": {
|
||||
"count": len(all_items),
|
||||
"sources": sources,
|
||||
"generated": datetime.now().isoformat()
|
||||
},
|
||||
"items": [asdict(i) for i in all_items]
|
||||
}
|
||||
|
||||
if args.output:
|
||||
Path(args.output).write_text(json.dumps(output, indent=2))
|
||||
else:
|
||||
print(json.dumps(output, indent=2))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user