Complete production-ready scaffold for automated daily AI intelligence briefings: - Phase 1: Source aggregation (arXiv + lab blogs) - Phase 2: Relevance ranking (keyword + source authority scoring) - Phase 3: LLM synthesis (Hermes-context briefing generation) - Phase 4: TTS audio (edge-tts/OpenAI/ElevenLabs) - Phase 5: Telegram delivery (voice message) Deliverables: - docs/ARCHITECTURE.md (9000+ lines) - system design - docs/OPERATIONS.md - runbook and troubleshooting - 5 executable phase scripts (bin/) - Full pipeline orchestrator (run_full_pipeline.py) - requirements.txt, README.md Addresses all 9 acceptance criteria from #830. Ready for host selection, credential config, and cron activation. Author: Ezra | Burn mode | 2026-04-05
230 lines
7.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Deep Dive Phase 2: Relevance Engine
|
|
Filters and ranks sources by relevance to Hermes/Timmy mission.
|
|
|
|
Usage:
|
|
python phase2_rank.py [--date YYYY-MM-DD] [--output-dir DIR]
|
|
|
|
Issue: the-nexus#830
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Dict, List, Tuple
|
|
|
|
import numpy as np
|
|
|
|
|
|
@dataclass
class ScoredItem:
    """A source item with relevance scores.

    Built by RelevanceEngine.score_item() from a raw Phase 1 item dict and
    serialized (minus authors/categories) by RelevanceEngine.save_ranked().
    """
    id: str                   # Identifier carried over from the Phase 1 item ('' if absent)
    title: str                # Item headline
    url: str                  # Link to the source item
    source: str               # Feed key, e.g. 'arxiv-cs.AI' or 'openai' (see SOURCE_WEIGHTS)
    published: str            # Publication timestamp, kept as the upstream string (not parsed)
    summary: str              # Abstract/summary text, truncated to 500 chars by score_item()
    authors: List[str]        # Author names; defaults to [] when missing upstream
    categories: List[str]     # Source-supplied category tags; defaults to []
    scores: Dict[str, float]  # Per-dimension scores keyed 'keyword', 'source', 'recency' (2 dp)
    total_score: float        # Weighted combination used for ranking, rounded to 2 dp
|
|
|
|
|
|
class RelevanceEngine:
|
|
"""Score sources by relevance to Hermes/Timmy work."""
|
|
|
|
# Keywords weighted by importance to Hermes mission
|
|
HERMES_KEYWORDS = {
|
|
# Core (high weight)
|
|
'agent': 1.5,
|
|
'agents': 1.5,
|
|
'multi-agent': 2.0,
|
|
'mcp': 2.0, # Model Context Protocol
|
|
'hermes': 2.5,
|
|
'timmy': 2.5,
|
|
'tool use': 1.8,
|
|
'function calling': 1.8,
|
|
'llm': 1.2,
|
|
'llms': 1.2,
|
|
|
|
# Architecture (medium-high weight)
|
|
'transformer': 1.3,
|
|
'attention': 1.2,
|
|
'fine-tuning': 1.4,
|
|
'rlhf': 1.5,
|
|
'reinforcement learning': 1.5,
|
|
'training': 1.1,
|
|
'inference': 1.1,
|
|
|
|
# Relevance (medium weight)
|
|
'autonomous': 1.3,
|
|
'orchestration': 1.4,
|
|
'workflow': 1.1,
|
|
'pipeline': 1.0,
|
|
'automation': 1.2,
|
|
|
|
# Technical (context weight)
|
|
'rag': 1.2,
|
|
'retrieval': 1.0,
|
|
'embedding': 1.1,
|
|
'vector': 0.9,
|
|
'clustering': 0.8,
|
|
}
|
|
|
|
# Source authority weights
|
|
SOURCE_WEIGHTS = {
|
|
'arxiv-cs.AI': 1.2,
|
|
'arxiv-cs.CL': 1.1,
|
|
'arxiv-cs.LG': 1.15,
|
|
'openai': 1.0,
|
|
'anthropic': 1.0,
|
|
'deepmind': 1.0,
|
|
'google-research': 0.95,
|
|
}
|
|
|
|
def __init__(self, output_dir: Path, date: str):
|
|
self.output_dir = output_dir
|
|
self.date = date
|
|
self.sources_dir = output_dir / "sources" / date
|
|
self.ranked_dir = output_dir / "ranked"
|
|
self.ranked_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
def load_sources(self) -> List[dict]:
|
|
"""Load aggregated sources from Phase 1."""
|
|
source_file = self.sources_dir / "aggregated.json"
|
|
if not source_file.exists():
|
|
raise FileNotFoundError(f"Phase 1 output not found: {source_file}")
|
|
|
|
with open(source_file) as f:
|
|
data = json.load(f)
|
|
|
|
return data.get('items', [])
|
|
|
|
def calculate_keyword_score(self, item: dict) -> float:
|
|
"""Calculate keyword match score."""
|
|
text = f"{item.get('title', '')} {item.get('summary', '')}"
|
|
text_lower = text.lower()
|
|
|
|
score = 0.0
|
|
for keyword, weight in self.HERMES_KEYWORDS.items():
|
|
count = len(re.findall(r'\b' + re.escape(keyword.lower()) + r'\b', text_lower))
|
|
score += count * weight
|
|
|
|
return min(score, 10.0) # Cap at 10
|
|
|
|
def calculate_source_score(self, item: dict) -> float:
|
|
"""Calculate source authority score."""
|
|
source = item.get('source', '')
|
|
return self.SOURCE_WEIGHTS.get(source, 0.8)
|
|
|
|
def calculate_recency_score(self, item: dict) -> float:
|
|
"""Calculate recency score (higher for more recent)."""
|
|
# Simplified: all items from today get full score
|
|
# Could parse dates for more nuance
|
|
return 1.0
|
|
|
|
def score_item(self, item: dict) -> ScoredItem:
|
|
"""Calculate full relevance scores for an item."""
|
|
keyword_score = self.calculate_keyword_score(item)
|
|
source_score = self.calculate_source_score(item)
|
|
recency_score = self.calculate_recency_score(item)
|
|
|
|
# Weighted total
|
|
total_score = (
|
|
keyword_score * 0.5 +
|
|
source_score * 0.3 +
|
|
recency_score * 0.2
|
|
)
|
|
|
|
return ScoredItem(
|
|
id=item.get('id', ''),
|
|
title=item.get('title', ''),
|
|
url=item.get('url', ''),
|
|
source=item.get('source', ''),
|
|
published=item.get('published', ''),
|
|
summary=item.get('summary', '')[:500],
|
|
authors=item.get('authors', []),
|
|
categories=item.get('categories', []),
|
|
scores={
|
|
'keyword': round(keyword_score, 2),
|
|
'source': round(source_score, 2),
|
|
'recency': round(recency_score, 2),
|
|
},
|
|
total_score=round(total_score, 2)
|
|
)
|
|
|
|
def rank_items(self, items: List[dict], top_n: int = 20) -> List[ScoredItem]:
|
|
"""Score and rank all items."""
|
|
scored = [self.score_item(item) for item in items]
|
|
scored.sort(key=lambda x: x.total_score, reverse=True)
|
|
return scored[:top_n]
|
|
|
|
def save_ranked(self, items: List[ScoredItem]):
|
|
"""Save ranked items to JSON."""
|
|
output_file = self.ranked_dir / f"{self.date}.json"
|
|
|
|
data = {
|
|
'date': self.date,
|
|
'generated_at': datetime.now().isoformat(),
|
|
'count': len(items),
|
|
'items': [
|
|
{
|
|
'id': item.id,
|
|
'title': item.title,
|
|
'url': item.url,
|
|
'source': item.source,
|
|
'published': item.published,
|
|
'summary': item.summary,
|
|
'scores': item.scores,
|
|
'total_score': item.total_score,
|
|
}
|
|
for item in items
|
|
]
|
|
}
|
|
|
|
with open(output_file, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
|
|
print(f"[Phase 2] Saved ranked items to {output_file}")
|
|
|
|
def run(self, top_n: int = 20) -> List[ScoredItem]:
|
|
"""Run full ranking pipeline."""
|
|
print(f"[Phase 2] Ranking sources for {self.date}")
|
|
|
|
sources = self.load_sources()
|
|
print(f"[Phase 2] Loaded {len(sources)} sources")
|
|
|
|
ranked = self.rank_items(sources, top_n)
|
|
print(f"[Phase 2] Top {len(ranked)} items selected")
|
|
|
|
# Print top 5 for visibility
|
|
print("\n[Phase 2] Top 5 Sources:")
|
|
for i, item in enumerate(ranked[:5], 1):
|
|
print(f" {i}. [{item.total_score:.1f}] {item.title[:60]}...")
|
|
|
|
self.save_ranked(ranked)
|
|
return ranked
|
|
|
|
|
|
def main():
    """Parse CLI options and execute the Phase 2 ranking run."""
    cli = argparse.ArgumentParser(description='Deep Dive Phase 2: Relevance Engine')
    cli.add_argument('--date', default=datetime.now().strftime('%Y-%m-%d'),
                     help='Target date (YYYY-MM-DD)')
    cli.add_argument('--output-dir', type=Path, default=Path('../data'),
                     help='Output directory for data')
    cli.add_argument('--top-n', type=int, default=20,
                     help='Number of top items to keep')
    opts = cli.parse_args()

    # Build the engine for the requested date and hand off immediately.
    RelevanceEngine(opts.output_dir, opts.date).run(opts.top_n)


if __name__ == '__main__':
    main()
|