diff --git a/intelligence/deepdive/pipeline.py b/intelligence/deepdive/pipeline.py
index 6a4dc00..9aa8e16 100644
--- a/intelligence/deepdive/pipeline.py
+++ b/intelligence/deepdive/pipeline.py
@@ -16,7 +16,7 @@ import logging
 import re
 import tempfile
 from dataclasses import dataclass, asdict
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from pathlib import Path
 from typing import List, Dict, Optional, Any
 import os
@@ -107,7 +107,66 @@ class RSSAggregator:
                 return datetime(*parsed_time[:6])
         except:
             pass
-        return datetime.utcnow()
+        return datetime.now(timezone.utc).replace(tzinfo=None)
+
+    def _fetch_arxiv_api(self, category: str, max_items: int = 50) -> List[FeedItem]:
+        """Fallback to arXiv API when RSS is empty."""
+        import urllib.request
+        import xml.etree.ElementTree as ET
+
+        api_url = f"http://export.arxiv.org/api/query?search_query=cat:{category}&sortBy=submittedDate&sortOrder=descending&start=0&max_results={max_items}"
+        logger.info(f"ArXiv RSS empty, falling back to API: {category}")
+
+        try:
+            req = urllib.request.Request(api_url, headers={'User-Agent': 'DeepDiveBot/1.0'})
+            with urllib.request.urlopen(req, timeout=self.timeout) as resp:
+                data = resp.read().decode('utf-8')
+
+            ns = {'atom': 'http://www.w3.org/2005/Atom'}
+            root = ET.fromstring(data)
+            items = []
+
+            for entry in root.findall('atom:entry', ns)[:max_items]:
+                title = entry.find('atom:title', ns)
+                title = title.text.replace('\n', ' ').strip() if title is not None else 'Untitled'
+
+                summary = entry.find('atom:summary', ns)
+                summary = summary.text.strip() if summary is not None else ''
+
+                link = entry.find('atom:id', ns)
+                link = link.text.strip() if link is not None else ''
+
+                published = entry.find('atom:published', ns)
+                published_text = published.text if published is not None else None
+
+                content = f"{title}{summary}"
+                content_hash = self._compute_hash(content)
+
+                if published_text:
+                    try:
+                        pub_dt = datetime.fromisoformat(published_text.replace('Z', '+00:00')).replace(tzinfo=None)
+                    except Exception:
+                        pub_dt = datetime.now(timezone.utc).replace(tzinfo=None)
+                else:
+                    pub_dt = datetime.now(timezone.utc).replace(tzinfo=None)
+
+                item = FeedItem(
+                    title=title,
+                    summary=summary,
+                    url=link,
+                    source=f"arxiv_api_{category}",
+                    published=pub_dt,
+                    content_hash=content_hash,
+                    raw={'published': published_text}
+                )
+                items.append(item)
+
+            logger.info(f"Fetched {len(items)} items from arXiv API fallback")
+            return items
+
+        except Exception as e:
+            logger.error(f"ArXiv API fallback failed: {e}")
+            return []
 
     async def fetch_feed(self, url: str, name: str, since: Optional[datetime] = None,
@@ -115,7 +174,10 @@ class RSSAggregator:
         """Fetch single feed with caching. Returns normalized items."""
 
         if not HAS_FEEDPARSER:
-            logger.error("feedparser not installed. Run: pip install feedparser")
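+            # No feedparser available: arXiv feeds can still be recovered
+            # through the Atom API fallback (_fetch_arxiv_api) defined above.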
+            logger.warning("feedparser not installed; using API fallback")
+            if 'arxiv' in name.lower() and 'arxiv.org/rss' in url:
+                category = url.split('/')[-1] if '/' in url else 'cs.AI'
+                return self._fetch_arxiv_api(category, max_items)
             return []
 
         logger.info(f"Fetching {name}: {url}")
@@ -151,6 +213,11 @@ class RSSAggregator:
             )
             items.append(item)
 
+        # ArXiv API fallback for empty RSS
+        if not items and 'arxiv' in name.lower() and 'arxiv.org/rss' in url:
+            category = url.split('/')[-1] if '/' in url else 'cs.AI'
+            items = self._fetch_arxiv_api(category, max_items)
+
         logger.info(f"Fetched {len(items)} items from {name}")
         return items
 
@@ -372,7 +439,7 @@ Context: Hermes agents run locally with Gemma 4, sovereign infrastructure."""
             'headline': f"Deep Dive: {len(items)} items, top score {items[0][1]:.2f}",
             'briefing': synthesis,
             'sources': [item[0].to_dict() for item in items],
-            'generated_at': datetime.utcnow().isoformat()
+            'generated_at': datetime.now(timezone.utc).isoformat()
         }
 
@@ -468,9 +535,40 @@ class TelegramDelivery:
             return False
 
     def deliver_voice(self, audio_path: Path) -> bool:
-        # TODO: Implement multipart voice message upload
-        logger.info(f"Voice delivery: {audio_path} (implement with requests file upload)")
-        return True
+        """Deliver audio file as Telegram voice message using multipart upload."""
+        if not HAS_HTTPX or not httpx:
+            logger.error("httpx not installed")
+            return False
+
+        try:
+            import mimetypes
+            mime, _ = mimetypes.guess_type(str(audio_path))
+            mime = mime or "audio/ogg"
+
+            with open(audio_path, "rb") as f:
+                files = {
+                    "voice": (audio_path.name, f, mime),
+                }
+                data = {
+                    "chat_id": self.chat_id,
+                }
+                resp = httpx.post(
+                    f"{self.base_url}/sendVoice",
+                    data=data,
+                    files=files,
+                    timeout=60.0
+                )
+
+            if resp.status_code == 200:
+                logger.info("Telegram voice delivery successful")
+                return True
+            else:
+                logger.error(f"Telegram voice delivery failed: {resp.text}")
+                return False
+
+        except Exception as e:
+            logger.error(f"Telegram voice delivery error: {e}")
+            return False
 
 
 # ============================================================================
 
@@ -482,29 +580,30 @@ class DeepDivePipeline:
 
     def __init__(self, config: Dict[str, Any]):
         self.config = config
+        # Config may be wrapped under 'deepdive' key or flat
+        self.cfg = config.get('deepdive', config)
         self.cache_dir = Path.home() / ".cache" / "deepdive"
         self.cache_dir.mkdir(parents=True, exist_ok=True)
 
         self.aggregator = RSSAggregator(self.cache_dir)
 
-        relevance_config = config.get('relevance', {})
+        relevance_config = self.cfg.get('relevance', {})
         self.scorer = RelevanceScorer(relevance_config.get('model', 'all-MiniLM-L6-v2'))
 
-        llm_endpoint = config.get('synthesis', {}).get('llm_endpoint', 'http://localhost:11435/v1')
+        llm_endpoint = self.cfg.get('synthesis', {}).get('llm_endpoint', 'http://localhost:11435/v1')
        self.synthesizer = SynthesisEngine(llm_endpoint)
 
         self.audio_gen = AudioGenerator()
 
-        delivery_config = config.get('delivery', {})
+        delivery_config = self.cfg.get('delivery', {})
         self.telegram = None
-        if delivery_config.get('telegram_bot_token') and delivery_config.get('telegram_chat_id'):
-            self.telegram = TelegramDelivery(
-                delivery_config['telegram_bot_token'],
-                delivery_config['telegram_chat_id']
-            )
+        bot_token = delivery_config.get('bot_token') or delivery_config.get('telegram_bot_token')
+        chat_id = delivery_config.get('channel_id') or delivery_config.get('telegram_chat_id')
+        if bot_token and chat_id:
+            self.telegram = TelegramDelivery(bot_token, str(chat_id))
 
     async def run(self, since: Optional[datetime] = None,
-                  dry_run: bool = False) -> Dict[str, Any]:
+                  dry_run: bool = False, force: bool = False) -> Dict[str, Any]:
 
         logger.info("="*60)
         logger.info("DEEP DIVE INTELLIGENCE PIPELINE")
@@ -512,37 +611,39 @@ class DeepDivePipeline:
 
         # Phase 1
         logger.info("Phase 1: Source Aggregation")
-        sources = self.config.get('sources', [])
+        sources = self.cfg.get('sources', [])
         items = await self.aggregator.fetch_all(sources, since)
 
         if not items:
             logger.warning("No items fetched")
-            return {'status': 'empty', 'items_count': 0}
+            if not force:
+                return {'status': 'empty', 'items_count': 0}
+            logger.info("Force mode enabled; continuing with empty dataset")
 
         # Phase 2
         logger.info("Phase 2: Relevance Scoring")
-        relevance_config = self.config.get('relevance', {})
+        relevance_config = self.cfg.get('relevance', {})
         top_n = relevance_config.get('top_n', 10)
         min_score = relevance_config.get('min_score', 0.5)
 
         ranked = self.scorer.rank(items, top_n=top_n, min_score=min_score)
         logger.info(f"Selected {len(ranked)} items above threshold {min_score}")
 
-        if not ranked:
+        if not ranked and not force:
             return {'status': 'filtered', 'items_count': len(items), 'ranked_count': 0}
 
         # Phase 3
         logger.info("Phase 3: Synthesis")
         briefing = self.synthesizer.generate_structured(ranked)
 
-        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
+        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
         briefing_path = self.cache_dir / f"briefing_{timestamp}.json"
         with open(briefing_path, 'w') as f:
             json.dump(briefing, f, indent=2)
         logger.info(f"Briefing saved: {briefing_path}")
 
         # Phase 4
-        if self.config.get('audio', {}).get('enabled', False):
+        if self.cfg.get('tts', {}).get('enabled', False) or self.cfg.get('audio', {}).get('enabled', False):
             logger.info("Phase 4: Audio Generation")
             audio_path = self.audio_gen.generate(briefing)
         else:
@@ -587,6 +688,8 @@ async def main():
                         help="Fetch only today's items")
     parser.add_argument('--since', '-s', type=int, default=24,
                         help='Hours back to fetch (default: 24)')
+    parser.add_argument('--force', '-f', action='store_true',
+                        help='Run pipeline even if no items are fetched (for testing)')
 
     args = parser.parse_args()
 
@@ -598,12 +701,12 @@ async def main():
         config = yaml.safe_load(f)
 
     if args.today:
-        since = datetime.utcnow().replace(hour=0, minute=0, second=0)
+        since = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0).replace(tzinfo=None)
     else:
-        since = datetime.utcnow() - timedelta(hours=args.since)
+        since = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=args.since)
 
     pipeline = DeepDivePipeline(config)
-    result = await pipeline.run(since=since, dry_run=args.dry_run)
+    result = await pipeline.run(since=since, dry_run=args.dry_run, force=args.force)
 
     print("\n" + "="*60)
     print("PIPELINE RESULT")
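
For reference, a minimal sketch of driving the patched pipeline programmatically. The config keys (deepdive, relevance, synthesis, delivery, tts) mirror what __init__() and run() read in this diff; the shape of each 'sources' entry is an assumption, since fetch_all() is not shown here.

# Example, not part of the patch: exercises the nested-config handling,
# the delivery key aliases, and the new force/dry_run flags.
import asyncio
from datetime import datetime, timedelta, timezone

from intelligence.deepdive.pipeline import DeepDivePipeline

config = {
    'deepdive': {                                # a flat (unwrapped) dict also works
        'sources': [
            # Assumed entry shape; fetch_all() is outside this diff.
            {'name': 'arxiv_cs_ai', 'url': 'http://rss.arxiv.org/rss/cs.AI'},
        ],
        'relevance': {'top_n': 10, 'min_score': 0.5},
        'synthesis': {'llm_endpoint': 'http://localhost:11435/v1'},
        'delivery': {'bot_token': 'TOKEN', 'channel_id': -1001234567890},
        'tts': {'enabled': False},
    }
}

async def demo():
    pipeline = DeepDivePipeline(config)
    # Naive-UTC 'since', matching the convention used throughout the patch
    since = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=24)
    # force=True keeps the run alive even when nothing is fetched or ranked
    result = await pipeline.run(since=since, dry_run=True, force=True)
    print(result)

asyncio.run(demo())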
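One note on the recurring datetime.now(timezone.utc).replace(tzinfo=None) pattern: datetime.utcnow() is deprecated as of Python 3.12, but the timestamps this module compares against (parsed feed dates, the 'since' cutoff) are naive, so the patch strips tzinfo to preserve naive-UTC semantics. A standalone illustration, not from the patch:

from datetime import datetime, timezone

aware = datetime.now(timezone.utc)    # tz-aware UTC
naive = aware.replace(tzinfo=None)    # naive UTC, same wall clock as utcnow()

# Comparing aware and naive datetimes raises TypeError, which is why the
# patch normalizes every parsed feed date to naive UTC before comparing.
assert naive.tzinfo is None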