#!/usr/bin/env python3 """Merge batch knowledge candidates into a deterministic profile.""" from __future__ import annotations import hashlib import json from collections import defaultdict from .common import ( CANDIDATES_DIR, CHANGES_FILE, PROFILE_FILE, ensure_layout, load_json, load_jsonl, slugify, write_json, append_jsonl, write_progress_snapshot, ) def claim_id(category: str, claim: str) -> str: digest = hashlib.sha1(f"{category}:{claim.strip().lower()}".encode()).hexdigest()[:12] return f"{slugify(category)}-{digest}" def contradiction_matches(target_claim: str, contradiction_hint: str) -> bool: target = target_claim.strip().lower() hint = contradiction_hint.strip().lower() return bool(hint) and (hint in target or target in hint) def main() -> None: ensure_layout() old_profile = load_json(PROFILE_FILE, {"claims": []}) old_by_id = {claim["id"]: claim for claim in old_profile.get("claims", [])} aggregates: dict[str, dict] = {} contradiction_hints: defaultdict[str, list[str]] = defaultdict(list) batch_files = sorted(CANDIDATES_DIR.glob("batch_*.json")) for path in batch_files: batch = load_json(path, {}) batch_id = batch.get("batch_id", path.stem) for candidate in batch.get("knowledge_candidates", []): claim = candidate.get("claim", "").strip() category = candidate.get("category", "recurring-theme").strip() or "recurring-theme" if not claim: continue item_id = claim_id(category, claim) existing = aggregates.setdefault( item_id, { "id": item_id, "category": category, "claim": claim, "evidence_tweet_ids": set(), "evidence_quotes": [], "confidence": 0.0, "status": "provisional", "first_seen_at": batch_id, "last_confirmed_at": batch_id, "contradicts": [], "support_count": 0, }, ) existing["support_count"] += 1 existing["confidence"] = max( existing["confidence"], float(candidate.get("confidence", 0.0) or 0.0) ) existing["last_confirmed_at"] = batch_id for tweet_id in candidate.get("evidence_tweet_ids", []): existing["evidence_tweet_ids"].add(str(tweet_id)) for quote in candidate.get("evidence_quotes", []): if quote and quote not in existing["evidence_quotes"]: existing["evidence_quotes"].append(quote) for hint in candidate.get("contradicts", []) or []: if hint: contradiction_hints[item_id].append(str(hint)) for item_id, claim in aggregates.items(): if claim["confidence"] >= 0.85 or len(claim["evidence_tweet_ids"]) >= 2: claim["status"] = "durable" if contradiction_hints.get(item_id): claim["contradicts"] = sorted(set(contradiction_hints[item_id])) for source_id, hints in contradiction_hints.items(): for hint in hints: for target_id, target in aggregates.items(): if target_id == source_id: continue if contradiction_matches(target["claim"], hint): target["status"] = "retracted" claims = [] for item_id in sorted(aggregates): claim = aggregates[item_id] claims.append( { "id": claim["id"], "category": claim["category"], "claim": claim["claim"], "evidence_tweet_ids": sorted(claim["evidence_tweet_ids"]), "evidence_quotes": claim["evidence_quotes"][:5], "confidence": round(claim["confidence"], 3), "status": claim["status"], "first_seen_at": claim["first_seen_at"], "last_confirmed_at": claim["last_confirmed_at"], "contradicts": claim["contradicts"], } ) profile = { "schema_version": 1, "claim_count": len(claims), "durable_claims": sum(1 for claim in claims if claim["status"] == "durable"), "retracted_claims": sum(1 for claim in claims if claim["status"] == "retracted"), "claims": claims, } write_json(PROFILE_FILE, profile) existing_events = { event.get("event_id") for event in load_jsonl(CHANGES_FILE) if event.get("event_id") } new_events = [] for claim in claims: previous = old_by_id.get(claim["id"]) if previous == claim: continue event_id = f"{claim['id']}:{claim['status']}:{claim['last_confirmed_at']}" if event_id in existing_events: continue new_events.append( { "event_id": event_id, "claim_id": claim["id"], "change_type": "new" if previous is None else "updated", "status": claim["status"], "batch_id": claim["last_confirmed_at"], } ) append_jsonl(CHANGES_FILE, new_events) snapshot = write_progress_snapshot() print(json.dumps({"status": "ok", "claims": len(claims), "events": len(new_events), "progress": snapshot}, sort_keys=True)) if __name__ == "__main__": main()