[ezra] Deep Dive scaffold #830: deepdive_aggregator.py
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled

This commit is contained in:
2026-04-05 01:51:02 +00:00
parent 8ba0bdd2f6
commit 9ba00b7ea8

116
bin/deepdive_aggregator.py Normal file
View File

@@ -0,0 +1,116 @@
#!/usr/bin/env python3
"""deepdive_aggregator.py — Phase 1: Intelligence source aggregation. Issue #830."""
import argparse
import json
import xml.etree.ElementTree as ET
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import List, Optional
from pathlib import Path
import urllib.request
@dataclass
class RawItem:
source: str
title: str
url: str
content: str
published: str
authors: Optional[str] = None
categories: Optional[List[str]] = None
class ArxivRSSAdapter:
def __init__(self, category: str):
self.name = f"arxiv_{category}"
self.url = f"http://export.arxiv.org/rss/{category}"
def fetch(self) -> List[RawItem]:
try:
with urllib.request.urlopen(self.url, timeout=30) as resp:
xml_content = resp.read()
except Exception as e:
print(f"Error fetching {self.url}: {e}")
return []
items = []
try:
root = ET.fromstring(xml_content)
channel = root.find("channel")
if channel is None:
return items
for item in channel.findall("item"):
title = item.findtext("title", default="")
link = item.findtext("link", default="")
desc = item.findtext("description", default="")
pub_date = item.findtext("pubDate", default="")
items.append(RawItem(
source=self.name,
title=title.strip(),
url=link,
content=desc[:2000],
published=self._parse_date(pub_date),
categories=[self.category]
))
except ET.ParseError as e:
print(f"Parse error: {e}")
return items
def _parse_date(self, date_str: str) -> str:
from email.utils import parsedate_to_datetime
try:
dt = parsedate_to_datetime(date_str)
return dt.isoformat()
except:
return datetime.now().isoformat()
SOURCE_REGISTRY = {
"arxiv_cs_ai": lambda: ArxivRSSAdapter("cs.AI"),
"arxiv_cs_cl": lambda: ArxivRSSAdapter("cs.CL"),
"arxiv_cs_lg": lambda: ArxivRSSAdapter("cs.LG"),
}
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--sources", default="arxiv_cs_ai,arxiv_cs_cl")
parser.add_argument("--output")
args = parser.parse_args()
sources = [s.strip() for s in args.sources.split(",")]
all_items = []
for source_name in sources:
if source_name not in SOURCE_REGISTRY:
print(f"[WARN] Unknown source: {source_name}")
continue
adapter = SOURCE_REGISTRY[source_name]()
items = adapter.fetch()
all_items.extend(items)
print(f"[INFO] {source_name}: {len(items)} items")
all_items.sort(key=lambda x: x.published, reverse=True)
output = {
"metadata": {
"count": len(all_items),
"sources": sources,
"generated": datetime.now().isoformat()
},
"items": [asdict(i) for i in all_items]
}
if args.output:
Path(args.output).write_text(json.dumps(output, indent=2))
else:
print(json.dumps(output, indent=2))
if __name__ == "__main__":
main()