Files
the-nexus/intelligence/deepdive/dedup_index.py
perplexity 4b15cf8283
Some checks failed
CI / test (pull_request) Failing after 16s
CI / validate (pull_request) Failing after 14s
Review Approval Gate / verify-review (pull_request) Failing after 3s
feat: full-history persistent dedup index for DPO training pairs
Replace the 5-file sliding window cross-run dedup with a persistent
hash index that covers ALL historical training data. Overfitting risk
compounds across the full dataset — a 5-file window lets old duplicates
leak back into training after enough overnight runs.

New module: dedup_index.py (DedupIndex)
- Persistent JSON index (.dpo_dedup_index.json) alongside JSONL files
- Append-on-export: new prompt hashes registered after each successful
  export — no full rescan needed for normal operations
- Incremental sync: on load, detects JSONL files not yet indexed and
  ingests them automatically (handles files from other tools)
- Full rebuild: rebuild() scans ALL deepdive_*.jsonl + pairs_*.jsonl
  to reconstruct from scratch (first run, corruption recovery)
- Atomic writes (write-to-tmp + rename) to prevent index corruption
- Standalone CLI: python3 dedup_index.py <dir> --rebuild --stats

Modified: dpo_quality.py
- Imports DedupIndex with graceful degradation
- Replaces _load_history_hashes() with persistent index lookup
- Fallback: if index unavailable, scans ALL files in-memory (not just 5)
- New register_exported_hashes() method called after export
- Config key: dedup_full_history (replaces dedup_history_files)

Modified: dpo_generator.py
- Calls validator.register_exported_hashes() after successful export
  to keep the persistent index current without rescanning

Modified: config.yaml
- Replaced dedup_history_files: 5 with dedup_full_history: true

Tested — 7 integration tests:
  ✓ Fresh index build from empty directory
  ✓ Build from 3 existing JSONL files (15 unique hashes)
  ✓ Incremental sync when new file appears between runs
  ✓ Append after export + persistence across reloads
  ✓ Rebuild from scratch (recovers from corruption)
  ✓ Validator catches day-1 dupe from 20-day history (5-file window miss)
  ✓ Full pipeline: generate → validate → export → register → re-run detects
2026-04-13 03:11:10 +00:00

373 lines
12 KiB
Python

#!/usr/bin/env python3
"""Persistent DPO Prompt Deduplication Index.
Maintains a full-history hash index of every prompt ever exported,
preventing overfitting from accumulating duplicate training pairs
across arbitrarily many overnight runs.
Design:
- JSON index file alongside the JSONL training data; hashes are only added (never removed) outside of rebuild(), and the file is rewritten in full on each save
- On export: new prompt hashes appended (no full rescan)
- On load: integrity check against disk manifest; incremental
ingestion of any JSONL files not yet indexed
- rebuild() forces full rescan of all historical JSONL files
- Zero external dependencies (stdlib only)
Storage format (.dpo_dedup_index.json):
{
"version": 2,
"created_at": "2026-04-13T...",
"last_updated": "2026-04-13T...",
"indexed_files": ["deepdive_20260412.jsonl", ...],
"prompt_hashes": ["a1b2c3d4e5f6", ...],
"stats": {"total_prompts": 142, "total_files": 12}
}
Usage:
from dedup_index import DedupIndex
idx = DedupIndex(output_dir) # Loads or builds automatically
idx.contains("hash") # O(1) lookup
idx.add_hashes(["h1", "h2"]) # Append after export
idx.register_file("new.jsonl") # Track which files are indexed
idx.rebuild() # Full rescan from disk
Standalone CLI:
python3 dedup_index.py ~/.timmy/training-data/dpo-pairs/ --rebuild
python3 dedup_index.py ~/.timmy/training-data/dpo-pairs/ --stats
"""
import hashlib
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Set
logger = logging.getLogger("deepdive.dedup_index")

# File name of the persistent index, stored inside the training-data directory.
INDEX_FILENAME = ".dpo_dedup_index.json"
# Schema version written to the index; an older on-disk version triggers a rebuild.
INDEX_VERSION = 2

# JSONL filename patterns to scan (covers both deepdive and twitter archive)
JSONL_PATTERNS = ["deepdive_*.jsonl", "pairs_*.jsonl"]


class DedupIndex:
    """Persistent full-history prompt deduplication index.

    Backed by a JSON file (``INDEX_FILENAME``) in the training data directory.
    With ``auto_load=True`` (the default) the index is loaded in the
    constructor; otherwise loading is deferred to the first access.  A
    missing, outdated or unreadable index file is rebuilt from the JSONL
    files found in ``output_dir``.
    """

    def __init__(self, output_dir: Path, auto_load: bool = True):
        """Create an index bound to *output_dir*.

        Args:
            output_dir: Directory holding the JSONL files and the index file.
            auto_load: Load (or build) the index immediately when true.
        """
        self.output_dir = Path(output_dir)
        self.index_path = self.output_dir / INDEX_FILENAME
        self._hashes: Set[str] = set()
        self._indexed_files: Set[str] = set()
        self._created_at: Optional[str] = None
        self._last_updated: Optional[str] = None
        self._loaded: bool = False
        if auto_load:
            self._ensure_loaded()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def contains(self, prompt_hash: str) -> bool:
        """Check if a prompt hash exists in the full history."""
        self._ensure_loaded()
        return prompt_hash in self._hashes

    def contains_any(self, prompt_hashes: List[str]) -> Dict[str, bool]:
        """Batch lookup. Returns {hash: True/False} for each input."""
        self._ensure_loaded()
        return {h: h in self._hashes for h in prompt_hashes}

    def add_hashes(self, hashes: List[str]) -> int:
        """Append new prompt hashes to the index. Returns count added.

        The index is only written to disk when at least one hash was new.
        """
        self._ensure_loaded()
        before = len(self._hashes)
        self._hashes.update(hashes)
        added = len(self._hashes) - before
        if added > 0:
            self._save()
            logger.debug(f"Added {added} new hashes to dedup index")
        return added

    def register_file(self, filename: str) -> None:
        """Mark a JSONL file as indexed (prevents re-scanning)."""
        self._ensure_loaded()
        self._indexed_files.add(filename)
        self._save()

    def add_hashes_and_register(self, hashes: List[str], filename: str) -> int:
        """Append hashes and register *filename* with a single save.

        Returns the number of hashes that were not already in the index.
        """
        self._ensure_loaded()
        before = len(self._hashes)
        self._hashes.update(hashes)
        self._indexed_files.add(filename)
        added = len(self._hashes) - before
        self._save()
        return added

    def rebuild(self) -> Dict[str, int]:
        """Full rebuild: scan ALL JSONL files in output_dir from scratch.

        Discards the in-memory state, re-reads every file matching
        ``JSONL_PATTERNS`` and writes a fresh index file.

        Returns:
            Dict with ``files_scanned``, ``unique_prompts`` and
            ``total_prompts_seen`` (the latter counts duplicates).
        """
        logger.info(f"Rebuilding dedup index from {self.output_dir}")
        self._hashes.clear()
        self._indexed_files.clear()
        self._created_at = datetime.now(timezone.utc).isoformat()
        files_scanned = 0
        prompts_indexed = 0
        for path in self._discover_jsonl_files():
            file_hashes = self._extract_hashes_from_file(path)
            self._hashes.update(file_hashes)
            self._indexed_files.add(path.name)
            files_scanned += 1
            prompts_indexed += len(file_hashes)
        # The in-memory state now mirrors the disk, so later accessors must
        # not trigger another load of the file we are about to write.
        self._loaded = True
        self._save()
        stats = {
            "files_scanned": files_scanned,
            "unique_prompts": len(self._hashes),
            "total_prompts_seen": prompts_indexed,
        }
        logger.info(
            f"Rebuild complete: {files_scanned} files, "
            f"{len(self._hashes)} unique prompt hashes "
            f"({prompts_indexed} total including dupes)"
        )
        return stats

    @property
    def size(self) -> int:
        """Number of unique prompt hashes in the index."""
        self._ensure_loaded()
        return len(self._hashes)

    @property
    def files_indexed(self) -> int:
        """Number of JSONL files tracked in the index."""
        self._ensure_loaded()
        return len(self._indexed_files)

    def stats(self) -> Dict:
        """Return index statistics."""
        self._ensure_loaded()
        return {
            "version": INDEX_VERSION,
            "index_path": str(self.index_path),
            "unique_prompts": len(self._hashes),
            "files_indexed": len(self._indexed_files),
            "created_at": self._created_at,
            "last_updated": self._last_updated,
        }

    # ------------------------------------------------------------------
    # Internal: load / save / sync
    # ------------------------------------------------------------------
    def _ensure_loaded(self) -> None:
        """Load index if not yet loaded. Build if missing."""
        if self._loaded:
            return
        if self.index_path.exists():
            self._load()
            # Check for un-indexed files and ingest them
            self._sync_incremental()
        elif self.output_dir.exists():
            # No index exists — build from scratch
            self.rebuild()
        else:
            # No directory yet, nothing to index; _save() creates the
            # directory and writes an empty index.
            self._created_at = datetime.now(timezone.utc).isoformat()
            self._loaded = True
            self._save()

    def _load(self) -> None:
        """Load index from disk, rebuilding if it is outdated or unusable."""
        try:
            with open(self.index_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            # Valid JSON that is not an object (e.g. "[]") is still corrupt.
            if not isinstance(data, dict):
                raise TypeError(
                    f"expected a JSON object, got {type(data).__name__}"
                )
            # Raises TypeError for a non-numeric version, handled below.
            outdated = data.get("version", 1) < INDEX_VERSION
            hashes = data.get("prompt_hashes", [])
            files = data.get("indexed_files", [])
            if not outdated:
                for field, value in (
                    ("prompt_hashes", hashes),
                    ("indexed_files", files),
                ):
                    # set("abc") would silently explode a string into chars.
                    if not isinstance(value, list) or not all(
                        isinstance(item, str) for item in value
                    ):
                        raise TypeError(f"{field} must be a list of strings")
        except (OSError, ValueError, KeyError, TypeError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning(f"Corrupt dedup index, rebuilding: {e}")
            self.rebuild()
            return

        if outdated:
            version = data.get("version", 1)
            logger.info(f"Index version {version} < {INDEX_VERSION}, rebuilding")
            self.rebuild()
            return

        self._hashes = set(hashes)
        self._indexed_files = set(files)
        self._created_at = data.get("created_at")
        self._last_updated = data.get("last_updated")
        self._loaded = True
        logger.info(
            f"Loaded dedup index: {len(self._hashes)} hashes, "
            f"{len(self._indexed_files)} files"
        )

    def _save(self) -> None:
        """Persist index to disk via a temp file that replaces the old index."""
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self._last_updated = datetime.now(timezone.utc).isoformat()
        data = {
            "version": INDEX_VERSION,
            "created_at": self._created_at or self._last_updated,
            "last_updated": self._last_updated,
            "indexed_files": sorted(self._indexed_files),
            "prompt_hashes": sorted(self._hashes),
            "stats": {
                "total_prompts": len(self._hashes),
                "total_files": len(self._indexed_files),
            },
        }
        # Write to temp then swap in.  Path.replace() overwrites an existing
        # target on every platform; Path.rename() fails on Windows there.
        tmp_path = self.index_path.with_suffix(".tmp")
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            tmp_path.replace(self.index_path)
        except Exception:
            # Don't leave a half-written temp file behind.
            try:
                tmp_path.unlink()
            except OSError:
                pass
            raise

    def _sync_incremental(self) -> None:
        """Find JSONL files on disk not in the index and ingest them."""
        on_disk = self._discover_jsonl_files()
        unindexed = [p for p in on_disk if p.name not in self._indexed_files]
        if not unindexed:
            self._loaded = True
            return
        logger.info(f"Incremental sync: {len(unindexed)} new files to index")
        new_hashes = 0
        for path in sorted(unindexed):
            file_hashes = self._extract_hashes_from_file(path)
            self._hashes.update(file_hashes)
            self._indexed_files.add(path.name)
            new_hashes += len(file_hashes)
        self._loaded = True
        self._save()
        logger.info(
            f"Incremental sync complete: +{len(unindexed)} files, "
            f"+{new_hashes} prompt hashes (total: {len(self._hashes)})"
        )

    def _discover_jsonl_files(self) -> List[Path]:
        """Find all JSONL training data files in output_dir (sorted, unique)."""
        if not self.output_dir.exists():
            return []
        files: List[Path] = []
        for pattern in JSONL_PATTERNS:
            files.extend(self.output_dir.glob(pattern))
        return sorted(set(files))

    @staticmethod
    def _extract_hashes_from_file(path: Path) -> List[str]:
        """Extract prompt hashes from a single JSONL file.

        Blank lines, lines that are not valid JSON, lines that are not JSON
        objects and entries whose ``prompt`` is missing, empty or not a
        string are skipped individually.  If the file itself cannot be read,
        a warning is logged and the hashes collected so far are returned.
        """
        hashes: List[str] = []
        try:
            with open(path, encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        pair = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    # A stray list/number/string line must only cost that
                    # line, not the remainder of the file.
                    if not isinstance(pair, dict):
                        continue
                    prompt = pair.get("prompt", "")
                    if isinstance(prompt, str) and prompt:
                        hashes.append(DedupIndex.hash_prompt(prompt))
        except Exception as e:
            # Best effort: one unreadable file must not abort indexing.
            logger.warning(f"Failed to read {path}: {e}")
        return hashes

    @staticmethod
    def hash_prompt(prompt: str) -> str:
        """Compute the canonical prompt hash (same algorithm as validator).

        The prompt is lower-cased and whitespace-collapsed, then hashed with
        SHA-256; the first 16 hex characters are returned.
        """
        normalized = " ".join(prompt.lower().split())
        return hashlib.sha256(normalized.encode()).hexdigest()[:16]
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point for managing the dedup index.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``; ``None``
            (the default) makes argparse read the process arguments.

    Returns:
        Process exit code: 0 on success, 1 if the directory does not exist.

    Index statistics are always printed; ``--stats`` is accepted but its
    value is not consulted.
    """
    import argparse
    import sys

    parser = argparse.ArgumentParser(
        description="DPO dedup index management"
    )
    parser.add_argument(
        "output_dir", type=Path,
        help="Path to DPO pairs directory"
    )
    parser.add_argument(
        "--rebuild", action="store_true",
        help="Force full rebuild from all JSONL files"
    )
    parser.add_argument(
        "--stats", action="store_true",
        help="Print index statistics"
    )
    parser.add_argument(
        "--json", action="store_true",
        help="Output as JSON"
    )
    args = parser.parse_args(argv)

    if not args.output_dir.exists():
        # Diagnostics go to stderr so stdout stays clean for --json consumers.
        print(f"Error: directory not found: {args.output_dir}", file=sys.stderr)
        return 1

    # When rebuilding, skip the automatic load: rebuild() rescans everything.
    idx = DedupIndex(args.output_dir, auto_load=not args.rebuild)
    if args.rebuild:
        result = idx.rebuild()
        if args.json:
            print(json.dumps(result, indent=2))
        else:
            print(f"Rebuilt index: {result['files_scanned']} files, "
                  f"{result['unique_prompts']} unique prompts")

    s = idx.stats()
    if args.json:
        print(json.dumps(s, indent=2))
    else:
        print("=" * 50)
        print(" DPO DEDUP INDEX")
        print("=" * 50)
        print(f" Path: {s['index_path']}")
        print(f" Unique prompts: {s['unique_prompts']}")
        print(f" Files indexed: {s['files_indexed']}")
        print(f" Created: {s['created_at']}")
        print(f" Last updated: {s['last_updated']}")
        print("=" * 50)
    return 0


if __name__ == "__main__":
    # exit() is injected by the site module and may be absent; SystemExit
    # is always available.
    raise SystemExit(main())