[ezra] Phase 2: Relevance scoring for Deep Dive (#830)
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled

This commit is contained in:
2026-04-05 05:16:33 +00:00
parent 2b06e179d1
commit cbf05e1fc8

246
bin/deepdive_filter.py Normal file
View File

@@ -0,0 +1,246 @@
#!/usr/bin/env python3
"""
Deep Dive Phase 2: Relevance Filtering
Scores and filters entries by Hermes/Timmy relevance.
Usage:
deepdive_filter.py --input PATH --output PATH [--top-n N]
"""
import argparse
import json
import re
from collections import Counter
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Tuple

try:
    from sentence_transformers import SentenceTransformer, util
    EMBEDDINGS_AVAILABLE = True
except ImportError:
    EMBEDDINGS_AVAILABLE = False
    print("[WARN] sentence-transformers not available, keyword-only mode")
@dataclass
class ScoredEntry:
    """A feed entry together with its relevance-scoring breakdown.

    Fields mirror the JSONL record schema written out by main().
    """
    entry: dict                   # original aggregator entry, passed through untouched
    relevance_score: float        # weighted blend of keyword + embedding scores
    keyword_score: float          # keyword-tier score, clamped to a 0-10 scale
    embedding_score: float = 0.0  # cosine similarity rescaled to 0-10 (0 when disabled)
    # default_factory instead of `= None`: the fields are annotated List[str] and
    # consumers (JSONL serialization) expect lists, so the default should be a
    # fresh list per instance rather than None.
    keywords_matched: List[str] = field(default_factory=list)
    reasons: List[str] = field(default_factory=list)
class KeywordScorer:
    """Scores entries by matching tiered keyword lists against title + summary.

    Each matched keyword contributes its tier's weight; the total is
    clamped to a rough 0-10 scale.
    """

    # Weight contributed by one match in each tier.
    WEIGHTS = {
        "high": 3.0,
        "medium": 1.5,
        "low": 0.5
    }
    KEYWORDS = {
        "high": [
            "hermes", "timmy", "timmy foundation",
            "langchain", "llm agent", "agent framework",
            "multi-agent", "agent orchestration",
            "reinforcement learning", "RLHF", "DPO", "GRPO",
            "tool use", "tool calling", "function calling",
            "chain-of-thought", "reasoning", "planning",
            "fine-tuning", "instruction tuning",
            "alignment", "safety"
        ],
        "medium": [
            "llm", "large language model", "transformer",
            "inference optimization", "quantization", "distillation",
            "rag", "retrieval augmented", "vector database",
            "context window", "prompt engineering",
            "mcp", "model context protocol",
            "openai", "anthropic", "claude", "gpt",
            "training", "foundation model"
        ],
        "low": [
            "ai", "artificial intelligence",
            "machine learning", "deep learning",
            "neural network"
        ]
    }

    def score(self, entry: dict) -> Tuple[float, List[str], List[str]]:
        """Return (score, matched_keywords, reasons) for one entry.

        Matching is case-insensitive and anchored on word boundaries so
        short keywords such as "ai" or "rag" do not fire on substrings
        of unrelated words ("maintain", "storage", ...).
        """
        text = f"{entry.get('title', '')} {entry.get('summary', '')}".lower()
        matched = []
        reasons = []
        total_score = 0.0
        for tier, keywords in self.KEYWORDS.items():
            weight = self.WEIGHTS[tier]
            for keyword in keywords:
                # \b anchors fix the false positives of the previous plain
                # substring check (e.g. "ai" matched inside "maintain").
                if re.search(rf"\b{re.escape(keyword.lower())}\b", text):
                    matched.append(keyword)
                    total_score += weight
                    if len(reasons) < 3:  # Limit reasons
                        reasons.append(f"Keyword '{keyword}' ({tier} priority)")
        # Bonus for arXiv AI/CL/LG papers
        if entry.get('source', '').startswith('arxiv'):
            total_score += 0.5
            reasons.append("arXiv AI paper (category bonus)")
        # Normalize score (roughly 0-10 scale)
        normalized = min(10.0, total_score)
        return normalized, matched, reasons
class EmbeddingScorer:
    """Scores entries by embedding similarity against a fixed Hermes context."""

    # Reference sentences describing the Hermes/Timmy problem space; an
    # entry's score is its best cosine similarity to any one of these.
    HERMES_CONTEXT = [
        "Hermes agent framework for autonomous AI systems",
        "Tool calling and function use in LLMs",
        "Multi-agent orchestration and communication",
        "Reinforcement learning from human feedback",
        "LLM fine-tuning and alignment",
        "Model context protocol and agent tools",
        "Open source AI agent systems",
    ]

    def __init__(self):
        # Degrade gracefully: without sentence-transformers this scorer
        # stays inert and score() always reports 0.0.
        if not EMBEDDINGS_AVAILABLE:
            self.model = None
            self.context_embeddings = None
            return
        print("[INFO] Loading embedding model...")
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.context_embeddings = self.model.encode(
            self.HERMES_CONTEXT, convert_to_tensor=True
        )

    def score(self, entry: dict) -> float:
        """Return the best cosine similarity (0-1) of the entry vs HERMES_CONTEXT."""
        if not EMBEDDINGS_AVAILABLE or not self.model:
            return 0.0
        document = f"{entry.get('title', '')}. {entry.get('summary', '')}"
        if not document.strip():
            # Nothing to embed (no title or summary).
            return 0.0
        doc_vector = self.model.encode(document, convert_to_tensor=True)
        sims = util.cos_sim(doc_vector, self.context_embeddings)
        return float(sims.max())
class RelevanceFilter:
    """Main filtering orchestrator: combines keyword and embedding scores."""

    def __init__(self, use_embeddings: bool = True):
        self.keyword_scorer = KeywordScorer()
        # Embedding scorer is optional; when absent, only keywords count
        # (the embedding term contributes 0 to the weighted sum).
        self.embedding_scorer = EmbeddingScorer() if use_embeddings else None
        # Relative weights of the two score components in the combined score.
        self.weights = {
            "keyword": 0.6,
            "embedding": 0.4
        }

    def rank_entries(self, entries: List[dict]) -> List[ScoredEntry]:
        """Score every entry and return them sorted by relevance, best first."""
        scored = []
        for entry in entries:
            kw_score, keywords, reasons = self.keyword_scorer.score(entry)
            emb_score = 0.0
            if self.embedding_scorer:
                emb_score = self.embedding_scorer.score(entry)
                # Rescale cosine similarity (0-1) to the keyword 0-10 scale.
                emb_score = emb_score * 10
            # Weighted blend of both components.
            combined = (
                self.weights["keyword"] * kw_score +
                self.weights["embedding"] * emb_score
            )
            scored.append(ScoredEntry(
                entry=entry,
                relevance_score=combined,
                keyword_score=kw_score,
                embedding_score=emb_score,
                keywords_matched=keywords,
                reasons=reasons
            ))
        # Sort by relevance (descending)
        scored.sort(key=lambda x: x.relevance_score, reverse=True)
        return scored

    def filter_top_n(self, entries: List[dict], n: int = 15, threshold: float = 2.0) -> List[ScoredEntry]:
        """Return the top *n* entries whose relevance score meets *threshold*."""
        scored = self.rank_entries(entries)
        above_threshold = [s for s in scored if s.relevance_score >= threshold]
        result = above_threshold[:n]
        # NOTE: the original f-string had a garbled arrow character, which
        # printed the two counts run together; restored as "->".
        print(f"[INFO] Filtered {len(entries)} -> {len(result)} (threshold={threshold})")
        return result
def main():
    """CLI entry point: load JSONL entries, filter by relevance, write results."""
    parser = argparse.ArgumentParser(description="Deep Dive: Relevance Filtering")
    parser.add_argument("--input", "-i", type=Path, required=True, help="Input JSONL from aggregator")
    parser.add_argument("--output", "-o", type=Path, required=True, help="Output JSONL with scores")
    parser.add_argument("--top-n", "-n", type=int, default=15, help="Number of top entries to keep")
    parser.add_argument("--threshold", "-t", type=float, default=2.0, help="Minimum relevance score")
    parser.add_argument("--no-embeddings", action="store_true", help="Disable embedding scoring")
    args = parser.parse_args()
    print(f"[Deep Dive] Phase 2: Filtering relevance from {args.input}")
    # Load entries; skip blank lines so a trailing newline (or accidental
    # empty line) in the JSONL doesn't crash json.loads("").
    entries = []
    with open(args.input, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                entries.append(json.loads(line))
    print(f"[INFO] Loaded {len(entries)} entries")
    # Filter
    filter_engine = RelevanceFilter(use_embeddings=not args.no_embeddings)
    filtered = filter_engine.filter_top_n(entries, n=args.top_n, threshold=args.threshold)
    # Save results as JSONL, one scored record per line.
    args.output.parent.mkdir(parents=True, exist_ok=True)
    with open(args.output, "w", encoding="utf-8") as f:
        for item in filtered:
            f.write(json.dumps({
                "entry": item.entry,
                "relevance_score": item.relevance_score,
                "keyword_score": item.keyword_score,
                "embedding_score": item.embedding_score,
                "keywords_matched": item.keywords_matched,
                "reasons": item.reasons
            }) + "\n")
    print(f"[SUCCESS] Phase 2 complete: {len(filtered)} entries written to {args.output}")
    # Show top 5
    print("\nTop 5 entries:")
    for item in filtered[:5]:
        title = item.entry.get('title', 'Unknown')[:60]
        print(f"  [{item.relevance_score:.1f}] {title}...")


if __name__ == "__main__":
    main()