#!/usr/bin/env python3
"""
arXiv RSS Aggregator — Phase 1 Proof-of-Concept

Parent: the-nexus#830
Created: 2026-04-05 by Ezra

This is a ZERO-DEPENDENCY proof-of-concept for the Deep Dive source
aggregation layer. It fetches arXiv RSS feeds for cs.AI, cs.CL, cs.LG
and stores items as JSON lines.

Can run TODAY with no API keys, no GPU, no TTS decisions.
"""

import json
import re
import sys
import time
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Set

# arXiv RSS feeds for target categories
FEEDS = {
    "cs.AI": "http://export.arxiv.org/rss/cs.AI",
    "cs.CL": "http://export.arxiv.org/rss/cs.CL",
    "cs.LG": "http://export.arxiv.org/rss/cs.LG",
}

# Storage paths (relative to project root)
RAW_DIR = Path("data/deepdive/raw")

# Pause between consecutive feed requests (be polite to arXiv).
REQUEST_INTERVAL_SECONDS = 3

# Clark-notation prefix for elements in the Atom namespace.
_ATOM = "{http://www.w3.org/2005/Atom}"

# Trailing version marker of an arXiv identifier, e.g. the "v2" in "2404.12345v2".
_VERSION_SUFFIX = re.compile(r"v\d+$")


def _extract_arxiv_id(entry_id: str) -> str:
    """Return the version-less arXiv ID found at the end of *entry_id*.

    Takes the text after the last "/" and the last ":" and strips a trailing
    "v<digits>" version marker. Only the trailing marker is removed, so a "v"
    elsewhere in the identifier (e.g. in "oai:arXiv.org:...") is left alone.
    """
    tail = entry_id.strip().rsplit("/", 1)[-1].rsplit(":", 1)[-1]
    return _VERSION_SUFFIX.sub("", tail)


def fetch_feed(category: str, url: str) -> str:
    """Fetch one feed and return its body decoded as UTF-8.

    This function performs a single request and does not sleep; spacing
    requests out is the caller's job.

    Args:
        category: Feed label, used only in the rate-limit message.
        url: Feed URL to request.

    Raises:
        urllib.error.HTTPError: On any HTTP error status. A 403 additionally
            prints a rate-limit hint before the error is re-raised.
    """
    req = urllib.request.Request(
        url,
        headers={
            "User-Agent": "DeepDiveBot/0.1 (research aggregator; ezra@timmy.local)"
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            return resp.read().decode("utf-8")
    except urllib.error.HTTPError as e:
        if e.code == 403:
            print(f"RATE LIMITED on {category}: arXiv is blocking. Wait 1 hour.")
        raise


def parse_arxiv_rss(xml_content: str, category: str) -> List[Dict[str, Any]]:
    """Parse feed XML into a list of structured item dicts.

    Each item carries the keys "id", "title", "summary", "published",
    "updated", "category", "authors", "links" and "fetched_at".

    Args:
        xml_content: XML document text.
        category: Category label stored on every item.

    Raises:
        xml.etree.ElementTree.ParseError: If *xml_content* is not well-formed.
    """
    # NOTE(review): only Atom <entry> elements are read; a document that uses
    # RSS <item> elements yields an empty list. Confirm the configured feed
    # URLs actually serve Atom.
    root = ET.fromstring(xml_content)
    # One timestamp per parse call so all items of a feed share it.
    fetched_at = datetime.now(timezone.utc).isoformat()

    items = []
    for entry in root.findall(f".//{_ATOM}entry"):
        author_names = (
            author.findtext(f"{_ATOM}name", "")
            for author in entry.findall(f"{_ATOM}author")
        )
        items.append(
            {
                "id": entry.findtext(f"{_ATOM}id", ""),
                "title": entry.findtext(f"{_ATOM}title", "").strip(),
                "summary": entry.findtext(f"{_ATOM}summary", "").strip(),
                "published": entry.findtext(f"{_ATOM}published", ""),
                "updated": entry.findtext(f"{_ATOM}updated", ""),
                "category": category,
                # Authors without a name are skipped.
                "authors": [name for name in author_names if name],
                # Links (PDF, abstract)
                "links": [
                    {
                        "href": link.get("href", ""),
                        "rel": link.get("rel", ""),
                        "title": link.get("title", ""),
                    }
                    for link in entry.findall(f"{_ATOM}link")
                ],
                "fetched_at": fetched_at,
            }
        )
    return items


def dedupe_items(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Remove duplicate papers across categories, keeping first occurrences.

    Two items are duplicates when their "id" values reduce to the same
    version-less arXiv ID. Every kept item is mutated in place to carry that
    ID under the "arxiv_id" key.
    """
    seen = set()
    unique = []
    for item in items:
        paper_id = _extract_arxiv_id(item.get("id", ""))
        if paper_id not in seen:
            seen.add(paper_id)
            item["arxiv_id"] = paper_id
            unique.append(item)
    return unique


def save_items(
    items: List[Dict[str, Any]], output_dir: Path, *, append: bool = False
) -> Path:
    """Save items as a JSON lines file named after today's UTC date.

    Args:
        items: Items to serialize, one JSON object per line.
        output_dir: Directory to write into; created if missing.
        append: When true, add to an existing file for today instead of
            replacing it. Defaults to replacing, as before.

    Returns:
        Path of the file written.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    output_file = output_dir / f"arxiv-{today}.jsonl"

    # Explicit UTF-8: ensure_ascii=False emits raw non-ASCII characters, which
    # a non-UTF-8 platform default encoding may be unable to write.
    with open(output_file, "a" if append else "w", encoding="utf-8") as f:
        f.writelines(json.dumps(item, ensure_ascii=False) + "\n" for item in items)
    return output_file


def load_existing_ids(output_dir: Path) -> Set[str]:
    """Load arXiv IDs already stored in *output_dir* to prevent re-saving.

    Reads every "arxiv-*.jsonl" file. Lines that are not valid JSON, are not
    JSON objects, or lack a non-empty string "arxiv_id" are ignored.
    """
    existing: Set[str] = set()
    if not output_dir.exists():
        return existing

    for path in output_dir.glob("arxiv-*.jsonl"):
        # errors="replace" keeps a file with stray bytes readable; only the
        # "arxiv_id" field is used here.
        with open(path, encoding="utf-8", errors="replace") as fp:
            for line in fp:
                try:
                    item = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(item, dict):
                    continue
                paper_id = item.get("arxiv_id")
                if isinstance(paper_id, str) and paper_id:
                    existing.add(paper_id)
    return existing


def main() -> int:
    """Run daily aggregation and return the number of new items saved."""
    print("Deep Dive — Phase 1: arXiv Aggregation")
    print("=" * 50)

    RAW_DIR.mkdir(parents=True, exist_ok=True)
    existing_ids = load_existing_ids(RAW_DIR)

    all_items = []
    for index, (category, url) in enumerate(FEEDS.items()):
        print(f"\nFetching {category}...")
        if index:
            # Respect arXiv rate limits: wait between requests, not before
            # the first one.
            time.sleep(REQUEST_INTERVAL_SECONDS)
        try:
            xml = fetch_feed(category, url)
            items = parse_arxiv_rss(xml, category)
        except Exception as e:
            # Deliberate best effort: one failing feed must not stop the rest.
            print(f" ERROR: {e}")
            continue

        # Filter existing. The ID is derived here because "arxiv_id" is only
        # attached later, by dedupe_items.
        new_items = [
            item
            for item in items
            if _extract_arxiv_id(item["id"]) not in existing_ids
        ]
        print(f" Found {len(items)} items, {len(new_items)} new")
        all_items.extend(new_items)

    # Deduplicate across categories
    unique_items = dedupe_items(all_items)

    # Save; append so a second run on the same day keeps earlier results.
    if unique_items:
        output_file = save_items(unique_items, RAW_DIR, append=True)
        print(f"\n✅ Saved {len(unique_items)} items to {output_file}")
    else:
        print("\n⚠️ No new items found")

    return len(unique_items)


if __name__ == "__main__":
    count = main()
    # count is a list length and never negative, so this always exits 0.
    sys.exit(0 if count >= 0 else 1)