#!/usr/bin/env python3
"""
arXiv RSS Aggregator — Phase 1 Proof-of-Concept
Parent: the-nexus#830
Created: 2026-04-05 by Ezra

This is a ZERO-DEPENDENCY proof-of-concept for the Deep Dive source aggregation layer.
It fetches arXiv Atom feeds for cs.AI, cs.CL, cs.LG and stores items as JSON lines.

Can run TODAY with no API keys, no GPU, no TTS decisions.
"""

import json
import sys
import time
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List

# arXiv's export feeds are Atom; every element lives in this namespace.
ATOM = "{http://www.w3.org/2005/Atom}"

# arXiv feeds for target categories
FEEDS = {
    "cs.AI": "http://export.arxiv.org/rss/cs.AI",
    "cs.CL": "http://export.arxiv.org/rss/cs.CL",
    "cs.LG": "http://export.arxiv.org/rss/cs.LG",
}

# Storage paths (relative to project root)
RAW_DIR = Path("data/deepdive/raw")

# Minimum delay between requests (arXiv asks for <= 1 request per 3 seconds).
REQUEST_DELAY_SECONDS = 3


def fetch_feed(category: str, url: str) -> str:
    """Fetch one feed as UTF-8 text, respecting arXiv's rate limit.

    Sleeps REQUEST_DELAY_SECONDS before every request (the original claimed
    rate-limit respect here but the sleep actually lived in main()).

    Raises:
        urllib.error.HTTPError: on any non-2xx response; a 403 additionally
            prints a rate-limit hint before re-raising.
    """
    time.sleep(REQUEST_DELAY_SECONDS)
    req = urllib.request.Request(
        url,
        headers={
            "User-Agent": "DeepDiveBot/0.1 (research aggregator; ezra@timmy.local)"
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            return resp.read().decode("utf-8")
    except urllib.error.HTTPError as e:
        if e.code == 403:
            print(f"RATE LIMITED on {category}: arXiv is blocking. Wait 1 hour.")
        # Re-raise every HTTP error (403 included) so the caller decides.
        raise


def parse_arxiv_rss(xml_content: str, category: str) -> List[Dict[str, Any]]:
    """Parse an arXiv Atom feed into a list of structured item dicts.

    Each item carries id/title/summary/published/updated plus the category
    it was fetched under, its authors, its links, and a UTC fetch timestamp.
    Items do NOT yet have an "arxiv_id" key — dedupe_items() adds that.
    """
    root = ET.fromstring(xml_content)

    items: List[Dict[str, Any]] = []
    for entry in root.findall(f".//{ATOM}entry"):
        item: Dict[str, Any] = {
            "id": entry.findtext(f"{ATOM}id", ""),
            "title": entry.findtext(f"{ATOM}title", "").strip(),
            "summary": entry.findtext(f"{ATOM}summary", "").strip(),
            "published": entry.findtext(f"{ATOM}published", ""),
            "updated": entry.findtext(f"{ATOM}updated", ""),
            "category": category,
            # Authors: every <author><name> with non-empty text.
            "authors": [
                name
                for author in entry.findall(f"{ATOM}author")
                if (name := author.findtext(f"{ATOM}name", ""))
            ],
            # Links: abstract page, PDF, etc.
            "links": [
                {
                    "href": link.get("href", ""),
                    "rel": link.get("rel", ""),
                    "title": link.get("title", ""),
                }
                for link in entry.findall(f"{ATOM}link")
            ],
            "fetched_at": datetime.now(timezone.utc).isoformat(),
        }
        items.append(item)

    return items


def dedupe_items(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Remove duplicate papers across categories (same paper, multiple feeds).

    Side effect: annotates each surviving item with "arxiv_id" — the bare
    arXiv identifier with any version suffix (e.g. "v2") stripped.
    First occurrence wins; order is preserved.
    """
    seen: set = set()
    unique: List[Dict[str, Any]] = []
    for item in items:
        # "http://arxiv.org/abs/2404.01234v1" -> "2404.01234"
        paper_id = item["id"].split("/")[-1].split("v")[0]
        if paper_id not in seen:
            seen.add(paper_id)
            item["arxiv_id"] = paper_id
            unique.append(item)
    return unique


def save_items(items: List[Dict[str, Any]], output_dir: Path) -> Path:
    """Append-style persist: write items as one JSON-lines file per UTC day.

    Returns the path of the file written. Note: a second run on the same
    day overwrites that day's file ("w" mode).
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    output_file = output_dir / f"arxiv-{today}.jsonl"

    with open(output_file, "w", encoding="utf-8") as f:
        for item in items:
            f.write(json.dumps(item, ensure_ascii=False) + "\n")

    return output_file


def load_existing_ids(output_dir: Path) -> set:
    """Collect arXiv IDs from all previously saved JSONL files.

    Malformed lines and rows without a (truthy) "arxiv_id" are skipped, so
    the returned set never contains the empty string.
    """
    existing: set = set()
    if not output_dir.exists():
        return existing

    for path in output_dir.glob("arxiv-*.jsonl"):
        with open(path, encoding="utf-8") as fp:
            for line in fp:
                try:
                    item = json.loads(line)
                except json.JSONDecodeError:
                    continue
                arxiv_id = item.get("arxiv_id")
                if arxiv_id:
                    existing.add(arxiv_id)
    return existing


def main() -> int:
    """Run one daily aggregation pass; return the number of new items saved."""
    print("Deep Dive — Phase 1: arXiv Aggregation")
    print("=" * 50)

    RAW_DIR.mkdir(parents=True, exist_ok=True)
    existing_ids = load_existing_ids(RAW_DIR)

    all_items: List[Dict[str, Any]] = []

    for category, url in FEEDS.items():
        print(f"\nFetching {category}...")
        try:
            # fetch_feed sleeps for the rate limit itself.
            xml = fetch_feed(category, url)
            items = parse_arxiv_rss(xml, category)
            print(f"  Found {len(items)} items")
            all_items.extend(items)
        except Exception as e:
            # Best-effort per feed: one failing category must not kill the run.
            print(f"  ERROR: {e}")

    # Deduplicate FIRST: dedupe_items() is what attaches "arxiv_id", which
    # the already-seen filter needs. (The original filtered on "arxiv_id"
    # before it existed, so the existing-ID filter silently never matched.)
    unique_items = dedupe_items(all_items)
    new_items = [i for i in unique_items if i["arxiv_id"] not in existing_ids]

    if new_items:
        output_file = save_items(new_items, RAW_DIR)
        print(f"\n✅ Saved {len(new_items)} items to {output_file}")
    else:
        print("\n⚠️ No new items found")

    return len(new_items)


if __name__ == "__main__":
    main()
    # main() returns a len(), which is always >= 0, so the original
    # `exit(0 if count >= 0 else 1)` could never exit non-zero.
    sys.exit(0)