From cc215e3ed71a062cf85331a02b6b3ed8a3c900eb Mon Sep 17 00:00:00 2001
From: Timmy
Date: Tue, 21 Apr 2026 07:58:09 -0400
Subject: [PATCH] =?UTF-8?q?feat:=20knowledge=20deduplication=20=E2=80=94?=
 =?UTF-8?q?=20content=20hash=20+=20token=20similarity=20(#196)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Dedup module for knowledge entries with:
- SHA256 content hashing for exact duplicates
- Token Jaccard similarity for near-duplicates (default 0.95)
- Quality-based merge: keeps higher confidence/source_count
- Metadata merging: tags, related, source_count
- Dry-run mode
- 30 tests passing
- Built-in --test mode with generated duplicates

Usage:
  python scripts/dedup.py --input knowledge/index.json
  python scripts/dedup.py --input knowledge/index.json --dry-run
  python scripts/dedup.py --test

Closes #196.
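The entries this operates on live under the top-level "facts" array of
knowledge/index.json. An illustrative entry, restricted to the fields the
dedup pass actually reads (real entries may carry additional fields; the id
shown here is made up):

  {
    "id": "harvest:fact:0042",
    "fact": "Always check logs before deploying to production.",
    "category": "fact",
    "domain": "ops",
    "confidence": 0.9,
    "source_count": 3,
    "tags": ["ops", "deploy"],
    "related": []
  }

Only the "fact" text is hashed and compared; tags, related and source_count
are merged into the surviving entry, and the dropped entry's id is recorded
under "_merged_from".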
---
 scripts/dedup.py    | 317 ++++++++++++++++++++++++++++++++++++++++++++
 tests/test_dedup.py | 207 +++++++++++++++++++++++++++++
 2 files changed, 524 insertions(+)
 create mode 100644 scripts/dedup.py
 create mode 100644 tests/test_dedup.py

diff --git a/scripts/dedup.py b/scripts/dedup.py
new file mode 100644
index 0000000..9ae49fe
--- /dev/null
+++ b/scripts/dedup.py
@@ -0,0 +1,317 @@
+#!/usr/bin/env python3
+"""
+dedup.py — Knowledge deduplication: content hash + token similarity.
+
+Deduplicates harvested knowledge entries to avoid training on duplicates.
+Uses content hashing for exact matches and token overlap for near-duplicates.
+
+Usage:
+    python3 dedup.py --input knowledge/index.json --output knowledge/index_deduped.json
+    python3 dedup.py --input knowledge/index.json --dry-run
+    python3 dedup.py --test  # Run built-in dedup test
+"""
+
+import argparse
+import hashlib
+import json
+import re
+import sys
+from pathlib import Path
+from typing import List, Dict, Optional, Tuple
+
+
+def normalize_text(text: str) -> str:
+    """Normalize text for hashing: lowercase, collapse whitespace, strip."""
+    text = text.lower().strip()
+    text = re.sub(r'\s+', ' ', text)
+    return text
+
+
+def content_hash(text: str) -> str:
+    """SHA256 hash of normalized text for exact dedup."""
+    normalized = normalize_text(text)
+    return hashlib.sha256(normalized.encode('utf-8')).hexdigest()
+
+
+def tokenize(text: str) -> set:
+    """Simple tokenizer: lowercase words, 3+ chars."""
+    words = re.findall(r'[a-z0-9_]{3,}', text.lower())
+    return set(words)
+
+
+def token_similarity(a: str, b: str) -> float:
+    """Token-based Jaccard similarity (0.0-1.0).
+
+    Fast local alternative to embedding similarity.
+    Good enough for near-duplicate detection.
+    """
+    tokens_a = tokenize(a)
+    tokens_b = tokenize(b)
+    if not tokens_a or not tokens_b:
+        return 0.0
+    intersection = tokens_a & tokens_b
+    union = tokens_a | tokens_b
+    return len(intersection) / len(union)
+
+
+def quality_score(fact: dict) -> float:
+    """Compute quality score for merge ranking.
+
+    Higher is better. Factors:
+    - confidence (0-1)
+    - source_count (more confirmations = better)
+    - has tags/related (richer metadata)
+    """
+    confidence = fact.get('confidence', 0.5)
+    source_count = fact.get('source_count', 1)
+    has_tags = 1.0 if fact.get('tags') else 0.0
+    has_related = 1.0 if fact.get('related') else 0.0
+
+    # Weighted composite
+    score = (
+        confidence * 0.5 +
+        min(source_count / 10, 1.0) * 0.3 +
+        has_tags * 0.1 +
+        has_related * 0.1
+    )
+    return round(score, 4)
+
+
+def merge_facts(keep: dict, drop: dict) -> dict:
+    """Merge two near-duplicate facts, keeping higher-quality fields.
+
+    The 'keep' fact is enriched with metadata from 'drop'.
+    """
+    # Merge tags (union)
+    keep_tags = set(keep.get('tags', []))
+    drop_tags = set(drop.get('tags', []))
+    keep['tags'] = sorted(keep_tags | drop_tags)
+
+    # Merge related (union)
+    keep_related = set(keep.get('related', []))
+    drop_related = set(drop.get('related', []))
+    keep['related'] = sorted(keep_related | drop_related)
+
+    # Update source_count (sum)
+    keep['source_count'] = keep.get('source_count', 1) + drop.get('source_count', 1)
+
+    # Update confidence (max — we've now seen it from multiple sources)
+    keep['confidence'] = max(keep.get('confidence', 0), drop.get('confidence', 0))
+
+    # Track that we merged
+    if '_merged_from' not in keep:
+        keep['_merged_from'] = []
+    keep['_merged_from'].append(drop.get('id', 'unknown'))
+
+    return keep
+
+
+def dedup_facts(
+    facts: List[dict],
+    exact_threshold: float = 1.0,
+    near_threshold: float = 0.95,
+    dry_run: bool = False,
+) -> Tuple[List[dict], dict]:
+    """Deduplicate a list of knowledge facts.
+
+    Args:
+        facts: List of fact dicts (from index.json)
+        exact_threshold: Hash match = exact duplicate
+        near_threshold: Token similarity above this = near-duplicate
+        dry_run: If True, don't modify, just report
+
+    Returns:
+        (deduped_facts, stats_dict)
+    """
+    if not facts:
+        return [], {"total": 0, "exact_dupes": 0, "near_dupes": 0, "unique": 0, "removed": 0}
+
+    # Phase 1: Exact dedup by content hash
+    hash_seen = {}  # hash -> index in deduped list
+    exact_dupes = 0
+    deduped = []
+
+    for fact in facts:
+        text = fact.get('fact', '')
+        h = content_hash(text)
+
+        if h in hash_seen:
+            # Exact duplicate — merge metadata into existing
+            existing_idx = hash_seen[h]
+            if not dry_run:
+                deduped[existing_idx] = merge_facts(deduped[existing_idx], fact)
+            exact_dupes += 1
+        else:
+            hash_seen[h] = len(deduped)
+            deduped.append(fact)
+
+    # Phase 2: Near-dup by token similarity
+    near_dupes = 0
+    i = 0
+    while i < len(deduped):
+        j = i + 1
+        while j < len(deduped):
+            sim = token_similarity(deduped[i].get('fact', ''), deduped[j].get('fact', ''))
+            if sim >= near_threshold:
+                # Near-duplicate — keep higher quality
+                near_dupes += 1
+                q_i = quality_score(deduped[i])
+                q_j = quality_score(deduped[j])
+                if q_i >= q_j:
+                    if not dry_run:
+                        deduped[i] = merge_facts(deduped[i], deduped[j])
+                    deduped.pop(j)
+                else:
+                    # j is higher quality: keep the merged fact in slot i and keep scanning
+                    if not dry_run:
+                        deduped[j] = merge_facts(deduped[j], deduped[i])
+                    deduped[i] = deduped[j]
+                    deduped.pop(j)
+            else:
+                j += 1
+        i += 1
+
+    stats = {
+        "total": len(facts),
+        "exact_dupes": exact_dupes,
+        "near_dupes": near_dupes,
+        "unique": len(deduped),
+        "removed": len(facts) - len(deduped),
+    }
+
+    return deduped, stats
+
+
+def dedup_index_file(
+    input_path: str,
+    output_path: Optional[str] = None,
+    near_threshold: float = 0.95,
+    dry_run: bool = False,
+) -> dict:
+    """Deduplicate an index.json file.
+
+    Args:
+        input_path: Path to index.json
+        output_path: Where to write deduped file (default: overwrite input)
+        near_threshold: Token similarity threshold for near-dupes
+        dry_run: Report only, don't write
+
+    Returns stats dict.
+ """ + path = Path(input_path) + if not path.exists(): + raise FileNotFoundError(f"Index file not found: {input_path}") + + with open(path) as f: + data = json.load(f) + + facts = data.get('facts', []) + deduped, stats = dedup_facts(facts, near_threshold=near_threshold, dry_run=dry_run) + + if not dry_run: + data['facts'] = deduped + data['total_facts'] = len(deduped) + data['last_dedup'] = __import__('datetime').datetime.now( + __import__('datetime').timezone.utc + ).isoformat() + + out_path = Path(output_path) if output_path else path + with open(out_path, 'w') as f: + json.dump(data, f, indent=2, ensure_ascii=False) + + return stats + + +def generate_test_duplicates(n: int = 20) -> List[dict]: + """Generate test facts with intentional duplicates for testing. + + Creates n unique facts plus n/4 exact dupes and n/4 near-dupes. + """ + import random + random.seed(42) + + unique_facts = [] + for i in range(n): + topic = random.choice(["git", "python", "docker", "rust", "nginx"]) + tip = random.choice(["use verbose flags", "check logs first", "restart service", "clear cache", "update config"]) + unique_facts.append({ + "id": f"test:fact:{i:03d}", + "fact": f"When working with {topic}, always {tip} before deploying.", + "category": "fact", + "domain": "test", + "confidence": round(random.uniform(0.5, 1.0), 2), + "source_count": random.randint(1, 5), + "tags": [topic, "test"], + }) + + # Add exact duplicates (same text, different IDs) + duped = list(unique_facts) + for i in range(n // 4): + original = unique_facts[i] + dupe = dict(original) + dupe["id"] = f"test:fact:dup{i:03d}" + dupe["confidence"] = round(random.uniform(0.3, 0.8), 2) + duped.append(dupe) + + # Add near-duplicates (slightly different phrasing) + for i in range(n // 4): + original = unique_facts[i] + near = dict(original) + near["id"] = f"test:fact:near{i:03d}" + near["fact"] = original["fact"].replace("always", "should").replace("before deploying", "prior to deployment") + near["confidence"] = round(random.uniform(0.4, 0.9), 2) + duped.append(near) + + return duped + + +def main(): + parser = argparse.ArgumentParser(description="Knowledge deduplication") + parser.add_argument("--input", help="Path to index.json") + parser.add_argument("--output", help="Output path (default: overwrite input)") + parser.add_argument("--threshold", type=float, default=0.95, + help="Near-dup similarity threshold (default: 0.95)") + parser.add_argument("--dry-run", action="store_true", help="Report only, don't write") + parser.add_argument("--test", action="store_true", help="Run built-in dedup test") + parser.add_argument("--json", action="store_true", help="JSON output") + args = parser.parse_args() + + if args.test: + test_facts = generate_test_duplicates(20) + print(f"Generated {len(test_facts)} test facts (20 unique + dupes)") + deduped, stats = dedup_facts(test_facts, near_threshold=args.threshold) + print(f"\nDedup results:") + print(f" Total input: {stats['total']}") + print(f" Exact dupes: {stats['exact_dupes']}") + print(f" Near dupes: {stats['near_dupes']}") + print(f" Unique output: {stats['unique']}") + print(f" Removed: {stats['removed']}") + + # Verify: should have ~20 unique (some merged) + assert stats['unique'] <= 20, f"Too many unique: {stats['unique']} > 20" + assert stats['unique'] >= 15, f"Too few unique: {stats['unique']} < 15" + assert stats['removed'] > 0, "No duplicates removed" + print("\nOK: Dedup test passed") + return + + if not args.input: + print("ERROR: Provide --input or --test") + sys.exit(1) + + stats = 
+
+    if args.json:
+        print(json.dumps(stats, indent=2))
+    else:
+        print(f"Dedup results:")
+        print(f"  Total input: {stats['total']}")
+        print(f"  Exact dupes: {stats['exact_dupes']}")
+        print(f"  Near dupes: {stats['near_dupes']}")
+        print(f"  Unique output: {stats['unique']}")
+        print(f"  Removed: {stats['removed']}")
+        if args.dry_run:
+            print("  (dry run — no changes written)")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/test_dedup.py b/tests/test_dedup.py
new file mode 100644
index 0000000..cabb49f
--- /dev/null
+++ b/tests/test_dedup.py
@@ -0,0 +1,207 @@
+"""Tests for knowledge deduplication module (Issue #196)."""
+
+import json
+import sys
+from pathlib import Path
+
+import pytest
+
+sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
+
+from dedup import (
+    normalize_text,
+    content_hash,
+    tokenize,
+    token_similarity,
+    quality_score,
+    merge_facts,
+    dedup_facts,
+    generate_test_duplicates,
+)
+
+
+class TestNormalize:
+    def test_lowercases(self):
+        assert normalize_text("Hello World") == "hello world"
+
+    def test_collapses_whitespace(self):
+        assert normalize_text("  hello   world  ") == "hello world"
+
+    def test_strips(self):
+        assert normalize_text("  text  ") == "text"
+
+
+class TestContentHash:
+    def test_deterministic(self):
+        h1 = content_hash("Hello World")
+        h2 = content_hash("hello world")
+        h3 = content_hash("  Hello World  ")
+        assert h1 == h2 == h3
+
+    def test_different_texts(self):
+        h1 = content_hash("Hello")
+        h2 = content_hash("World")
+        assert h1 != h2
+
+    def test_returns_hex(self):
+        h = content_hash("test")
+        assert len(h) == 64  # SHA256
+        assert all(c in '0123456789abcdef' for c in h)
+
+
+class TestTokenize:
+    def test_extracts_words(self):
+        tokens = tokenize("Hello World Test")
+        assert "hello" in tokens
+        assert "world" in tokens
+        assert "test" in tokens
+
+    def test_skips_short_words(self):
+        tokens = tokenize("a to is the hello")
+        assert "a" not in tokens
+        assert "to" not in tokens
+        assert "hello" in tokens
+
+    def test_returns_set(self):
+        tokens = tokenize("hello hello world")
+        assert isinstance(tokens, set)
+        assert len(tokens) == 2
+
+
+class TestTokenSimilarity:
+    def test_identical(self):
+        assert token_similarity("hello world", "hello world") == 1.0
+
+    def test_no_overlap(self):
+        assert token_similarity("alpha beta", "gamma delta") == 0.0
+
+    def test_partial_overlap(self):
+        sim = token_similarity("hello world test", "hello universe test")
+        assert 0.3 < sim < 0.7
+
+    def test_empty(self):
+        assert token_similarity("", "hello") == 0.0
+        assert token_similarity("hello", "") == 0.0
+
+    def test_symmetric(self):
+        a = "hello world test"
+        b = "hello universe test"
+        assert token_similarity(a, b) == token_similarity(b, a)
+
+
+class TestQualityScore:
+    def test_high_confidence(self):
+        fact = {"confidence": 0.95, "source_count": 5, "tags": ["test"], "related": ["x"]}
+        score = quality_score(fact)
+        assert score > 0.7
+
+    def test_low_confidence(self):
+        fact = {"confidence": 0.3, "source_count": 1}
+        score = quality_score(fact)
+        assert score < 0.5
+
+    def test_defaults(self):
+        score = quality_score({})
+        assert 0 < score < 1
+
+
+class TestMergeFacts:
+    def test_merges_tags(self):
+        keep = {"id": "a", "fact": "test", "tags": ["git"], "confidence": 0.9}
+        drop = {"id": "b", "fact": "test", "tags": ["python"], "confidence": 0.8}
+        merged = merge_facts(keep, drop)
+        assert "git" in merged["tags"]
+        assert "python" in merged["tags"]
+
+    def test_merges_source_count(self):
+        keep = {"id": "a", "fact": "test", "source_count": 3}
+        drop = {"id": "b", "fact": "test", "source_count": 2}
+        merged = merge_facts(keep, drop)
+        assert merged["source_count"] == 5
+
+    def test_keeps_higher_confidence(self):
+        keep = {"id": "a", "fact": "test", "confidence": 0.7}
+        drop = {"id": "b", "fact": "test", "confidence": 0.9}
+        merged = merge_facts(keep, drop)
+        assert merged["confidence"] == 0.9
+
+    def test_tracks_merged_from(self):
+        keep = {"id": "a", "fact": "test"}
+        drop = {"id": "b", "fact": "test"}
+        merged = merge_facts(keep, drop)
+        assert "b" in merged["_merged_from"]
+
+
+class TestDedupFacts:
+    def test_removes_exact_dupes(self):
+        facts = [
+            {"id": "1", "fact": "Always use git rebase"},
+            {"id": "2", "fact": "Always use git rebase"},  # exact dupe
+            {"id": "3", "fact": "Check logs first"},
+        ]
+        deduped, stats = dedup_facts(facts)
+        assert stats["exact_dupes"] == 1
+        assert stats["unique"] == 2
+
+    def test_removes_near_dupes(self):
+        facts = [
+            {"id": "1", "fact": "Always check logs before deploying to production server"},
+            {"id": "2", "fact": "Always check logs before deploying to production environment"},
+            {"id": "3", "fact": "Use docker compose for local development environments"},
+        ]
+        deduped, stats = dedup_facts(facts, near_threshold=0.5)
+        assert stats["near_dupes"] >= 1
+        assert stats["unique"] == 2
+
+    def test_preserves_unique(self):
+        facts = [
+            {"id": "1", "fact": "Use git rebase for clean history"},
+            {"id": "2", "fact": "Docker containers should be stateless"},
+            {"id": "3", "fact": "Always write tests before code"},
+        ]
+        deduped, stats = dedup_facts(facts)
+        assert stats["unique"] == 3
+        assert stats["removed"] == 0
+
+    def test_empty_input(self):
+        deduped, stats = dedup_facts([])
+        assert stats["total"] == 0
+        assert stats["unique"] == 0
+
+    def test_keeps_higher_quality_near_dup(self):
+        facts = [
+            {"id": "1", "fact": "Check logs before deploying to production server", "confidence": 0.5, "source_count": 1},
+            {"id": "2", "fact": "Check logs before deploying to production environment", "confidence": 0.9, "source_count": 5, "tags": ["ops"]},
+        ]
+        deduped, stats = dedup_facts(facts, near_threshold=0.5)
+        assert stats["unique"] == 1
+        # Higher quality fact should be kept
+        assert deduped[0]["confidence"] == 0.9
+
+    def test_dry_run_does_not_modify(self):
+        facts = [
+            {"id": "1", "fact": "Same text"},
+            {"id": "2", "fact": "Same text"},
+        ]
+        deduped, stats = dedup_facts(facts, dry_run=True)
+        assert stats["exact_dupes"] == 1
+        # In dry_run, merge_facts is skipped so facts aren't modified
+        assert len(deduped) == 1
+
+
+class TestGenerateTestDuplicates:
+    def test_generates_correct_count(self):
+        facts = generate_test_duplicates(20)
+        assert len(facts) > 20  # 20 unique + duplicates
+
+    def test_has_exact_dupes(self):
+        facts = generate_test_duplicates(20)
+        hashes = [content_hash(f["fact"]) for f in facts]
+        # Should have some duplicate hashes
+        assert len(hashes) != len(set(hashes))
+
+    def test_dedup_removes_dupes(self):
+        facts = generate_test_duplicates(20)
+        deduped, stats = dedup_facts(facts)
+        assert stats["unique"] <= 20
+        assert stats["removed"] > 0
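
For reference, the pass can also be driven from Python rather than the CLI,
for example at the end of a harvest job. A minimal sketch, assuming the caller
runs from the repo root and puts scripts/ on sys.path the same way the tests
do (the surrounding job itself is hypothetical):

  import sys
  from pathlib import Path

  sys.path.insert(0, str(Path("scripts").resolve()))
  from dedup import dedup_index_file

  # Preview what would be removed, then apply in place with the defaults
  preview = dedup_index_file("knowledge/index.json", dry_run=True)
  if preview["removed"] > 0:
      dedup_index_file("knowledge/index.json")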