hojichar.filters.deduplication
This module provides filters for deduplication using MinHash and Locality-Sensitive Hashing (LSH).
If you want to use this module, install hojichar via `pip install 'hojichar[dedup]'`.
What is MinHash LSH?
A gentle introduction to MinHash LSH for first‑time learners
1 What problem does this solve?
When you have millions of documents it is too expensive to compare every pair directly. MinHash + Locality‑Sensitive Hashing (LSH) lets you
- estimate Jaccard similarity extremely fast and memory‑efficiently
- retrieve near‑duplicates in sub‑linear time.
In practice you can keep a single `set` or Redis index of the generated LSH keys and ask:
"Does any existing document share at least one LSH key with mine?"
If the answer is yes, the two documents are almost certainly similar; if no, they are very likely different.
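In code, that membership question is a plain set lookup over the keys generated for each document (a minimal sketch; `seen_keys` and `is_near_duplicate` are illustrative names):

```python
seen_keys: set[str] = set()  # LSH keys of every document indexed so far

def is_near_duplicate(lsh_keys: list[str]) -> bool:
    # One shared band key is enough to flag the document as a near-duplicate.
    if any(key in seen_keys for key in lsh_keys):
        return True
    seen_keys.update(lsh_keys)
    return False
```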
How the pipeline works
The `GenerateDedupLSH` filter generates LSH keys for each document.
1. Tokenize
   - Split text into tokens. (Default: character‑level, but you can plug in any callable.)
2. n-grams
   - Group tokens into n‑grams (n_grams=5) to capture context.
3. MinHash
   - Hash each n‑gram with `num_perm` independent permutations and keep only the minimum value. The resulting signature is a vector of num_perm 32‑bit integers.
   - This module uses `rensa.RMinHash`, a fast MinHash implementation written in Rust.
4. Banding and compression
   - Split the signature into b bands, each containing `r` integers (num_perm ≈ b×r).
   - Treat the `r` integers as raw bytes, hash them with xxhash‑128, and format the key as `<band_idx>+<digest32hex>`.
5. Output
   - Store all band keys in `document.extras['dedup_lsh']` as a list of strings (see the sketch after this list).
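A minimal sketch of generating and inspecting the keys (the example text and printed values are illustrative):

```python
from hojichar import Document
from hojichar.filters.deduplication import GenerateDedupLSH

gen = GenerateDedupLSH()  # defaults: num_perm=500, threshold=0.8, character-level tokens
doc = gen.apply(Document("Lorem ipsum dolor sit amet, consectetur adipiscing elit."))

keys = doc.extras["dedup_lsh"]
print(len(keys))  # equals gen.num_bands
print(keys[0])    # e.g. '0+<32 hex digits>'
```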
The `InlineDeduplicator`, `RedisDeduplicator`, and `RedisBloomDeduplicator` filters use the generated LSH keys to mark documents as duplicates.
- `InlineDeduplicator` stores LSH keys in a local set, so it works only in a single process.
- `RedisDeduplicator` stores LSH keys in Redis, so it works in a distributed environment.
- `RedisBloomDeduplicator` stores LSH keys in RedisBloom, a scalable Bloom filter. It uses less memory than Redis keys but may return false positives.
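A minimal end-to-end sketch that composes key generation and in-memory deduplication (the sample sentences are illustrative; the second one will most likely be rejected as a near-duplicate of the first):

```python
import hojichar
from hojichar.filters.deduplication import GenerateDedupLSH, InlineDeduplicator

pipeline = hojichar.Compose([
    GenerateDedupLSH(),    # adds document.extras['dedup_lsh']
    InlineDeduplicator(),  # sets document.is_rejected for near-duplicates
])

texts = [
    "The quick brown fox jumps over the lazy dog.",
    "The quick brown fox jumps over the lazy dog!",
    "An entirely different sentence about something else.",
]
for text in texts:
    doc = pipeline.apply(hojichar.Document(text))
    print(doc.is_rejected, text)
```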
1""" 2This module provides filters for deduplication using MinHash and Locality-Sensitive Hashing (LSH). 3If you want to use this module, install hojichar via `pip install 'hojichar[dedup]'` 4 5## What is MinHash LSH? 6*A gentle introduction for first‑time leaner for MinHash LSH* 7 8--- 9 10### 1 What problem does this solve? 11 12When you have **millions of documents** it is too expensive to compare every pair directly. 13**MinHash + Locality‑Sensitive Hashing (LSH)** lets you 14 15- estimate Jaccard similarity extremely fast, and memory‑efficiently 16- retrieve **near‑duplicates** in sub‑linear time. 17 18In practice you can keep a single `set` or Redis index of the generated **LSH keys** and ask: 19"Does any existing document share *at least one* LSH key with mine?" 20 21If the answer is yes the two documents are almost certainly similar; if no they are very likely different. 22 23### How the pipeline works 24 25`GenerateDedupLSH` filter generates LSH keys for each document. 26 271. Tokenize 28 - Split text into tokens. (Default: character‑level, but you can plug in any callable.) 292. n-grams 30 - Group tokens into n‑grams (n_grams=5) to capture context. 313. MinHash 32 - Hash each n‑gram with `num_perm` independent permutations and keep only the minimum value. The resulting **signature** is a vector of num_perm 32‑bit integers. 33 - This module uses `rensa.RMinHash` which is a fast MinHash implementation by Rust language. 344. Banding and compression 35 - Split the signature into b bands, each containing `r` integers (num_perm ≈ b×r). 36 - Treat the `r` integers as raw bytes, hash them with xxhash‑128, and format as <band_idx>+<digest32hex>. 375. Output 38 - Store all band keys in `document.extras['dedup_lsh']` as a list of strings. 39 40`InlineDeduplicator`, `RedisDeduplicator`, and `RedisBloomDeduplicator` filters use the generated LSH keys to mark documents as duplicates. 41 42- `InlineDeduplicator` stores LSH keys in a local set, so it works only in a single process. 43- `RedisDeduplicator` stores LSH keys in Redis, so it works in a distributed environment. 44- `RedisBloomDeduplicator` stores LSH keys in RedisBloom, which is a scalable Bloom filter. It uses less memory than Redis keys but may return false positives. 45 46""" 47 48from __future__ import annotations 49 50import importlib 51import re 52from typing import Any, Callable, Final, Iterable, Optional 53 54import numpy as np 55import redis 56import xxhash 57from datasketch.lsh import _optimal_param # type: ignore 58from nltk.util import ngrams # type: ignore 59from numpy.typing import NDArray 60from rensa import RMinHash # type: ignore 61 62from hojichar import Document, Filter 63 64_japanese_tagger: Optional["fugashi.Tagger"] = None # type: ignore[name-defined] # noqa: F821 65NON_ALPHA = re.compile("[^A-Za-z_0-9]") 66 67 68def char_level_splitter(text: str) -> list[str]: 69 """ 70 Split the text into characters. 71 This is a simple implementation that splits the text into individual characters. 72 """ 73 return list(text) 74 75 76def non_alpha_num_splitter(text: str) -> list[str]: 77 """ 78 Split the text into alphanumeric tokens. 79 This is a simple implementation that splits on non-alphanumeric characters. 80 """ 81 return [token for token in NON_ALPHA.split(text) if token] 82 83 84def japanese_word_splitter(text: str) -> list[str]: 85 """ 86 Split the text into Japanese words using fugashi. 87 This will import fugashi and instantiate Tagger on first use. 
88 """ 89 global _japanese_tagger 90 if _japanese_tagger is None: 91 fugashi = importlib.import_module("fugashi") 92 _japanese_tagger = fugashi.Tagger() 93 return [token.surface for token in _japanese_tagger(text)] 94 95 96class GenerateDedupLSH(Filter): 97 """ 98 Filter that uses MinHash + Locality-Sensitive Hashing (LSH) to assign 99 deduplication keys to documents, allowing fast near-duplicate detection. 100 101 Attributes: 102 num_perm (int): Number of permutations (hash functions) for MinHash. 103 threshold (float): Similarity threshold for tuning LSH parameters. 104 tokenizer (Callable[[str], Iterable[str]]): Function to tokenize text. 105 n_grams (int): n-gram size for token grouping. 106 seed (int): Random seed for MinHash. 107 num_bands (int): Number of LSH bands computed from threshold and num_perm. 108 band_size (int): Number of hashes per band. 109 110 Notes 111 ----- 112 `_optimal_param` searches for the optimal number of **bands** (`b`) and 113 **rows per band** (`r`) that minimise a weighted sum of false positives / 114 false negatives at the specified *threshold*. 115 """ 116 117 _BYTES_PER_U32: Final[int] = 4 118 119 def __init__( 120 self, 121 num_perm: int = 500, 122 threshold: float = 0.8, 123 tokenizer: Callable[[str], Iterable[str]] = char_level_splitter, 124 n_grams: int = 5, 125 seed: int = 42, 126 **kwargs: Any, 127 ) -> None: 128 """ 129 Initialize the deduplication filter with MinHash and LSH settings. 130 131 Args: 132 num_perm: Number of hash permutations for MinHash signature length. 133 threshold: Similarity threshold to decide optimal LSH parameters. 134 tokenizer: Function to split text into tokens. 135 n_grams: Number of tokens per n-gram for MinHash update. 136 seed: Seed for hash permutation consistency. 137 **kwargs: Additional keyword arguments for parent Filter. 138 """ 139 super().__init__(**kwargs) 140 self.num_perm = num_perm 141 self.threshold = threshold 142 self.tokenizer = tokenizer 143 self.n_grams = n_grams 144 self.seed = seed 145 146 # Compute optimal number of bands and band size based on threshold 147 self.num_bands, self.band_size = _optimal_param( 148 threshold=self.threshold, 149 num_perm=self.num_perm, 150 false_negative_weight=0.5, 151 false_positive_weight=0.5, 152 ) 153 154 def calculate_minhash_signature(self, text: str) -> NDArray[np.uint32]: 155 """ 156 Compute MinHash signature of input text as an array of uint32. 157 158 Steps: 159 1. Tokenize text using the provided tokenizer. 160 2. Generate n-gram tokens. 161 3. Update MinHash with n-gram tokens. 162 163 Args: 164 text: Input document text to be hashed. 165 166 Returns: 167 A 1D numpy array of shape (num_perm,) with dtype uint32. 168 """ 169 tokens = self.tokenizer(text) 170 n_gram_tokens = ngrams(tokens, self.n_grams) 171 # Join tokens into string n-grams for hashing 172 tokens = [" ".join(grams) for grams in n_gram_tokens] 173 # Initialize and update RMinHash 174 minhash = RMinHash(num_perm=self.num_perm, seed=self.seed) 175 minhash.update(tokens) 176 # Convert digest (list of ints) to numpy uint32 array 177 return np.asarray(minhash.digest(), dtype=np.uint32) 178 179 def signature_to_lsh_digest( 180 self, signature: NDArray[np.uint32], band_size: int, band_idx: int 181 ) -> int: 182 """ 183 Convert a slice of the MinHash signature into an LSH digest with less memory overhead. 184 185 This method is optimized for speed by avoiding copies: 186 - We view the uint32 array as raw bytes (uint8 view). 187 - We create a memoryview of the byte slice for the specified band. 
188 - We compute a 128-bit hash directly on the slice. 189 190 Args: 191 signature: 1D numpy array of uint32 representing MinHash signature. 192 band_size: Number of hashes per LSH band. 193 band_idx: Index of the band to hash (0-based). 194 195 Returns: 196 An integer representing the 128-bit hash digest of the band. 197 198 Raises: 199 AssertionError: If signature shape/dtype or band index is invalid. 200 """ 201 assert signature.dtype == np.uint32 and signature.ndim == 1, ( 202 "signature must be a 1D numpy array of uint32" 203 ) 204 assert 0 <= band_idx < self.num_bands, ( 205 f"band_idx {band_idx} out of range [0, {self.num_bands})" 206 ) 207 assert len(signature) >= band_size * self.num_bands, ( 208 "signature length is too short for given band_size and num_bands" 209 ) 210 211 # Compute byte offsets for the selected band 212 start = band_idx * band_size * self._BYTES_PER_U32 213 stop = start + band_size * self._BYTES_PER_U32 214 215 # View signature as raw bytes without copy. memoryview avoids creating new bytes. 216 u8 = signature.view(np.uint8) 217 mv = memoryview(u8)[start:stop] # type: ignore[arg-type] 218 219 return xxhash.xxh128_intdigest(mv) 220 221 def _format_lsh_key(self, band_idx: int, digest: int) -> str: 222 """ 223 Format the LSH key for a given band index and digest. 224 """ 225 return f"{band_idx}+{digest:032x}" 226 227 def apply(self, document: Document) -> Document: 228 """ 229 Decorate the document with LSH deduplication keys. 230 231 For each band, compute the digest and format as a hex string: 232 '<band_idx>+<128-bit-digest-hex>'. 233 Keys are stored in document.extras['dedup_lsh']. 234 235 Args: 236 document: Document object with 'text' attribute. 237 238 Returns: 239 The same Document object with 'dedup_lsh' added in extras. 240 """ 241 signature = self.calculate_minhash_signature(document.text) 242 lsh_keys = [ 243 self._format_lsh_key( 244 band_idx, self.signature_to_lsh_digest(signature, self.band_size, band_idx) 245 ) 246 for band_idx in range(self.num_bands) 247 ] 248 249 document.extras["dedup_lsh"] = lsh_keys 250 return document 251 252 253class InlineDeduplicator(Filter): 254 """ 255 Simple in‑memory deduplicator. 256 257 Stores every LSH key in a local :pyclass:`set`. If any key of the incoming 258 document is already present, the document is marked as duplicate via 259 `document.is_rejected = True`. 260 261 **Limitations** 262 ------------- 263 *State is per‑process only.* Running multiple workers or machines will *not* 264 share the key set – use :class:`RedisDeduplicator` or 265 :class:`RedisBloomDeduplicator` for distributed setups. 266 """ 267 268 def __init__(self, **kwargs: Any): 269 super().__init__(**kwargs) 270 self.hash_pool: set[str] = set() 271 272 def apply(self, document: Document) -> Document: 273 """ 274 Inline deduplication based on the LSH keys in the document. 275 This filter cannot use in the distributed environment because it uses a local hash pool. 276 """ 277 lsh_keys = document.extras.get("dedup_lsh") 278 if lsh_keys is None: 279 raise ValueError( 280 "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first." 281 ) 282 283 for lsh in lsh_keys: 284 if lsh in self.hash_pool: 285 document.is_rejected = True 286 else: 287 self.hash_pool.add(lsh) 288 return document 289 290 291class RedisDeduplicator(Filter): 292 """ 293 Distributed deduplicator using **plain Redis keys**. 294 You have to run a Redis server and pass its connection parameters. 
295 """ 296 297 def __init__( 298 self, 299 *, 300 host: str = "localhost", 301 port: int = 6379, 302 db: int = 0, 303 key_prefix: str = "dedup", 304 **kwargs: Any, 305 ) -> None: 306 """ 307 Initialize the Redis deduplicator. 308 Args: 309 host (str): Redis server hostname. 310 port (int): Redis server port. 311 db (int): Redis database number. 312 key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task. 313 **kwargs: Additional keyword arguments for parent Filter. 314 """ 315 super().__init__(**kwargs) 316 self.rds = redis.Redis(host=host, port=port, db=db, decode_responses=False) 317 self.key_prefix = key_prefix.encode() 318 319 try: 320 self.rds.ping() 321 except redis.exceptions.RedisError as exc: 322 raise RuntimeError(f"Cannot connect to Redis server {host}:{port}/{db}") from exc 323 324 def apply(self, document: Document) -> Document: 325 lsh_keys = document.extras.get("dedup_lsh") 326 if lsh_keys is None: 327 raise ValueError("Apply GenerateDedupLSH first") 328 329 pipe = self.rds.pipeline(transaction=False) 330 for k in lsh_keys: 331 pipe.set(self.key_prefix + b":" + k.encode(), b"1", nx=True) 332 results: list[bool | None] = pipe.execute() # If instance already exists, it returns None 333 334 if any(r is None for r in results): 335 document.is_rejected = True 336 return document 337 338 339class RedisBloomDeduplicator(Filter): 340 """ 341 Distributed deduplicator backed by **RedisBloom scalable Bloom filters**. 342 You can use this filter to store-LSHs with less memory than Redis keys with the risk of false positives. 343 344 Each *band* gets its own scalable Bloom filter on the Redis side: 345 346 ```text 347 BF.RESERVE <prefix>:<band_idx> <error> <capacity> EXPANSION <n> 348 ``` 349 350 """ 351 352 def __init__( 353 self, 354 *, 355 host: str = "localhost", 356 port: int = 6379, 357 db: int = 0, 358 key_prefix: str = "bloomdedup", 359 error_rate: float = 1e-4, 360 capacity: int = 1_000_000_000, 361 expansion: int = 2, 362 **kwargs: Any, 363 ): 364 """ 365 Initialize the RedisBloom deduplicator. 366 Args: 367 host (str): Redis server hostname. 368 port (int): Redis server port. 369 db (int): Redis database number. 370 key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task. 371 error_rate (float): Desired error rate for the Bloom filter. 372 capacity (int): Initial capacity of the Bloom filter. Guide: set it to the expected number of unique LSH keys ~ num_doc * num_bands. 373 expansion (int): Expansion factor for the Bloom filter. This is used to increase the capacity of the filter dynamically. 374 **kwargs: Additional keyword arguments for parent Filter. 375 """ 376 super().__init__(**kwargs) 377 self.rds = redis.Redis(host=host, port=port, db=db) 378 self.key_prefix = key_prefix.encode() 379 380 try: 381 self.rds.execute_command( 382 "BF.RESERVE", 383 self.key_prefix, 384 error_rate, 385 capacity, 386 "EXPANSION", 387 expansion, 388 ) 389 except redis.ResponseError as e: 390 if "exists" not in str(e): 391 raise 392 393 def apply(self, document: Document) -> Document: 394 lsh_keys: list[str] | None = document.extras.get("dedup_lsh") 395 if lsh_keys is None: 396 raise ValueError( 397 "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first." 398 ) 399 400 key_bytes = [k.encode() for k in lsh_keys] 401 402 # Return value of BF.MADD is [1,0,1,...] 
(0 = already exists, 1 = insertion successful) 403 flags: Iterable[int] = self.rds.execute_command("BF.MADD", self.key_prefix, *key_bytes) 404 if 0 in flags: 405 document.is_rejected = True 406 return document 407 408 409class InlineDuplicateAnalyzer(Filter): 410 def __init__(self, **kwargs: Any): 411 super().__init__(**kwargs) 412 self.docs: dict[int, Document] = dict() # doc_id -> Document mapping 413 self.hash_pool: dict[str, int] = dict() # LSH key -> doc_id mapping 414 415 self._current_doc_id = 0 416 417 def apply(self, document: Document) -> Document: 418 """ 419 Analyze duplicates inline based on the LSH keys in the document. 420 This filter cannot use in the distributed environment because it uses a local hash pool. 421 """ 422 lsh_keys = document.extras.get("dedup_lsh") 423 if lsh_keys is None: 424 raise ValueError( 425 "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first." 426 ) 427 428 for lsh in lsh_keys: 429 if lsh in self.hash_pool: 430 document.is_rejected = True 431 document.extras["similar_doc"] = self.docs[self.hash_pool[lsh]].text 432 else: 433 self.hash_pool[lsh] = self._current_doc_id 434 self.docs[self._current_doc_id] = document 435 436 self._current_doc_id += 1 437 return document
```python
def char_level_splitter(text: str) -> list[str]:
    """
    Split the text into characters.
    This is a simple implementation that splits the text into individual characters.
    """
    return list(text)
```
Split the text into characters. This is a simple implementation that splits the text into individual characters.
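For example:

```python
char_level_splitter("abc")      # ['a', 'b', 'c']
char_level_splitter("五十音順")  # ['五', '十', '音', '順']
```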
```python
def non_alpha_num_splitter(text: str) -> list[str]:
    """
    Split the text into alphanumeric tokens.
    This is a simple implementation that splits on non-alphanumeric characters.
    """
    return [token for token in NON_ALPHA.split(text) if token]
```
Split the text into alphanumeric tokens. This is a simple implementation that splits on non-alphanumeric characters.
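For example:

```python
non_alpha_num_splitter("Hello, world_1!")  # ['Hello', 'world_1']
non_alpha_num_splitter("foo--bar  baz")    # ['foo', 'bar', 'baz']
```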
```python
def japanese_word_splitter(text: str) -> list[str]:
    """
    Split the text into Japanese words using fugashi.
    This will import fugashi and instantiate Tagger on first use.
    """
    global _japanese_tagger
    if _japanese_tagger is None:
        fugashi = importlib.import_module("fugashi")
        _japanese_tagger = fugashi.Tagger()
    return [token.surface for token in _japanese_tagger(text)]
```
Split the text into Japanese words using fugashi. This will import fugashi and instantiate Tagger on first use.
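For example (requires the optional fugashi dependency and a MeCab dictionary; the exact segmentation depends on the dictionary in use):

```python
japanese_word_splitter("吾輩は猫である")  # e.g. ['吾輩', 'は', '猫', 'で', 'ある']
```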
```python
class GenerateDedupLSH(Filter):

    _BYTES_PER_U32: Final[int] = 4

    def _format_lsh_key(self, band_idx: int, digest: int) -> str:
        """
        Format the LSH key for a given band index and digest.
        """
        return f"{band_idx}+{digest:032x}"

    ...
```
Filter that uses MinHash + Locality-Sensitive Hashing (LSH) to assign deduplication keys to documents, allowing fast near-duplicate detection.
Attributes:
- num_perm (int): Number of permutations (hash functions) for MinHash.
- threshold (float): Similarity threshold for tuning LSH parameters.
- tokenizer (Callable[[str], Iterable[str]]): Function to tokenize text.
- n_grams (int): n-gram size for token grouping.
- seed (int): Random seed for MinHash.
- num_bands (int): Number of LSH bands computed from threshold and num_perm.
- band_size (int): Number of hashes per band.
Notes
`_optimal_param` searches for the optimal number of bands (`b`) and rows per band (`r`) that minimise a weighted sum of false positives / false negatives at the specified threshold.
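For intuition: with `b` bands of `r` rows each, two documents whose true Jaccard similarity is `s` share at least one band key with probability `1 - (1 - s^r)^b`, an S-shaped curve that `_optimal_param` centres near the requested threshold. A small illustrative helper:

```python
def candidate_probability(s: float, num_bands: int, band_size: int) -> float:
    # Probability that two documents with Jaccard similarity `s` collide
    # in at least one of `num_bands` bands of `band_size` rows each.
    return 1.0 - (1.0 - s**band_size) ** num_bands
```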
```python
def __init__(
    self,
    num_perm: int = 500,
    threshold: float = 0.8,
    tokenizer: Callable[[str], Iterable[str]] = char_level_splitter,
    n_grams: int = 5,
    seed: int = 42,
    **kwargs: Any,
) -> None:
    """
    Initialize the deduplication filter with MinHash and LSH settings.

    Args:
        num_perm: Number of hash permutations for MinHash signature length.
        threshold: Similarity threshold to decide optimal LSH parameters.
        tokenizer: Function to split text into tokens.
        n_grams: Number of tokens per n-gram for MinHash update.
        seed: Seed for hash permutation consistency.
        **kwargs: Additional keyword arguments for parent Filter.
    """
    super().__init__(**kwargs)
    self.num_perm = num_perm
    self.threshold = threshold
    self.tokenizer = tokenizer
    self.n_grams = n_grams
    self.seed = seed

    # Compute optimal number of bands and band size based on threshold
    self.num_bands, self.band_size = _optimal_param(
        threshold=self.threshold,
        num_perm=self.num_perm,
        false_negative_weight=0.5,
        false_positive_weight=0.5,
    )
```
Initialize the deduplication filter with MinHash and LSH settings.
Args:
- num_perm: Number of hash permutations for MinHash signature length.
- threshold: Similarity threshold to decide optimal LSH parameters.
- tokenizer: Function to split text into tokens.
- n_grams: Number of tokens per n-gram for MinHash update.
- seed: Seed for hash permutation consistency.
- **kwargs: Additional keyword arguments for parent Filter.
```python
def calculate_minhash_signature(self, text: str) -> NDArray[np.uint32]:
    """
    Compute MinHash signature of input text as an array of uint32.

    Steps:
    1. Tokenize text using the provided tokenizer.
    2. Generate n-gram tokens.
    3. Update MinHash with n-gram tokens.

    Args:
        text: Input document text to be hashed.

    Returns:
        A 1D numpy array of shape (num_perm,) with dtype uint32.
    """
    tokens = self.tokenizer(text)
    n_gram_tokens = ngrams(tokens, self.n_grams)
    # Join tokens into string n-grams for hashing
    tokens = [" ".join(grams) for grams in n_gram_tokens]
    # Initialize and update RMinHash
    minhash = RMinHash(num_perm=self.num_perm, seed=self.seed)
    minhash.update(tokens)
    # Convert digest (list of ints) to numpy uint32 array
    return np.asarray(minhash.digest(), dtype=np.uint32)
```
Compute MinHash signature of input text as an array of uint32.
Steps:
1. Tokenize text using the provided tokenizer.
2. Generate n-gram tokens.
3. Update MinHash with n-gram tokens.

Args:
- text: Input document text to be hashed.

Returns:
- A 1D numpy array of shape (num_perm,) with dtype uint32.
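A minimal sketch of calling it directly:

```python
import numpy as np
from hojichar.filters.deduplication import GenerateDedupLSH

gen = GenerateDedupLSH(num_perm=500)
sig = gen.calculate_minhash_signature("The quick brown fox jumps over the lazy dog.")
assert sig.shape == (500,) and sig.dtype == np.uint32
```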
```python
def signature_to_lsh_digest(
    self, signature: NDArray[np.uint32], band_size: int, band_idx: int
) -> int:
    """
    Convert a slice of the MinHash signature into an LSH digest with less memory overhead.

    This method is optimized for speed by avoiding copies:
    - We view the uint32 array as raw bytes (uint8 view).
    - We create a memoryview of the byte slice for the specified band.
    - We compute a 128-bit hash directly on the slice.

    Args:
        signature: 1D numpy array of uint32 representing MinHash signature.
        band_size: Number of hashes per LSH band.
        band_idx: Index of the band to hash (0-based).

    Returns:
        An integer representing the 128-bit hash digest of the band.

    Raises:
        AssertionError: If signature shape/dtype or band index is invalid.
    """
    assert signature.dtype == np.uint32 and signature.ndim == 1, (
        "signature must be a 1D numpy array of uint32"
    )
    assert 0 <= band_idx < self.num_bands, (
        f"band_idx {band_idx} out of range [0, {self.num_bands})"
    )
    assert len(signature) >= band_size * self.num_bands, (
        "signature length is too short for given band_size and num_bands"
    )

    # Compute byte offsets for the selected band
    start = band_idx * band_size * self._BYTES_PER_U32
    stop = start + band_size * self._BYTES_PER_U32

    # View signature as raw bytes without copy. memoryview avoids creating new bytes.
    u8 = signature.view(np.uint8)
    mv = memoryview(u8)[start:stop]  # type: ignore[arg-type]

    return xxhash.xxh128_intdigest(mv)
```
Convert a slice of the MinHash signature into an LSH digest with less memory overhead.
This method is optimized for speed by avoiding copies:
- We view the uint32 array as raw bytes (uint8 view).
- We create a memoryview of the byte slice for the specified band.
- We compute a 128-bit hash directly on the slice.
Args:
- signature: 1D numpy array of uint32 representing MinHash signature.
- band_size: Number of hashes per LSH band.
- band_idx: Index of the band to hash (0-based).

Returns:
- An integer representing the 128-bit hash digest of the band.

Raises:
- AssertionError: If signature shape/dtype or band index is invalid.
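The byte-level slicing hashes the same bytes as slicing the uint32 array itself; a sketch of the same digest computed without the memoryview optimization (illustrative helper name):

```python
import numpy as np
import xxhash

def lsh_digest_reference(signature: np.ndarray, band_size: int, band_idx: int) -> int:
    # Hash the `band_size` uint32 values of band `band_idx` as raw bytes.
    band = signature[band_idx * band_size : (band_idx + 1) * band_size]
    return xxhash.xxh128_intdigest(band.tobytes())
```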
```python
def apply(self, document: Document) -> Document:
    """
    Decorate the document with LSH deduplication keys.

    For each band, compute the digest and format as a hex string:
    '<band_idx>+<128-bit-digest-hex>'.
    Keys are stored in document.extras['dedup_lsh'].

    Args:
        document: Document object with 'text' attribute.

    Returns:
        The same Document object with 'dedup_lsh' added in extras.
    """
    signature = self.calculate_minhash_signature(document.text)
    lsh_keys = [
        self._format_lsh_key(
            band_idx, self.signature_to_lsh_digest(signature, self.band_size, band_idx)
        )
        for band_idx in range(self.num_bands)
    ]

    document.extras["dedup_lsh"] = lsh_keys
    return document
```
Decorate the document with LSH deduplication keys.
For each band, compute the digest and format it as a hex string: '<band_idx>+<128-bit-digest-hex>'. Keys are stored in document.extras['dedup_lsh'].
Args: document: Document object with 'text' attribute.
Returns: The same Document object with 'dedup_lsh' added in extras.
```python
class InlineDeduplicator(Filter):
    ...
```
Simple in‑memory deduplicator.
Stores every LSH key in a local `set`. If any key of the incoming document is already present, the document is marked as a duplicate via `document.is_rejected = True`.

Limitations
State is per‑process only. Running multiple workers or machines will not share the key set – use `RedisDeduplicator` or `RedisBloomDeduplicator` for distributed setups.
```python
def __init__(self, **kwargs: Any):
    super().__init__(**kwargs)
    self.hash_pool: set[str] = set()
```
Initialize the filter.

Parameters
p : float
    The probability of applying the filter. If `p` is 1, the filter will always be applied.
skip_rejected : bool
    If `True`, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to `False`.
random_state : Optional[Union[int, np.random.Generator]]
    Seed for the random number generator. If `None`, a new random number generator will be created. If `None` and used inside a `Compose`, the random state is shared with the `Compose` object.
use_batch : bool
    If `True`, the filter will process documents in batches in the `apply_stream` method.
batch_size : int
    The size of the batch to process documents in the `apply_stream` method.
kwargs : Any
    Additional keyword arguments to pass to the filter.
```python
def apply(self, document: Document) -> Document:
    """
    Inline deduplication based on the LSH keys in the document.
    This filter cannot use in the distributed environment because it uses a local hash pool.
    """
    lsh_keys = document.extras.get("dedup_lsh")
    if lsh_keys is None:
        raise ValueError(
            "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
        )

    for lsh in lsh_keys:
        if lsh in self.hash_pool:
            document.is_rejected = True
        else:
            self.hash_pool.add(lsh)
    return document
```
Inline deduplication based on the LSH keys in the document. This filter cannot be used in a distributed environment because it uses a local hash pool.
```python
class RedisDeduplicator(Filter):
    ...
```
Distributed deduplicator using plain Redis keys. You have to run a Redis server and pass its connection parameters.
```python
def __init__(
    self,
    *,
    host: str = "localhost",
    port: int = 6379,
    db: int = 0,
    key_prefix: str = "dedup",
    **kwargs: Any,
) -> None:
    """
    Initialize the Redis deduplicator.
    Args:
        host (str): Redis server hostname.
        port (int): Redis server port.
        db (int): Redis database number.
        key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
        **kwargs: Additional keyword arguments for parent Filter.
    """
    super().__init__(**kwargs)
    self.rds = redis.Redis(host=host, port=port, db=db, decode_responses=False)
    self.key_prefix = key_prefix.encode()

    try:
        self.rds.ping()
    except redis.exceptions.RedisError as exc:
        raise RuntimeError(f"Cannot connect to Redis server {host}:{port}/{db}") from exc
```
Initialize the Redis deduplicator.

Args:
- host (str): Redis server hostname.
- port (int): Redis server port.
- db (int): Redis database number.
- key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
- **kwargs: Additional keyword arguments for parent Filter.
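A minimal usage sketch (assumes a Redis server is reachable at the given host and port; the key prefix is illustrative):

```python
import hojichar
from hojichar.filters.deduplication import GenerateDedupLSH, RedisDeduplicator

pipeline = hojichar.Compose([
    GenerateDedupLSH(),
    RedisDeduplicator(host="localhost", port=6379, db=0, key_prefix="my-corpus-v1"),
])
```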
```python
def apply(self, document: Document) -> Document:
    lsh_keys = document.extras.get("dedup_lsh")
    if lsh_keys is None:
        raise ValueError("Apply GenerateDedupLSH first")

    pipe = self.rds.pipeline(transaction=False)
    for k in lsh_keys:
        pipe.set(self.key_prefix + b":" + k.encode(), b"1", nx=True)
    results: list[bool | None] = pipe.execute()  # If instance already exists, it returns None

    if any(r is None for r in results):
        document.is_rejected = True
    return document
```
Definition of filter behavior.

The document must have a protocol `TextContent`; in most cases the `hojichar.Document` class is used.

In this method, the filter will modify `document.text` or `document.extras` and set `document.is_rejected = True` to discard the document.

Parameters
document : Document
    Input document

Returns
Document
    Processed Document
```python
class RedisBloomDeduplicator(Filter):
    ...
```
Distributed deduplicator backed by RedisBloom scalable Bloom filters. You can use this filter to store LSH keys with less memory than plain Redis keys, at the risk of false positives.
All LSH keys are stored in a single scalable Bloom filter on the Redis side (the band index is embedded in each key), reserved as:
BF.RESERVE <prefix> <error> <capacity> EXPANSION <n>
```python
def __init__(
    self,
    *,
    host: str = "localhost",
    port: int = 6379,
    db: int = 0,
    key_prefix: str = "bloomdedup",
    error_rate: float = 1e-4,
    capacity: int = 1_000_000_000,
    expansion: int = 2,
    **kwargs: Any,
):
    """
    Initialize the RedisBloom deduplicator.
    Args:
        host (str): Redis server hostname.
        port (int): Redis server port.
        db (int): Redis database number.
        key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
        error_rate (float): Desired error rate for the Bloom filter.
        capacity (int): Initial capacity of the Bloom filter. Guide: set it to the expected number of unique LSH keys ~ num_doc * num_bands.
        expansion (int): Expansion factor for the Bloom filter. This is used to increase the capacity of the filter dynamically.
        **kwargs: Additional keyword arguments for parent Filter.
    """
    super().__init__(**kwargs)
    self.rds = redis.Redis(host=host, port=port, db=db)
    self.key_prefix = key_prefix.encode()

    try:
        self.rds.execute_command(
            "BF.RESERVE",
            self.key_prefix,
            error_rate,
            capacity,
            "EXPANSION",
            expansion,
        )
    except redis.ResponseError as e:
        if "exists" not in str(e):
            raise
```
Initialize the RedisBloom deduplicator.

Args:
- host (str): Redis server hostname.
- port (int): Redis server port.
- db (int): Redis database number.
- key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
- error_rate (float): Desired error rate for the Bloom filter.
- capacity (int): Initial capacity of the Bloom filter. Guide: set it to the expected number of unique LSH keys, roughly num_doc * num_bands.
- expansion (int): Expansion factor for the Bloom filter. This is used to increase the capacity of the filter dynamically.
- **kwargs: Additional keyword arguments for parent Filter.
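A sizing sketch following the capacity guide above (assumes a Redis server with the RedisBloom module loaded; the corpus size and prefix are illustrative):

```python
from hojichar.filters.deduplication import GenerateDedupLSH, RedisBloomDeduplicator

gen = GenerateDedupLSH()
expected_docs = 10_000_000                # illustrative corpus size
capacity = expected_docs * gen.num_bands  # ~ expected number of unique LSH keys

dedup = RedisBloomDeduplicator(
    host="localhost",
    key_prefix="my-corpus-bloom-v1",
    error_rate=1e-4,
    capacity=capacity,
)
```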
```python
def apply(self, document: Document) -> Document:
    lsh_keys: list[str] | None = document.extras.get("dedup_lsh")
    if lsh_keys is None:
        raise ValueError(
            "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
        )

    key_bytes = [k.encode() for k in lsh_keys]

    # Return value of BF.MADD is [1,0,1,...] (0 = already exists, 1 = insertion successful)
    flags: Iterable[int] = self.rds.execute_command("BF.MADD", self.key_prefix, *key_bytes)
    if 0 in flags:
        document.is_rejected = True
    return document
```
Definition of filter behavior.

The document must have a protocol `TextContent`; in most cases the `hojichar.Document` class is used.

In this method, the filter will modify `document.text` or `document.extras` and set `document.is_rejected = True` to discard the document.

Parameters
document : Document
    Input document

Returns
Document
    Processed Document
```python
class InlineDuplicateAnalyzer(Filter):
    ...
```
Base class for all filters. Document-level filters must inherit from this class.

The definition of text processing is in the `apply` method. If you define a new filter, override the method.

When this class is called, it applies the filter from string to string.

With a context manager, you can use the filter as follows:

    with YourFilter(p=0.5) as filt:
        text = filt("This is a sample text.")
```python
def __init__(self, **kwargs: Any):
    super().__init__(**kwargs)
    self.docs: dict[int, Document] = dict()  # doc_id -> Document mapping
    self.hash_pool: dict[str, int] = dict()  # LSH key -> doc_id mapping

    self._current_doc_id = 0
```
Initialize the filter.

Parameters
p : float
    The probability of applying the filter. If `p` is 1, the filter will always be applied.
skip_rejected : bool
    If `True`, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to `False`.
random_state : Optional[Union[int, np.random.Generator]]
    Seed for the random number generator. If `None`, a new random number generator will be created. If `None` and used inside a `Compose`, the random state is shared with the `Compose` object.
use_batch : bool
    If `True`, the filter will process documents in batches in the `apply_stream` method.
batch_size : int
    The size of the batch to process documents in the `apply_stream` method.
kwargs : Any
    Additional keyword arguments to pass to the filter.
```python
def apply(self, document: Document) -> Document:
    """
    Analyze duplicates inline based on the LSH keys in the document.
    This filter cannot use in the distributed environment because it uses a local hash pool.
    """
    lsh_keys = document.extras.get("dedup_lsh")
    if lsh_keys is None:
        raise ValueError(
            "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
        )

    for lsh in lsh_keys:
        if lsh in self.hash_pool:
            document.is_rejected = True
            document.extras["similar_doc"] = self.docs[self.hash_pool[lsh]].text
        else:
            self.hash_pool[lsh] = self._current_doc_id
            self.docs[self._current_doc_id] = document

    self._current_doc_id += 1
    return document
```
Analyze duplicates inline based on the LSH keys in the document. This filter cannot be used in a distributed environment because it uses a local hash pool.
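A minimal sketch of inspecting which earlier document a rejected document collides with (the sample sentences are illustrative):

```python
import hojichar
from hojichar.filters.deduplication import GenerateDedupLSH, InlineDuplicateAnalyzer

pipeline = hojichar.Compose([GenerateDedupLSH(), InlineDuplicateAnalyzer()])

pipeline.apply(hojichar.Document("The quick brown fox jumps over the lazy dog."))
dup = pipeline.apply(hojichar.Document("The quick brown fox jumps over the lazy dog!!"))

if dup.is_rejected:
    print("near-duplicate of:", dup.extras["similar_doc"])
```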