#!/usr/bin/env python3 """ dedup.py — Knowledge deduplication: content hash + semantic similarity. Deduplicates harvested knowledge entries to avoid training on duplicates. Uses content hashing for exact matches and token overlap for near-duplicates. Usage: python3 dedup.py --input knowledge/index.json --output knowledge/index_deduped.json python3 dedup.py --input knowledge/index.json --dry-run python3 dedup.py --test # Run built-in dedup test """ import argparse import hashlib import json import re import sys from pathlib import Path from typing import List, Dict, Optional, Tuple def normalize_text(text: str) -> str: """Normalize text for hashing: lowercase, collapse whitespace, strip.""" text = text.lower().strip() text = re.sub(r'\s+', ' ', text) return text def content_hash(text: str) -> str: """SHA256 hash of normalized text for exact dedup.""" normalized = normalize_text(text) return hashlib.sha256(normalized.encode('utf-8')).hexdigest() def tokenize(text: str) -> set: """Simple tokenizer: lowercase words, 3+ chars.""" words = re.findall(r'[a-z0-9_]{3,}', text.lower()) return set(words) def token_similarity(a: str, b: str) -> float: """Token-based Jaccard similarity (0.0-1.0). Fast local alternative to embedding similarity. Good enough for near-duplicate detection. """ tokens_a = tokenize(a) tokens_b = tokenize(b) if not tokens_a or not tokens_b: return 0.0 intersection = tokens_a & tokens_b union = tokens_a | tokens_b return len(intersection) / len(union) def quality_score(fact: dict) -> float: """Compute quality score for merge ranking. Higher is better. Factors: - confidence (0-1) - source_count (more confirmations = better) - has tags (richer metadata) """ confidence = fact.get('confidence', 0.5) source_count = fact.get('source_count', 1) has_tags = 1.0 if fact.get('tags') else 0.0 has_related = 1.0 if fact.get('related') else 0.0 # Weighted composite score = ( confidence * 0.5 + min(source_count / 10, 1.0) * 0.3 + has_tags * 0.1 + has_related * 0.1 ) return round(score, 4) def merge_facts(keep: dict, drop: dict) -> dict: """Merge two near-duplicate facts, keeping higher-quality fields. The 'keep' fact is enriched with metadata from 'drop'. """ # Merge tags (union) keep_tags = set(keep.get('tags', [])) drop_tags = set(drop.get('tags', [])) keep['tags'] = sorted(keep_tags | drop_tags) # Merge related (union) keep_related = set(keep.get('related', [])) drop_related = set(drop.get('related', [])) keep['related'] = sorted(keep_related | drop_related) # Update source_count (sum) keep['source_count'] = keep.get('source_count', 1) + drop.get('source_count', 1) # Update confidence (max — we've now seen it from multiple sources) keep['confidence'] = max(keep.get('confidence', 0), drop.get('confidence', 0)) # Track that we merged if '_merged_from' not in keep: keep['_merged_from'] = [] keep['_merged_from'].append(drop.get('id', 'unknown')) return keep def dedup_facts( facts: List[dict], exact_threshold: float = 1.0, near_threshold: float = 0.95, dry_run: bool = False, ) -> Tuple[List[dict], dict]: """Deduplicate a list of knowledge facts. Args: facts: List of fact dicts (from index.json) exact_threshold: Hash match = exact duplicate near_threshold: Token similarity above this = near-duplicate dry_run: If True, don't modify, just report Returns: (deduped_facts, stats_dict) """ if not facts: return [], {"total": 0, "exact_dupes": 0, "near_dupes": 0, "unique": 0} # Phase 1: Exact dedup by content hash hash_seen = {} # hash -> index in deduped list exact_dupes = 0 deduped = [] for fact in facts: text = fact.get('fact', '') h = content_hash(text) if h in hash_seen: # Exact duplicate — merge metadata into existing existing_idx = hash_seen[h] if not dry_run: deduped[existing_idx] = merge_facts(deduped[existing_idx], fact) exact_dupes += 1 else: hash_seen[h] = len(deduped) deduped.append(fact) # Phase 2: Near-dup by token similarity near_dupes = 0 i = 0 while i < len(deduped): j = i + 1 while j < len(deduped): sim = token_similarity(deduped[i].get('fact', ''), deduped[j].get('fact', '')) if sim >= near_threshold: # Near-duplicate — keep higher quality q_i = quality_score(deduped[i]) q_j = quality_score(deduped[j]) if q_i >= q_j: if not dry_run: deduped[i] = merge_facts(deduped[i], deduped[j]) deduped.pop(j) else: # j is higher quality — merge i into j, then remove i if not dry_run: deduped[j] = merge_facts(deduped[j], deduped[i]) deduped.pop(i) break # i changed, restart inner loop near_dupes += 1 else: j += 1 i += 1 stats = { "total": len(facts), "exact_dupes": exact_dupes, "near_dupes": near_dupes, "unique": len(deduped), "removed": len(facts) - len(deduped), } return deduped, stats def dedup_index_file( input_path: str, output_path: Optional[str] = None, near_threshold: float = 0.95, dry_run: bool = False, ) -> dict: """Deduplicate an index.json file. Args: input_path: Path to index.json output_path: Where to write deduped file (default: overwrite input) near_threshold: Token similarity threshold for near-dupes dry_run: Report only, don't write Returns stats dict. """ path = Path(input_path) if not path.exists(): raise FileNotFoundError(f"Index file not found: {input_path}") with open(path) as f: data = json.load(f) facts = data.get('facts', []) deduped, stats = dedup_facts(facts, near_threshold=near_threshold, dry_run=dry_run) if not dry_run: data['facts'] = deduped data['total_facts'] = len(deduped) data['last_dedup'] = __import__('datetime').datetime.now( __import__('datetime').timezone.utc ).isoformat() out_path = Path(output_path) if output_path else path with open(out_path, 'w') as f: json.dump(data, f, indent=2, ensure_ascii=False) return stats def generate_test_duplicates(n: int = 20) -> List[dict]: """Generate test facts with intentional duplicates for testing. Creates n unique facts plus n/4 exact dupes and n/4 near-dupes. """ import random random.seed(42) unique_facts = [] for i in range(n): topic = random.choice(["git", "python", "docker", "rust", "nginx"]) tip = random.choice(["use verbose flags", "check logs first", "restart service", "clear cache", "update config"]) unique_facts.append({ "id": f"test:fact:{i:03d}", "fact": f"When working with {topic}, always {tip} before deploying.", "category": "fact", "domain": "test", "confidence": round(random.uniform(0.5, 1.0), 2), "source_count": random.randint(1, 5), "tags": [topic, "test"], }) # Add exact duplicates (same text, different IDs) duped = list(unique_facts) for i in range(n // 4): original = unique_facts[i] dupe = dict(original) dupe["id"] = f"test:fact:dup{i:03d}" dupe["confidence"] = round(random.uniform(0.3, 0.8), 2) duped.append(dupe) # Add near-duplicates (slightly different phrasing) for i in range(n // 4): original = unique_facts[i] near = dict(original) near["id"] = f"test:fact:near{i:03d}" near["fact"] = original["fact"].replace("always", "should").replace("before deploying", "prior to deployment") near["confidence"] = round(random.uniform(0.4, 0.9), 2) duped.append(near) return duped def main(): parser = argparse.ArgumentParser(description="Knowledge deduplication") parser.add_argument("--input", help="Path to index.json") parser.add_argument("--output", help="Output path (default: overwrite input)") parser.add_argument("--threshold", type=float, default=0.95, help="Near-dup similarity threshold (default: 0.95)") parser.add_argument("--dry-run", action="store_true", help="Report only, don't write") parser.add_argument("--test", action="store_true", help="Run built-in dedup test") parser.add_argument("--json", action="store_true", help="JSON output") args = parser.parse_args() if args.test: test_facts = generate_test_duplicates(20) print(f"Generated {len(test_facts)} test facts (20 unique + dupes)") deduped, stats = dedup_facts(test_facts, near_threshold=args.threshold) print(f"\nDedup results:") print(f" Total input: {stats['total']}") print(f" Exact dupes: {stats['exact_dupes']}") print(f" Near dupes: {stats['near_dupes']}") print(f" Unique output: {stats['unique']}") print(f" Removed: {stats['removed']}") # Verify: should have ~20 unique (some merged) assert stats['unique'] <= 20, f"Too many unique: {stats['unique']} > 20" assert stats['unique'] >= 15, f"Too few unique: {stats['unique']} < 15" assert stats['removed'] > 0, "No duplicates removed" print("\nOK: Dedup test passed") return if not args.input: print("ERROR: Provide --input or --test") sys.exit(1) stats = dedup_index_file(args.input, args.output, args.threshold, args.dry_run) if args.json: print(json.dumps(stats, indent=2)) else: print(f"Dedup results:") print(f" Total input: {stats['total']}") print(f" Exact dupes: {stats['exact_dupes']}") print(f" Near dupes: {stats['near_dupes']}") print(f" Unique output: {stats['unique']}") print(f" Removed: {stats['removed']}") if args.dry_run: print(" (dry run — no changes written)") if __name__ == "__main__": main()