[ezra] #830: Fix config wrapper, add arXiv API fallback, implement voice delivery, fix datetime
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
@@ -16,7 +16,7 @@ import logging
 import re
 import tempfile
 from dataclasses import dataclass, asdict
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from pathlib import Path
 from typing import List, Dict, Optional, Any
 import os
@@ -107,7 +107,66 @@ class RSSAggregator:
                 return datetime(*parsed_time[:6])
             except:
                 pass
-        return datetime.utcnow()
+        return datetime.now(timezone.utc).replace(tzinfo=None)
+
+    def _fetch_arxiv_api(self, category: str, max_items: int = 50) -> List[FeedItem]:
+        """Fallback to arXiv API when RSS is empty."""
+        import urllib.request
+        import xml.etree.ElementTree as ET
+
+        api_url = f"http://export.arxiv.org/api/query?search_query=cat:{category}&sortBy=submittedDate&sortOrder=descending&start=0&max_results={max_items}"
+        logger.info(f"ArXiv RSS empty, falling back to API: {category}")
+
+        try:
+            req = urllib.request.Request(api_url, headers={'User-Agent': 'DeepDiveBot/1.0'})
+            with urllib.request.urlopen(req, timeout=self.timeout) as resp:
+                data = resp.read().decode('utf-8')
+
+            ns = {'atom': 'http://www.w3.org/2005/Atom'}
+            root = ET.fromstring(data)
+            items = []
+
+            for entry in root.findall('atom:entry', ns)[:max_items]:
+                title = entry.find('atom:title', ns)
+                title = title.text.replace('\n', ' ').strip() if title is not None else 'Untitled'
+
+                summary = entry.find('atom:summary', ns)
+                summary = summary.text.strip() if summary is not None else ''
+
+                link = entry.find('atom:id', ns)
+                link = link.text.strip() if link is not None else ''
+
+                published = entry.find('atom:published', ns)
+                published_text = published.text if published is not None else None
+
+                content = f"{title}{summary}"
+                content_hash = self._compute_hash(content)
+
+                if published_text:
+                    try:
+                        pub_dt = datetime.fromisoformat(published_text.replace('Z', '+00:00')).replace(tzinfo=None)
+                    except Exception:
+                        pub_dt = datetime.now(timezone.utc).replace(tzinfo=None)
+                else:
+                    pub_dt = datetime.now(timezone.utc).replace(tzinfo=None)
+
+                item = FeedItem(
+                    title=title,
+                    summary=summary,
+                    url=link,
+                    source=f"arxiv_api_{category}",
+                    published=pub_dt,
+                    content_hash=content_hash,
+                    raw={'published': published_text}
+                )
+                items.append(item)
+
+            logger.info(f"Fetched {len(items)} items from arXiv API fallback")
+            return items
+
+        except Exception as e:
+            logger.error(f"ArXiv API fallback failed: {e}")
+            return []
 
     async def fetch_feed(self, url: str, name: str,
                          since: Optional[datetime] = None,
@@ -115,7 +174,10 @@ class RSSAggregator:
         """Fetch single feed with caching. Returns normalized items."""
 
         if not HAS_FEEDPARSER:
-            logger.error("feedparser not installed. Run: pip install feedparser")
+            logger.warning("feedparser not installed — using API fallback")
+            if 'arxiv' in name.lower() and 'arxiv.org/rss' in url:
+                category = url.split('/')[-1] if '/' in url else 'cs.AI'
+                return self._fetch_arxiv_api(category, max_items)
             return []
 
         logger.info(f"Fetching {name}: {url}")
@@ -151,6 +213,11 @@
             )
             items.append(item)
 
+        # ArXiv API fallback for empty RSS
+        if not items and 'arxiv' in name.lower() and 'arxiv.org/rss' in url:
+            category = url.split('/')[-1] if '/' in url else 'cs.AI'
+            items = self._fetch_arxiv_api(category, max_items)
+
        logger.info(f"Fetched {len(items)} items from {name}")
        return items
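
A quick sanity check of the fallback wiring above (a minimal sketch; the feed URL is illustrative): the arXiv category is just the last path segment of the RSS URL, and it is interpolated into the export API query string.

    # Category extraction and API URL construction, as used by the fallback
    url = "http://export.arxiv.org/rss/cs.AI"          # example feed URL
    category = url.split('/')[-1] if '/' in url else 'cs.AI'
    assert category == "cs.AI"
    api_url = (f"http://export.arxiv.org/api/query?search_query=cat:{category}"
               f"&sortBy=submittedDate&sortOrder=descending&start=0&max_results=50")
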
@@ -372,7 +439,7 @@ Context: Hermes agents run locally with Gemma 4, sovereign infrastructure."""
             'headline': f"Deep Dive: {len(items)} items, top score {items[0][1]:.2f}",
             'briefing': synthesis,
             'sources': [item[0].to_dict() for item in items],
-            'generated_at': datetime.utcnow().isoformat()
+            'generated_at': datetime.now(timezone.utc).isoformat()
         }
 
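
The datetime changes in this and the earlier hunks all follow one pattern: datetime.utcnow() is deprecated as of Python 3.12, so the code builds an aware UTC timestamp and, where the pipeline still compares naive datetimes, strips the tzinfo. A minimal sketch of the pattern (values illustrative):

    from datetime import datetime, timezone

    aware = datetime.now(timezone.utc)          # timezone-aware UTC
    naive_utc = aware.replace(tzinfo=None)      # naive drop-in for utcnow()
    assert naive_utc.tzinfo is None

    # arXiv's ISO-8601 'published' field ends in 'Z', which older
    # fromisoformat() versions reject, hence the replace() first:
    pub = datetime.fromisoformat("2024-01-01T12:00:00Z".replace('Z', '+00:00'))
    pub = pub.replace(tzinfo=None)

One behavioral note: in the 'generated_at' hunk the aware timestamp is kept, so isoformat() now emits a +00:00 offset where utcnow().isoformat() did not.
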
@@ -468,9 +535,40 @@ class TelegramDelivery:
         return False
 
     def deliver_voice(self, audio_path: Path) -> bool:
-        # TODO: Implement multipart voice message upload
-        logger.info(f"Voice delivery: {audio_path} (implement with requests file upload)")
+        """Deliver audio file as Telegram voice message using multipart upload."""
+        if not HAS_HTTPX or not httpx:
+            logger.error("httpx not installed")
+            return False
+
+        try:
+            import mimetypes
+            mime, _ = mimetypes.guess_type(str(audio_path))
+            mime = mime or "audio/ogg"
+
+            with open(audio_path, "rb") as f:
+                files = {
+                    "voice": (audio_path.name, f, mime),
+                }
+                data = {
+                    "chat_id": self.chat_id,
+                }
+                resp = httpx.post(
+                    f"{self.base_url}/sendVoice",
+                    data=data,
+                    files=files,
+                    timeout=60.0
+                )
+
+            if resp.status_code == 200:
+                logger.info("Telegram voice delivery successful")
+                return True
+            else:
+                logger.error(f"Telegram voice delivery failed: {resp.text}")
+                return False
+
+        except Exception as e:
+            logger.error(f"Telegram voice delivery error: {e}")
+            return False
 
 
 # ============================================================================
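
A hedged usage sketch for the new deliver_voice(); the token, chat id, and file path are placeholders, and TelegramDelivery's two-argument constructor is taken from the __init__ change below.

    from pathlib import Path

    delivery = TelegramDelivery("123456:ABC-token", "-1001234567890")  # placeholder credentials
    ok = delivery.deliver_voice(Path("/tmp/briefing.ogg"))
    print("sent" if ok else "failed")

Note that Telegram's sendVoice is intended for OGG/Opus voice notes; other formats may be delivered as Audio or Document instead, so the "audio/ogg" fallback MIME type above matches the expected happy path.
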
@@ -482,29 +580,30 @@ class DeepDivePipeline:
 
     def __init__(self, config: Dict[str, Any]):
         self.config = config
+        # Config may be wrapped under 'deepdive' key or flat
+        self.cfg = config.get('deepdive', config)
         self.cache_dir = Path.home() / ".cache" / "deepdive"
         self.cache_dir.mkdir(parents=True, exist_ok=True)
 
         self.aggregator = RSSAggregator(self.cache_dir)
 
-        relevance_config = config.get('relevance', {})
+        relevance_config = self.cfg.get('relevance', {})
         self.scorer = RelevanceScorer(relevance_config.get('model', 'all-MiniLM-L6-v2'))
 
-        llm_endpoint = config.get('synthesis', {}).get('llm_endpoint', 'http://localhost:11435/v1')
+        llm_endpoint = self.cfg.get('synthesis', {}).get('llm_endpoint', 'http://localhost:11435/v1')
         self.synthesizer = SynthesisEngine(llm_endpoint)
 
         self.audio_gen = AudioGenerator()
 
-        delivery_config = config.get('delivery', {})
+        delivery_config = self.cfg.get('delivery', {})
         self.telegram = None
-        if delivery_config.get('telegram_bot_token') and delivery_config.get('telegram_chat_id'):
-            self.telegram = TelegramDelivery(
-                delivery_config['telegram_bot_token'],
-                delivery_config['telegram_chat_id']
-            )
+        bot_token = delivery_config.get('bot_token') or delivery_config.get('telegram_bot_token')
+        chat_id = delivery_config.get('channel_id') or delivery_config.get('telegram_chat_id')
+        if bot_token and chat_id:
+            self.telegram = TelegramDelivery(bot_token, str(chat_id))
 
     async def run(self, since: Optional[datetime] = None,
-                  dry_run: bool = False) -> Dict[str, Any]:
+                  dry_run: bool = False, force: bool = False) -> Dict[str, Any]:
 
         logger.info("="*60)
         logger.info("DEEP DIVE INTELLIGENCE PIPELINE")
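
The wrapper fix in __init__ means the same YAML works whether its keys sit at the top level or nested under 'deepdive'. A minimal sketch (config values illustrative):

    # Both shapes resolve to the same cfg dict
    flat = {'relevance': {'top_n': 5}, 'sources': []}
    wrapped = {'deepdive': flat}

    for config in (flat, wrapped):
        cfg = config.get('deepdive', config)   # unwrap if nested, else use as-is
        assert cfg['relevance']['top_n'] == 5

The delivery keys are also aliased: bot_token/channel_id now take precedence over the older telegram_bot_token/telegram_chat_id names.
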
@@ -512,37 +611,39 @@ class DeepDivePipeline:
 
         # Phase 1
         logger.info("Phase 1: Source Aggregation")
-        sources = self.config.get('sources', [])
+        sources = self.cfg.get('sources', [])
         items = await self.aggregator.fetch_all(sources, since)
 
         if not items:
             logger.warning("No items fetched")
-            return {'status': 'empty', 'items_count': 0}
+            if not force:
+                return {'status': 'empty', 'items_count': 0}
+            logger.info("Force mode enabled — continuing with empty dataset")
 
         # Phase 2
         logger.info("Phase 2: Relevance Scoring")
-        relevance_config = self.config.get('relevance', {})
+        relevance_config = self.cfg.get('relevance', {})
         top_n = relevance_config.get('top_n', 10)
         min_score = relevance_config.get('min_score', 0.5)
 
         ranked = self.scorer.rank(items, top_n=top_n, min_score=min_score)
         logger.info(f"Selected {len(ranked)} items above threshold {min_score}")
 
-        if not ranked:
+        if not ranked and not force:
             return {'status': 'filtered', 'items_count': len(items), 'ranked_count': 0}
 
         # Phase 3
         logger.info("Phase 3: Synthesis")
         briefing = self.synthesizer.generate_structured(ranked)
 
-        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
+        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
         briefing_path = self.cache_dir / f"briefing_{timestamp}.json"
         with open(briefing_path, 'w') as f:
             json.dump(briefing, f, indent=2)
         logger.info(f"Briefing saved: {briefing_path}")
 
         # Phase 4
-        if self.config.get('audio', {}).get('enabled', False):
+        if self.cfg.get('tts', {}).get('enabled', False) or self.cfg.get('audio', {}).get('enabled', False):
             logger.info("Phase 4: Audio Generation")
             audio_path = self.audio_gen.generate(briefing)
         else:
@@ -587,6 +688,8 @@ async def main():
                         help="Fetch only today's items")
     parser.add_argument('--since', '-s', type=int, default=24,
                         help='Hours back to fetch (default: 24)')
+    parser.add_argument('--force', '-f', action='store_true',
+                        help='Run pipeline even if no items are fetched (for testing)')
 
     args = parser.parse_args()
 
@@ -598,12 +701,12 @@ async def main():
         config = yaml.safe_load(f)
 
     if args.today:
-        since = datetime.utcnow().replace(hour=0, minute=0, second=0)
+        since = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0).replace(tzinfo=None)
     else:
-        since = datetime.utcnow() - timedelta(hours=args.since)
+        since = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=args.since)
 
     pipeline = DeepDivePipeline(config)
-    result = await pipeline.run(since=since, dry_run=args.dry_run)
+    result = await pipeline.run(since=since, dry_run=args.dry_run, force=args.force)
 
     print("\n" + "="*60)
     print("PIPELINE RESULT")
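
For completeness, a hedged sketch of exercising the new force flag programmatically; the config dict here is a placeholder (a real run needs sources plus scorer and synthesis settings), so treat this as call shape only, not a working configuration.

    import asyncio
    from datetime import datetime, timedelta, timezone

    config = {'deepdive': {'sources': [], 'relevance': {}, 'synthesis': {}, 'delivery': {}}}
    since = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=24)

    pipeline = DeepDivePipeline(config)
    result = asyncio.run(pipeline.run(since=since, dry_run=True, force=True))
    print(result)

From the CLI the equivalent is the new --force/-f switch alongside the existing --today, --since, and dry-run options.
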