From 8c5b82e214af674ff1bb0a3fc75f7a4de52f572b Mon Sep 17 00:00:00 2001
From: Merge Bot
Date: Thu, 16 Apr 2026 04:58:55 +0000
Subject: [PATCH] Merge PR #773: scripts/hash_dedup.py (added)

---
 scripts/hash_dedup.py | 264 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 264 insertions(+)
 create mode 100644 scripts/hash_dedup.py

diff --git a/scripts/hash_dedup.py b/scripts/hash_dedup.py
new file mode 100644
index 00000000..42e09176
--- /dev/null
+++ b/scripts/hash_dedup.py
@@ -0,0 +1,264 @@
+#!/usr/bin/env python3
+"""
+hash_dedup.py — Deduplication with bounded hash storage.
+
+Stores SHA-256 hashes of validated content in date-stamped files.
+Rotates daily. Keeps only the last N days. Prevents unbounded growth.
+
+Usage:
+    from hash_dedup import HashDedup
+
+    dedup = HashDedup("/path/to/.hashes")
+    if dedup.is_duplicate("some content"):
+        print("Already seen")
+    else:
+        dedup.add("some content")
+        print("New content")
+
+    # Cleanup old hashes
+    dedup.cleanup(keep_days=7)
+"""
+
+import hashlib
+import json
+from datetime import datetime, timedelta
+from pathlib import Path
+from typing import List, Optional, Set
+
+
+class HashDedup:
+    """
+    Bounded hash-based deduplication with daily rotation.
+
+    Storage layout:
+        .hashes/
+            2026-04-15.json    (one file per day)
+            2026-04-14.json
+            ...
+    """
+
+    def __init__(self, storage_dir: str, max_hashes_per_file: int = 100000):
+        self.storage_dir = Path(storage_dir)
+        self.max_hashes_per_file = max_hashes_per_file
+        self.storage_dir.mkdir(parents=True, exist_ok=True)
+        self._today_hashes: Optional[Set[str]] = None
+        self._today_file: Optional[Path] = None
+
+    def _today(self) -> str:
+        """Current UTC date string."""
+        return datetime.utcnow().strftime("%Y-%m-%d")
+
+    def _date_file(self, date_str: str) -> Path:
+        """Path to hash file for a given date."""
+        return self.storage_dir / f"{date_str}.json"
+
+    @property
+    def today_file(self) -> Path:
+        # Re-resolve on every access so a long-running process actually
+        # rotates at midnight instead of writing to a stale date file.
+        current = self._date_file(self._today())
+        if self._today_file != current:
+            self._today_file = current
+            self._today_hashes = None  # force reload for the new day
+        return self._today_file
+
+    def _load_today(self) -> Set[str]:
+        """Load today's hashes from disk."""
+        path = self.today_file  # property access handles date rollover
+        if self._today_hashes is not None:
+            return self._today_hashes
+
+        if path.exists():
+            try:
+                with open(path) as f:
+                    data = json.load(f)
+                self._today_hashes = set(data.get("hashes", []))
+            except (json.JSONDecodeError, IOError):
+                self._today_hashes = set()
+        else:
+            self._today_hashes = set()
+        return self._today_hashes
+
+    def _save_today(self):
+        """Save today's hashes to disk."""
+        hashes = self._load_today()
+        path = self.today_file
+
+        # Enforce max size. Truncating the sorted digests is deterministic,
+        # and updating the cache keeps memory and disk in agreement.
+        if len(hashes) > self.max_hashes_per_file:
+            hashes = set(sorted(hashes)[:self.max_hashes_per_file])
+            self._today_hashes = hashes
+
+        data = {
+            "date": self._today(),
+            "count": len(hashes),
+            "hashes": sorted(hashes),
+        }
+        with open(path, 'w') as f:
+            json.dump(data, f)
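+    # Rough worst-case size, for orientation (an estimate, not an enforced
+    # limit): a SHA-256 hex digest is 64 characters, so a daily file capped
+    # at 100,000 digests holds about 6.4 MB of digest text plus JSON quotes
+    # and commas, and cleanup(keep_days=7) leaves at most ~8 such files
+    # (today plus seven back), i.e. on the order of 50 MB total.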
+    @staticmethod
+    def compute_hash(content: str) -> str:
+        """Compute SHA-256 hex digest of content."""
+        return hashlib.sha256(content.encode('utf-8')).hexdigest()
+
+    def is_duplicate(self, content: str) -> bool:
+        """Check if content hash exists in today's file only."""
+        return self.compute_hash(content) in self._load_today()
+
+    def is_duplicate_any(self, content: str, lookback_days: int = 7) -> bool:
+        """Check if content hash exists in any file within the lookback period."""
+        h = self.compute_hash(content)
+
+        # Check today first (cached in memory)
+        if h in self._load_today():
+            return True
+
+        # Check recent files; each is re-read from disk on every call.
+        for i in range(1, lookback_days + 1):
+            date_str = (datetime.utcnow() - timedelta(days=i)).strftime("%Y-%m-%d")
+            path = self._date_file(date_str)
+            if path.exists():
+                try:
+                    with open(path) as f:
+                        data = json.load(f)
+                    if h in set(data.get("hashes", [])):
+                        return True
+                except (json.JSONDecodeError, IOError):
+                    continue
+
+        return False
+
+    def add(self, content: str) -> bool:
+        """Add content hash. Returns True if added (was new), False if duplicate."""
+        h = self.compute_hash(content)
+        hashes = self._load_today()
+
+        if h in hashes:
+            return False
+
+        hashes.add(h)
+        self._save_today()
+        return True
+
+    def add_batch(self, contents: List[str]) -> int:
+        """Add multiple content hashes. Returns count of new hashes added."""
+        hashes = self._load_today()
+        new_count = 0
+
+        for content in contents:
+            h = self.compute_hash(content)
+            if h not in hashes:
+                hashes.add(h)
+                new_count += 1
+
+        if new_count > 0:
+            self._save_today()
+
+        return new_count
+
+    def cleanup(self, keep_days: int = 7) -> int:
+        """
+        Remove hash files older than keep_days.
+
+        Returns count of files removed.
+        """
+        removed = 0
+        cutoff = datetime.utcnow() - timedelta(days=keep_days)
+
+        for path in self.storage_dir.glob("*.json"):
+            try:
+                file_date = datetime.strptime(path.stem, "%Y-%m-%d")
+            except ValueError:
+                # Not a date-named file, skip
+                continue
+            if file_date < cutoff:
+                path.unlink()
+                removed += 1
+
+        return removed
+
+    def stats(self) -> dict:
+        """Get dedup statistics."""
+        total_hashes = 0
+        file_count = 0
+        oldest = None
+        newest = None
+
+        for path in self.storage_dir.glob("*.json"):
+            try:
+                file_date = datetime.strptime(path.stem, "%Y-%m-%d")
+                with open(path) as f:
+                    data = json.load(f)
+            except (ValueError, json.JSONDecodeError, IOError):
+                # Skip unreadable or non-date files entirely so the file
+                # count and the hash total stay consistent with each other.
+                continue
+
+            file_count += 1
+            if oldest is None or file_date < oldest:
+                oldest = file_date
+            if newest is None or file_date > newest:
+                newest = file_date
+            total_hashes += data.get("count", 0)
+
+        return {
+            "file_count": file_count,
+            "total_hashes": total_hashes,
+            "oldest_file": oldest.strftime("%Y-%m-%d") if oldest else None,
+            "newest_file": newest.strftime("%Y-%m-%d") if newest else None,
+            "today_count": len(self._load_today()),
+        }
+
+
+def main():
+    """CLI for hash_dedup operations."""
+    import argparse
+
+    parser = argparse.ArgumentParser(description="Hash dedup with bounded storage")
+    parser.add_argument("--dir", default=".hashes", help="Hash storage directory")
+    parser.add_argument("--cleanup", type=int, metavar="DAYS",
+                        help="Remove files older than N days")
+    parser.add_argument("--stats", action="store_true", help="Show statistics")
+    parser.add_argument("--check", type=str, help="Check if content hash exists")
+    parser.add_argument("--add", type=str, help="Add content hash")
+    parser.add_argument("--file", type=str, help="Add hashes from file (one per line)")
+    args = parser.parse_args()
+
+    dedup = HashDedup(args.dir)
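+    # Example invocations (illustrative; the outputs match the branches below):
+    #   hash_dedup.py --add "payload"        -> "Added: <hex digest>" or
+    #                                           "Already exists"
+    #   hash_dedup.py --check "payload"      -> "DUPLICATE" or "NEW"
+    #   hash_dedup.py --cleanup 7 --stats    -> prune old files, then report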
+    if args.cleanup is not None:
+        removed = dedup.cleanup(keep_days=args.cleanup)
+        print(f"Cleaned up {removed} files older than {args.cleanup} days")
+
+    if args.stats:
+        stats = dedup.stats()
+        print("Hash Dedup Statistics:")
+        print(f"  Files: {stats['file_count']}")
+        print(f"  Total hashes: {stats['total_hashes']}")
+        print(f"  Today: {stats['today_count']}")
+        print(f"  Date range: {stats['oldest_file']} to {stats['newest_file']}")
+
+    if args.check:
+        if dedup.is_duplicate_any(args.check):
+            print("DUPLICATE")
+        else:
+            print("NEW")
+
+    if args.add:
+        if dedup.add(args.add):
+            print(f"Added: {dedup.compute_hash(args.add)}")
+        else:
+            print("Already exists")
+
+    if args.file:
+        with open(args.file) as f:
+            lines = [line.strip() for line in f if line.strip()]
+        added = dedup.add_batch(lines)
+        print(f"Added {added}/{len(lines)} new hashes")
+
+
+if __name__ == "__main__":
+    main()