Files
ezra-environment/the-nexus/deepdive/bin/phase1_aggregate.py
Ezra 9f010ad044 [BURN] Deep Dive scaffold: 5-phase sovereign NotebookLM (#830)
Complete production-ready scaffold for automated daily AI intelligence briefings:

- Phase 1: Source aggregation (arXiv + lab blogs)
- Phase 2: Relevance ranking (keyword + source authority scoring)
- Phase 3: LLM synthesis (Hermes-context briefing generation)
- Phase 4: TTS audio (edge-tts/OpenAI/ElevenLabs)
- Phase 5: Telegram delivery (voice message)

Deliverables:
- docs/ARCHITECTURE.md (9000+ lines) - system design
- docs/OPERATIONS.md - runbook and troubleshooting
- 5 executable phase scripts (bin/)
- Full pipeline orchestrator (run_full_pipeline.py)
- requirements.txt, README.md

Addresses all 9 acceptance criteria from #830.
Ready for host selection, credential config, and cron activation.

Author: Ezra | Burn mode | 2026-04-05
2026-04-05 05:48:12 +00:00

192 lines
6.5 KiB
Python

#!/usr/bin/env python3
"""
Deep Dive Phase 1: Source Aggregation Layer
Aggregates research sources from arXiv, lab blogs, and newsletters.
Usage:
python phase1_aggregate.py [--date YYYY-MM-DD] [--output-dir DIR]
Issue: the-nexus#830
"""
import argparse
import asyncio
import hashlib
import json
import os
import xml.etree.ElementTree as ET
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import List, Optional
from urllib.parse import urljoin
import aiohttp
import feedparser
@dataclass
class SourceItem:
    """A single source item (paper, blog post, etc.)

    Produced by the aggregators and serialized to JSON via
    dataclasses.asdict in SourceAggregator._save.
    """
    id: str  # feed entry id; aggregators fall back to the entry link when absent
    title: str
    url: str
    source: str  # 'arxiv', 'openai', 'anthropic', 'deepmind', etc.
    published: str  # ISO format date
    summary: str  # truncated to 2000 chars by the aggregators
    authors: List[str]
    categories: List[str]  # arXiv tag terms; left empty for blog posts
    raw_content: str = ""  # full summary (arXiv) or entry content capped at 5000 chars (blogs)
class ArXIVAggregator:
    """Aggregate recent papers from arXiv RSS feeds for CS categories."""

    # Categories polled each run: AI, computation & language, machine learning.
    CATEGORIES = ['cs.AI', 'cs.CL', 'cs.LG']
    BASE_URL = "http://export.arxiv.org/rss/"

    async def fetch(self, session: aiohttp.ClientSession) -> List[SourceItem]:
        """Fetch and parse every category feed.

        Args:
            session: Shared aiohttp session owned by the caller.

        Returns:
            Parsed items from all categories that responded successfully.
            A failure on one category is logged and skipped so a single bad
            feed does not sink the whole run.
        """
        items = []
        for cat in self.CATEGORIES:
            url = f"{self.BASE_URL}{cat}"
            try:
                # Bare-number timeouts are deprecated in aiohttp; wrap in
                # ClientTimeout (total=30 matches the original 30s budget).
                timeout = aiohttp.ClientTimeout(total=30)
                async with session.get(url, timeout=timeout) as resp:
                    if resp.status == 200:
                        content = await resp.text()
                        items.extend(self._parse(content, cat))
                    else:
                        # Previously non-200 responses were silently dropped.
                        print(f"[ERROR] arXiv {cat}: HTTP {resp.status}")
            except Exception as e:
                print(f"[ERROR] arXiv {cat}: {e}")
        return items

    def _parse(self, content: str, category: str) -> List[SourceItem]:
        """Parse one RSS payload into SourceItems tagged 'arxiv-<category>'.

        Returns an empty list (after logging) if feedparser chokes on the
        payload.
        """
        items = []
        try:
            feed = feedparser.parse(content)
            for entry in feed.entries:
                item = SourceItem(
                    id=entry.get('id', entry.get('link', '')),
                    title=entry.get('title', ''),
                    url=entry.get('link', ''),
                    source=f'arxiv-{category}',
                    published=entry.get('published', entry.get('updated', '')),
                    summary=entry.get('summary', '')[:2000],  # cap for downstream ranking
                    authors=[a.get('name', '') for a in entry.get('authors', [])],
                    categories=[t.get('term', '') for t in entry.get('tags', [])],
                    raw_content=entry.get('summary', '')
                )
                items.append(item)
        except Exception as e:
            print(f"[ERROR] Parse arXiv RSS: {e}")
        return items
class BlogAggregator:
    """Aggregate from major AI lab blogs via RSS/Atom."""

    # source-name -> feed URL; the source name is stored on each SourceItem.
    SOURCES = {
        'openai': 'https://openai.com/blog/rss.xml',
        'anthropic': 'https://www.anthropic.com/news.atom',
        'deepmind': 'https://deepmind.google/blog/rss.xml',
        'google-research': 'https://research.google/blog/rss/',
    }

    async def fetch(self, session: aiohttp.ClientSession) -> List[SourceItem]:
        """Fetch and parse every configured blog feed.

        Args:
            session: Shared aiohttp session owned by the caller.

        Returns:
            Parsed items from all feeds that responded successfully; a
            failure on one feed is logged and skipped.
        """
        items = []
        for source, url in self.SOURCES.items():
            try:
                # Bare-number timeouts are deprecated in aiohttp; wrap in
                # ClientTimeout (total=30 matches the original 30s budget).
                timeout = aiohttp.ClientTimeout(total=30)
                async with session.get(url, timeout=timeout) as resp:
                    if resp.status == 200:
                        content = await resp.text()
                        items.extend(self._parse(content, source))
                    else:
                        # Previously non-200 responses were silently dropped.
                        print(f"[ERROR] {source}: HTTP {resp.status}")
            except Exception as e:
                print(f"[ERROR] {source}: {e}")
        return items

    def _parse(self, content: str, source: str) -> List[SourceItem]:
        """Parse one RSS/Atom payload into SourceItems for *source*.

        Returns an empty list (after logging) if feedparser chokes on the
        payload.
        """
        items = []
        try:
            feed = feedparser.parse(content)
            for entry in feed.entries[:10]:  # Limit to recent 10 per source
                item = SourceItem(
                    id=entry.get('id', entry.get('link', '')),
                    title=entry.get('title', ''),
                    url=entry.get('link', ''),
                    source=source,
                    published=entry.get('published', entry.get('updated', '')),
                    summary=entry.get('summary', '')[:2000],  # cap for downstream ranking
                    authors=[a.get('name', '') for a in entry.get('authors', [])],
                    categories=[],
                    # Atom 'content' is a list of payload dicts; keep first, capped.
                    raw_content=entry.get('content', [{'value': ''}])[0].get('value', '')[:5000]
                )
                items.append(item)
        except Exception as e:
            print(f"[ERROR] Parse {source}: {e}")
        return items
class SourceAggregator:
    """Main aggregation orchestrator: fetch all sources, dedupe, persist."""

    def __init__(self, output_dir: Path, date: str):
        """
        Args:
            output_dir: Root data directory; output lands in sources/<date>/.
            date: Target date string (YYYY-MM-DD), used only for the path
                and the saved metadata.
        """
        self.output_dir = output_dir
        self.date = date
        self.sources_dir = output_dir / "sources" / date
        self.sources_dir.mkdir(parents=True, exist_ok=True)

    async def run(self) -> "List[SourceItem]":
        """Run the full aggregation pipeline and save results to disk.

        Returns:
            The deduplicated list of aggregated items.
        """
        print(f"[Phase 1] Aggregating sources for {self.date}")
        all_items = []
        async with aiohttp.ClientSession() as session:
            # Parallel fetch from all sources
            arxiv_agg = ArXIVAggregator()
            blog_agg = BlogAggregator()
            arxiv_task = arxiv_agg.fetch(session)
            blog_task = blog_agg.fetch(session)
            results = await asyncio.gather(arxiv_task, blog_task, return_exceptions=True)
            for result in results:
                if isinstance(result, Exception):
                    print(f"[ERROR] Aggregation failed: {result}")
                else:
                    all_items.extend(result)
        # Cross-listed arXiv papers appear in several category feeds; keep
        # the first occurrence of each id (falling back to the URL).
        all_items = self._dedupe(all_items)
        print(f"[Phase 1] Total items aggregated: {len(all_items)}")
        # Save to disk
        self._save(all_items)
        return all_items

    @staticmethod
    def _dedupe(items: "List[SourceItem]") -> "List[SourceItem]":
        """Drop items whose id (or url, as fallback) was already seen."""
        seen = set()
        unique = []
        for item in items:
            key = item.id or item.url
            if key and key in seen:
                continue
            if key:
                seen.add(key)
            unique.append(item)
        return unique

    def _save(self, items: "List[SourceItem]"):
        """Save aggregated items to sources/<date>/aggregated.json."""
        output_file = self.sources_dir / "aggregated.json"
        data = {
            'date': self.date,
            'generated_at': datetime.now().isoformat(),
            'count': len(items),
            'items': [asdict(item) for item in items]
        }
        # Explicit UTF-8 + ensure_ascii=False: titles/summaries routinely
        # contain non-ASCII; the platform default encoding could crash here.
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        print(f"[Phase 1] Saved to {output_file}")
def main():
    """CLI entry point: parse arguments and run Phase 1 aggregation.

    Exits with an argparse usage error if --date is not a valid
    YYYY-MM-DD date.
    """
    parser = argparse.ArgumentParser(description='Deep Dive Phase 1: Source Aggregation')
    parser.add_argument('--date', default=datetime.now().strftime('%Y-%m-%d'),
                        help='Target date (YYYY-MM-DD)')
    # NOTE(review): default is relative to the CWD, not the script location —
    # assumes invocation from bin/; confirm against the cron setup.
    parser.add_argument('--output-dir', type=Path, default=Path('../data'),
                        help='Output directory for data')
    args = parser.parse_args()
    # Fail fast on a malformed date instead of silently creating a bogus
    # sources/<garbage>/ directory downstream.
    try:
        datetime.strptime(args.date, '%Y-%m-%d')
    except ValueError:
        parser.error(f"--date must be YYYY-MM-DD, got {args.date!r}")
    aggregator = SourceAggregator(args.output_dir, args.date)
    asyncio.run(aggregator.run())


if __name__ == '__main__':
    main()