diff --git a/intelligence/deepdive/config.yaml b/intelligence/deepdive/config.yaml
index b65cdd48..bcef6079 100644
--- a/intelligence/deepdive/config.yaml
+++ b/intelligence/deepdive/config.yaml
@@ -108,7 +108,7 @@ deepdive:
     min_chosen_rejected_ratio: 1.3        # Chosen must be ≥1.3x longer than rejected
     max_chosen_rejected_similarity: 0.70  # Max Jaccard overlap between chosen/rejected
     max_prompt_prompt_similarity: 0.85    # Max Jaccard overlap between prompts (dedup)
-    dedup_history_files: 5                # How many recent JSONL files to scan for cross-run dedup
+    dedup_full_history: true              # Persistent index covers ALL historical JSONL (no sliding window)
 
   # Phase 0: Fleet Context Grounding
   fleet_context:
diff --git a/intelligence/deepdive/dedup_index.py b/intelligence/deepdive/dedup_index.py
new file mode 100644
index 00000000..e0b2bf70
--- /dev/null
+++ b/intelligence/deepdive/dedup_index.py
@@ -0,0 +1,372 @@
+#!/usr/bin/env python3
+"""Persistent DPO Prompt Deduplication Index.
+
+Maintains a full-history hash index of every prompt ever exported,
+preventing overfitting from accumulating duplicate training pairs
+across arbitrarily many overnight runs.
+
+Design:
+    - Append-only JSON index file alongside the JSONL training data
+    - On export: new prompt hashes appended (no full rescan)
+    - On load: integrity check against disk manifest; incremental
+      ingestion of any JSONL files not yet indexed
+    - rebuild() forces full rescan of all historical JSONL files
+    - Zero external dependencies (stdlib only)
+
+Storage format (.dpo_dedup_index.json):
+    {
+      "version": 2,
+      "created_at": "2026-04-13T...",
+      "last_updated": "2026-04-13T...",
+      "indexed_files": ["deepdive_20260412.jsonl", ...],
+      "prompt_hashes": ["a1b2c3d4e5f6", ...],
+      "stats": {"total_prompts": 142, "total_files": 12}
+    }
+
+Usage:
+    from dedup_index import DedupIndex
+
+    idx = DedupIndex(output_dir)    # Loads or builds automatically
+    idx.contains("hash")            # O(1) lookup
+    idx.add_hashes(["h1", "h2"])    # Append after export
+    idx.register_file("new.jsonl")  # Track which files are indexed
+    idx.rebuild()                   # Full rescan from disk
+
+Standalone CLI:
+    python3 dedup_index.py ~/.timmy/training-data/dpo-pairs/ --rebuild
+    python3 dedup_index.py ~/.timmy/training-data/dpo-pairs/ --stats
+"""
+
+import hashlib
+import json
+import logging
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Dict, List, Optional, Set
+
+logger = logging.getLogger("deepdive.dedup_index")
+
+INDEX_FILENAME = ".dpo_dedup_index.json"
+INDEX_VERSION = 2
+
+# JSONL filename patterns to scan (covers both deepdive and twitter archive)
+JSONL_PATTERNS = ["deepdive_*.jsonl", "pairs_*.jsonl"]
+
+
+class DedupIndex:
+    """Persistent full-history prompt deduplication index.
+
+    Backed by a JSON file in the training data directory.
+    Loads lazily on first access, rebuilds automatically if missing.
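+
+    Not safe for concurrent writers: the whole index is rewritten on
+    each save with no file locking, so keep one exporting process per
+    training-data directory.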
+    """
+
+    def __init__(self, output_dir: Path, auto_load: bool = True):
+        self.output_dir = Path(output_dir)
+        self.index_path = self.output_dir / INDEX_FILENAME
+
+        self._hashes: Set[str] = set()
+        self._indexed_files: Set[str] = set()
+        self._created_at: Optional[str] = None
+        self._last_updated: Optional[str] = None
+        self._loaded: bool = False
+
+        if auto_load:
+            self._ensure_loaded()
+
+    # ------------------------------------------------------------------
+    # Public API
+    # ------------------------------------------------------------------
+
+    def contains(self, prompt_hash: str) -> bool:
+        """Check if a prompt hash exists in the full history."""
+        self._ensure_loaded()
+        return prompt_hash in self._hashes
+
+    def contains_any(self, prompt_hashes: List[str]) -> Dict[str, bool]:
+        """Batch lookup. Returns {hash: True/False} for each input."""
+        self._ensure_loaded()
+        return {h: h in self._hashes for h in prompt_hashes}
+
+    def add_hashes(self, hashes: List[str]) -> int:
+        """Append new prompt hashes to the index. Returns count added."""
+        self._ensure_loaded()
+        before = len(self._hashes)
+        self._hashes.update(hashes)
+        added = len(self._hashes) - before
+        if added > 0:
+            self._save()
+            logger.debug(f"Added {added} new hashes to dedup index")
+        return added
+
+    def register_file(self, filename: str) -> None:
+        """Mark a JSONL file as indexed (prevents re-scanning)."""
+        self._ensure_loaded()
+        self._indexed_files.add(filename)
+        self._save()
+
+    def add_hashes_and_register(self, hashes: List[str], filename: str) -> int:
+        """Atomic: append hashes + register file in one save."""
+        self._ensure_loaded()
+        before = len(self._hashes)
+        self._hashes.update(hashes)
+        self._indexed_files.add(filename)
+        added = len(self._hashes) - before
+        self._save()
+        return added
+
+    def rebuild(self) -> Dict[str, int]:
+        """Full rebuild: scan ALL JSONL files in output_dir from scratch.
+
+        Returns stats dict with counts.
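+
+        Stats keys: files_scanned, unique_prompts (post-dedup), and
+        total_prompts_seen (pre-dedup, including duplicates).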
+ """ + logger.info(f"Rebuilding dedup index from {self.output_dir}") + self._hashes.clear() + self._indexed_files.clear() + self._created_at = datetime.now(timezone.utc).isoformat() + + files_scanned = 0 + prompts_indexed = 0 + + all_jsonl = self._discover_jsonl_files() + for path in sorted(all_jsonl): + file_hashes = self._extract_hashes_from_file(path) + self._hashes.update(file_hashes) + self._indexed_files.add(path.name) + files_scanned += 1 + prompts_indexed += len(file_hashes) + + self._save() + + stats = { + "files_scanned": files_scanned, + "unique_prompts": len(self._hashes), + "total_prompts_seen": prompts_indexed, + } + logger.info( + f"Rebuild complete: {files_scanned} files, " + f"{len(self._hashes)} unique prompt hashes " + f"({prompts_indexed} total including dupes)" + ) + return stats + + @property + def size(self) -> int: + """Number of unique prompt hashes in the index.""" + self._ensure_loaded() + return len(self._hashes) + + @property + def files_indexed(self) -> int: + """Number of JSONL files tracked in the index.""" + self._ensure_loaded() + return len(self._indexed_files) + + def stats(self) -> Dict: + """Return index statistics.""" + self._ensure_loaded() + return { + "version": INDEX_VERSION, + "index_path": str(self.index_path), + "unique_prompts": len(self._hashes), + "files_indexed": len(self._indexed_files), + "created_at": self._created_at, + "last_updated": self._last_updated, + } + + # ------------------------------------------------------------------ + # Internal: load / save / sync + # ------------------------------------------------------------------ + + def _ensure_loaded(self) -> None: + """Load index if not yet loaded. Build if missing.""" + if self._loaded: + return + + if self.index_path.exists(): + self._load() + # Check for un-indexed files and ingest them + self._sync_incremental() + else: + # No index exists — build from scratch + if self.output_dir.exists(): + self.rebuild() + else: + # Empty dir, nothing to index + self._created_at = datetime.now(timezone.utc).isoformat() + self._loaded = True + self._save() + + def _load(self) -> None: + """Load index from disk.""" + try: + with open(self.index_path, "r") as f: + data = json.load(f) + + version = data.get("version", 1) + if version < INDEX_VERSION: + logger.info(f"Index version {version} < {INDEX_VERSION}, rebuilding") + self.rebuild() + return + + self._hashes = set(data.get("prompt_hashes", [])) + self._indexed_files = set(data.get("indexed_files", [])) + self._created_at = data.get("created_at") + self._last_updated = data.get("last_updated") + self._loaded = True + + logger.info( + f"Loaded dedup index: {len(self._hashes)} hashes, " + f"{len(self._indexed_files)} files" + ) + except (json.JSONDecodeError, KeyError, TypeError) as e: + logger.warning(f"Corrupt dedup index, rebuilding: {e}") + self.rebuild() + + def _save(self) -> None: + """Persist index to disk.""" + self.output_dir.mkdir(parents=True, exist_ok=True) + self._last_updated = datetime.now(timezone.utc).isoformat() + + data = { + "version": INDEX_VERSION, + "created_at": self._created_at or self._last_updated, + "last_updated": self._last_updated, + "indexed_files": sorted(self._indexed_files), + "prompt_hashes": sorted(self._hashes), + "stats": { + "total_prompts": len(self._hashes), + "total_files": len(self._indexed_files), + }, + } + + # Atomic write: write to temp then rename + tmp_path = self.index_path.with_suffix(".tmp") + with open(tmp_path, "w") as f: + json.dump(data, f, indent=2) + tmp_path.rename(self.index_path) 
+
+    def _sync_incremental(self) -> None:
+        """Find JSONL files on disk not in the index and ingest them."""
+        on_disk = self._discover_jsonl_files()
+        unindexed = [p for p in on_disk if p.name not in self._indexed_files]
+
+        if not unindexed:
+            self._loaded = True
+            return
+
+        logger.info(f"Incremental sync: {len(unindexed)} new files to index")
+        new_hashes = 0
+        for path in sorted(unindexed):
+            file_hashes = self._extract_hashes_from_file(path)
+            self._hashes.update(file_hashes)
+            self._indexed_files.add(path.name)
+            new_hashes += len(file_hashes)
+
+        self._loaded = True
+        self._save()
+        logger.info(
+            f"Incremental sync complete: +{len(unindexed)} files, "
+            f"+{new_hashes} prompt hashes (total: {len(self._hashes)})"
+        )
+
+    def _discover_jsonl_files(self) -> List[Path]:
+        """Find all JSONL training data files in output_dir."""
+        if not self.output_dir.exists():
+            return []
+
+        files = []
+        for pattern in JSONL_PATTERNS:
+            files.extend(self.output_dir.glob(pattern))
+        return sorted(set(files))
+
+    @staticmethod
+    def _extract_hashes_from_file(path: Path) -> List[str]:
+        """Extract prompt hashes from a single JSONL file."""
+        hashes = []
+        try:
+            with open(path) as f:
+                for line in f:
+                    line = line.strip()
+                    if not line:
+                        continue
+                    try:
+                        pair = json.loads(line)
+                        prompt = pair.get("prompt", "")
+                        if prompt:
+                            normalized = " ".join(prompt.lower().split())
+                            h = hashlib.sha256(normalized.encode()).hexdigest()[:16]
+                            hashes.append(h)
+                    except json.JSONDecodeError:
+                        continue
+        except Exception as e:
+            logger.warning(f"Failed to read {path}: {e}")
+        return hashes
+
+    @staticmethod
+    def hash_prompt(prompt: str) -> str:
+        """Compute the canonical prompt hash (same algorithm as validator)."""
+        normalized = " ".join(prompt.lower().split())
+        return hashlib.sha256(normalized.encode()).hexdigest()[:16]
+
+
+# ---------------------------------------------------------------------------
+# CLI
+# ---------------------------------------------------------------------------
+
+def main():
+    import argparse
+
+    parser = argparse.ArgumentParser(
+        description="DPO dedup index management"
+    )
+    parser.add_argument(
+        "output_dir", type=Path,
+        help="Path to DPO pairs directory"
+    )
+    parser.add_argument(
+        "--rebuild", action="store_true",
+        help="Force full rebuild from all JSONL files"
+    )
+    parser.add_argument(
+        "--stats", action="store_true",
+        help="Print index statistics"
+    )
+    parser.add_argument(
+        "--json", action="store_true",
+        help="Output as JSON"
+    )
+    args = parser.parse_args()
+
+    if not args.output_dir.exists():
+        print(f"Error: directory not found: {args.output_dir}")
+        return 1
+
+    idx = DedupIndex(args.output_dir, auto_load=not args.rebuild)
+
+    if args.rebuild:
+        result = idx.rebuild()
+        if args.json:
+            print(json.dumps(result, indent=2))
+        else:
+            print(f"Rebuilt index: {result['files_scanned']} files, "
+                  f"{result['unique_prompts']} unique prompts")
+
+    # Stats print by default; --stats forces them after an explicit rebuild
+    if args.stats or not args.rebuild:
+        s = idx.stats()
+        if args.json:
+            print(json.dumps(s, indent=2))
+        else:
+            print("=" * 50)
+            print(" DPO DEDUP INDEX")
+            print("=" * 50)
+            print(f"  Path:            {s['index_path']}")
+            print(f"  Unique prompts:  {s['unique_prompts']}")
+            print(f"  Files indexed:   {s['files_indexed']}")
+            print(f"  Created:         {s['created_at']}")
+            print(f"  Last updated:    {s['last_updated']}")
+            print("=" * 50)
+
+    return 0
+
+
+if __name__ == "__main__":
+    exit(main())
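Reviewer note: a minimal round-trip sketch of the new module (the directory
path is illustrative, and this assumes dedup_index.py is importable from the
working directory):

    from pathlib import Path
    from dedup_index import DedupIndex

    pairs_dir = Path("/tmp/dpo-pairs-demo")   # hypothetical scratch dir
    idx = DedupIndex(pairs_dir)               # no data yet -> empty index
    h = DedupIndex.hash_prompt("What is fleet context?")
    idx.add_hashes_and_register([h], "deepdive_demo.jsonl")

    idx2 = DedupIndex(pairs_dir)              # fresh load from disk
    assert idx2.contains(h)                   # dedup survives across runs

The second construction exercises _load() followed by _sync_incremental(),
the same path an overnight run takes.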
diff --git a/intelligence/deepdive/dpo_generator.py b/intelligence/deepdive/dpo_generator.py
index 3a9bc33f..52a11981 100644
--- a/intelligence/deepdive/dpo_generator.py
+++ b/intelligence/deepdive/dpo_generator.py
@@ -413,6 +413,16 @@ class DPOPairGenerator:
 
         output_path = self.export(pairs, session_id)
 
+        # Register exported hashes in the persistent dedup index
+        if self.validator:
+            try:
+                exported_dicts = [p.to_dict() for p in pairs]
+                self.validator.register_exported_hashes(
+                    exported_dicts, output_path.name
+                )
+            except Exception as e:
+                logger.warning(f"Failed to register hashes in dedup index: {e}")
+
         # Summary by task type
         type_counts = {}
         for p in pairs:
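Reviewer note: swallowing the registration failure above is sound: the JSONL
export has already succeeded at that point, and an unregistered file
self-heals. The next DedupIndex construction finds it on disk, notices it is
missing from indexed_files, and ingests it via _sync_incremental().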
diff --git a/intelligence/deepdive/dpo_quality.py b/intelligence/deepdive/dpo_quality.py
index 0f9a6991..196cf313 100644
--- a/intelligence/deepdive/dpo_quality.py
+++ b/intelligence/deepdive/dpo_quality.py
@@ -26,6 +26,14 @@ from dataclasses import dataclass, field, asdict
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Set
 
+# Persistent dedup index
+try:
+    from dedup_index import DedupIndex
+    HAS_DEDUP_INDEX = True
+except ImportError:
+    HAS_DEDUP_INDEX = False
+    DedupIndex = None
+
 logger = logging.getLogger("deepdive.dpo_quality")
 
 
@@ -46,8 +54,9 @@ DEFAULT_CONFIG = {
     "max_chosen_rejected_similarity": 0.70,  # Flag if chosen ≈ rejected
     "max_prompt_prompt_similarity": 0.85,    # Flag if two prompts are near-dupes
 
-    # Cross-run dedup: hash window (how many recent JSONL files to scan)
-    "dedup_history_files": 5,
+    # Cross-run dedup: full-history persistent index
+    # (replaces the old sliding-window approach)
+    "dedup_full_history": True,
 
     # What to do with flagged pairs: "drop" or "flag"
     # "drop" = remove from export entirely
@@ -125,13 +134,27 @@ class DPOQualityValidator:
         self.cfg = {**DEFAULT_CONFIG, **(config or {})}
         self.output_dir = Path(output_dir) if output_dir else Path.home() / ".timmy" / "training-data" / "dpo-pairs"
 
-        # Cache of prompt hashes from previous runs (loaded lazily)
+        # Persistent full-history dedup index
+        self._dedup_index = None
+        if HAS_DEDUP_INDEX and self.cfg.get("dedup_full_history", True):
+            try:
+                self._dedup_index = DedupIndex(self.output_dir)
+                logger.info(
+                    f"Full-history dedup index: {self._dedup_index.size} prompts, "
+                    f"{self._dedup_index.files_indexed} files"
+                )
+            except Exception as e:
+                logger.warning(f"Failed to load dedup index, falling back to in-memory: {e}")
+                self._dedup_index = None
+
+        # Fallback: in-memory hash cache (used if index unavailable)
         self._history_hashes: Optional[Set[str]] = None
 
         logger.info(
             f"DPOQualityValidator: action={self.cfg['flagged_pair_action']}, "
             f"max_cr_sim={self.cfg['max_chosen_rejected_similarity']}, "
-            f"max_pp_sim={self.cfg['max_prompt_prompt_similarity']}"
+            f"max_pp_sim={self.cfg['max_prompt_prompt_similarity']}, "
+            f"dedup={'full-history index' if self._dedup_index else 'in-memory fallback'}"
         )
 
     # -------------------------------------------------------------------
@@ -278,57 +301,78 @@ class DPOQualityValidator:
 
         return dupe_warnings
 
-    def _load_history_hashes(self) -> Set[str]:
-        """Load prompt hashes from recent JSONL files for cross-run dedup."""
-        if self._history_hashes is not None:
-            return self._history_hashes
-
-        hashes = set()
-        if not self.output_dir.exists():
-            self._history_hashes = hashes
-            return hashes
-
-        jsonl_files = sorted(
-            self.output_dir.glob("deepdive_*.jsonl"),
-            key=lambda p: p.stat().st_mtime,
-            reverse=True,
-        )
-
-        files_to_scan = jsonl_files[:self.cfg["dedup_history_files"]]
-        for path in files_to_scan:
-            try:
-                with open(path) as f:
-                    for line in f:
-                        line = line.strip()
-                        if not line:
-                            continue
-                        pair = json.loads(line)
-                        prompt_hash = self._content_hash(pair.get("prompt", ""))
-                        hashes.add(prompt_hash)
-            except Exception as e:
-                logger.warning(f"Failed to read history file {path}: {e}")
-
-        logger.info(f"Loaded {len(hashes)} prompt hashes from {len(files_to_scan)} history files")
-        self._history_hashes = hashes
-        return hashes
-
     def _check_cross_run_dupes(self, pairs: List[Dict[str, Any]]) -> Dict[int, str]:
-        """Check if any pair prompts already exist in recent history.
+        """Check if any pair prompts exist in full training history.
+
+        Uses persistent DedupIndex when available (covers all historical
+        JSONL files). Falls back to in-memory scan of ALL files if index
+        module is unavailable.
 
         Returns dict mapping pair index → warning string for duplicates.
         """
-        history = self._load_history_hashes()
-        if not history:
-            return {}
-
         dupe_warnings: Dict[int, str] = {}
+
+        if self._dedup_index:
+            # Full-history lookup via persistent index
+            for i, pair in enumerate(pairs):
+                prompt_hash = self._content_hash(pair.get("prompt", ""))
+                if self._dedup_index.contains(prompt_hash):
+                    dupe_warnings[i] = (
+                        f"cross-run duplicate (prompt seen in full history — "
+                        f"{self._dedup_index.size} indexed prompts)"
+                    )
+            return dupe_warnings
+
+        # Fallback: scan all JSONL files in output_dir (no sliding window)
+        if self._history_hashes is None:
+            self._history_hashes = set()
+            jsonl_files: List[Path] = []  # stays empty if output_dir is missing
+            if self.output_dir.exists():
+                jsonl_files = sorted(self.output_dir.glob("deepdive_*.jsonl"))
+                jsonl_files.extend(sorted(self.output_dir.glob("pairs_*.jsonl")))
+                for path in jsonl_files:
+                    try:
+                        with open(path) as f:
+                            for line in f:
+                                line = line.strip()
+                                if not line:
+                                    continue
+                                pair_data = json.loads(line)
+                                h = self._content_hash(pair_data.get("prompt", ""))
+                                self._history_hashes.add(h)
+                    except Exception as e:
+                        logger.warning(f"Failed to read history file {path}: {e}")
+            logger.info(
+                f"Fallback dedup: loaded {len(self._history_hashes)} hashes "
+                f"from {len(jsonl_files)} files"
+            )
 
         for i, pair in enumerate(pairs):
             prompt_hash = self._content_hash(pair.get("prompt", ""))
-            if prompt_hash in history:
-                dupe_warnings[i] = "cross-run duplicate (prompt seen in recent history)"
+            if prompt_hash in self._history_hashes:
+                dupe_warnings[i] = "cross-run duplicate (prompt seen in full history)"
 
         return dupe_warnings
 
+    def register_exported_hashes(self, pairs: List[Dict[str, Any]],
+                                 filename: str) -> None:
+        """After successful export, register new prompt hashes in the index.
+
+        Called by DPOPairGenerator after writing the JSONL file.
+        """
+        hashes = [self._content_hash(p.get("prompt", "")) for p in pairs]
+
+        if self._dedup_index:
+            added = self._dedup_index.add_hashes_and_register(hashes, filename)
+            logger.info(
+                f"Registered {added} new hashes in dedup index "
+                f"(total: {self._dedup_index.size})"
+            )
+        else:
+            # Update in-memory fallback
+            if self._history_hashes is None:
+                self._history_hashes = set()
+            self._history_hashes.update(hashes)
+
     # -------------------------------------------------------------------
     # Main validation entry point
     # -------------------------------------------------------------------
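Verification sketch (commands adapted from the module docstring; point them
at a real pairs directory):

    python3 intelligence/deepdive/dedup_index.py ~/.timmy/training-data/dpo-pairs/ --rebuild --json
    python3 intelligence/deepdive/dedup_index.py ~/.timmy/training-data/dpo-pairs/ --stats

The rebuild output reports files_scanned, unique_prompts, and
total_prompts_seen; --stats reads back the persisted index, so its
unique-prompt count should match the rebuild's.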