hojichar.filters.deduplication
This module provides filters for deduplication using MinHash and Locality-Sensitive Hashing (LSH).
If you want to use this module, install hojichar via `pip install 'hojichar[dedup]'`.
What is MinHash LSH?
A gentle introduction to MinHash LSH for first-time learners
1 What problem does this solve?
When you have millions of documents, it is too expensive to compare every pair directly. MinHash + Locality‑Sensitive Hashing (LSH) lets you
- estimate Jaccard similarity extremely fast and memory‑efficiently, and
- retrieve near‑duplicates in sub‑linear time.
In practice you can keep a single set or Redis index of the generated LSH keys and ask:
"Does any existing document share at least one LSH key with mine?"
If the answer is yes, the two documents are almost certainly similar; if not, they are very likely different.
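In code, that membership query is nothing more than a set lookup. A minimal sketch of the idea (the helper name is hypothetical; the real filters below wrap this logic):

```python
# Sketch of the key-membership idea behind all deduplicators in this module.
seen: set[str] = set()

def is_near_duplicate(lsh_keys: list[str]) -> bool:
    """Return True if any band key was already seen, then register the new keys."""
    duplicate = any(key in seen for key in lsh_keys)
    seen.update(lsh_keys)
    return duplicate
```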
How the pipeline works
The `GenerateDedupLSH` filter generates LSH keys for each document; a short usage sketch follows the steps below.
1. Tokenize
   - Split text into tokens. (Default: character‑level, but you can plug in any callable.)
2. n-grams
   - Group tokens into n‑grams (n_grams=5) to capture context.
3. MinHash
   - Hash each n‑gram with `num_perm` independent permutations and keep only the minimum value. The resulting signature is a vector of num_perm 32‑bit integers.
   - This module uses `rensa.RMinHash`, a fast MinHash implementation written in Rust.
4. Banding and compression
   - Split the signature into b bands, each containing `r` integers (num_perm ≈ b×r).
   - Treat the `r` integers as raw bytes, hash them with xxhash‑128, and format the key as <band_idx>+<digest32hex>.
5. Output
   - Store all band keys in `document.extras['dedup_lsh']` as a list of strings.
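For example, a minimal sketch of running this filter on a single document (assuming the dedup extra dependencies are installed):

```python
from hojichar import Document
from hojichar.filters.deduplication import GenerateDedupLSH

# Sketch only: generate band keys for one document and inspect a few of them.
gen_lsh = GenerateDedupLSH(num_perm=500, threshold=0.8)
doc = gen_lsh.apply(Document("The quick brown fox jumps over the lazy dog."))
print(doc.extras["dedup_lsh"][:3])  # e.g. ['0+<32-hex-digest>', '1+...', '2+...']
```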
InlineDeduplicator, RedisDeduplicator, and RedisBloomDeduplicator filters use the generated LSH keys to mark documents as duplicates.
- `InlineDeduplicator` stores LSH keys in a local set, so it works only in a single process.
- `RedisDeduplicator` stores LSH keys in Redis, so it works in a distributed environment.
- `RedisBloomDeduplicator` stores LSH keys in RedisBloom, which is a scalable Bloom filter. It uses less memory than plain Redis keys but may return false positives.
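A minimal end-to-end sketch (assuming hojichar's Compose API) that chains key generation with in-memory deduplication; the texts are illustrative:

```python
from hojichar import Compose, Document
from hojichar.filters.deduplication import GenerateDedupLSH, InlineDeduplicator

# Sketch only: near-duplicate texts share at least one LSH key,
# so the second document should be rejected by InlineDeduplicator.
pipeline = Compose([GenerateDedupLSH(), InlineDeduplicator()])
a = pipeline.apply(Document("HojiChar is a text processing pipeline for LLM corpora."))
b = pipeline.apply(Document("HojiChar is a text-processing pipeline for LLM corpora!"))
print(a.is_rejected, b.is_rejected)  # expected: False True
```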
1""" 2This module provides filters for deduplication using MinHash and Locality-Sensitive Hashing (LSH). 3If you want to use this module, install hojichar via `pip install 'hojichar[dedup]'` 4 5## What is MinHash LSH? 6*A gentle introduction for first‑time leaner for MinHash LSH* 7 8--- 9 10### 1 What problem does this solve? 11 12When you have **millions of documents** it is too expensive to compare every pair directly. 13**MinHash + Locality‑Sensitive Hashing (LSH)** lets you 14 15- estimate Jaccard similarity extremely fast, and memory‑efficiently 16- retrieve **near‑duplicates** in sub‑linear time. 17 18In practice you can keep a single `set` or Redis index of the generated **LSH keys** and ask: 19"Does any existing document share *at least one* LSH key with mine?" 20 21If the answer is yes the two documents are almost certainly similar; if no they are very likely different. 22 23### How the pipeline works 24 25`GenerateDedupLSH` filter generates LSH keys for each document. 26 271. Tokenize 28 - Split text into tokens. (Default: character‑level, but you can plug in any callable.) 292. n-grams 30 - Group tokens into n‑grams (n_grams=5) to capture context. 313. MinHash 32 - Hash each n‑gram with `num_perm` independent permutations and keep only the minimum value. The resulting **signature** is a vector of num_perm 32‑bit integers. 33 - This module uses `rensa.RMinHash` which is a fast MinHash implementation by Rust language. 344. Banding and compression 35 - Split the signature into b bands, each containing `r` integers (num_perm ≈ b×r). 36 - Treat the `r` integers as raw bytes, hash them with xxhash‑128, and format as <band_idx>+<digest32hex>. 375. Output 38 - Store all band keys in `document.extras['dedup_lsh']` as a list of strings. 39 40`InlineDeduplicator`, `RedisDeduplicator`, and `RedisBloomDeduplicator` filters use the generated LSH keys to mark documents as duplicates. 41 42- `InlineDeduplicator` stores LSH keys in a local set, so it works only in a single process. 43- `RedisDeduplicator` stores LSH keys in Redis, so it works in a distributed environment. 44- `RedisBloomDeduplicator` stores LSH keys in RedisBloom, which is a scalable Bloom filter. It uses less memory than Redis keys but may return false positives. 45 46""" 47 48from __future__ import annotations 49 50import importlib 51import re 52import sys 53from typing import Any, Callable, Final, Iterable, Optional 54 55import numpy as np 56from numpy.typing import NDArray 57 58try: 59 import redis 60 import xxhash 61 from datasketch.lsh import _optimal_param # type: ignore 62 from nltk.util import ngrams # type: ignore 63 from rensa import RMinHash # type: ignore 64 65 is_loaded_dedup = True 66except ImportError: 67 is_loaded_dedup = False 68 69 70from hojichar import Document, Filter 71 72_japanese_tagger: Optional["fugashi.Tagger"] = None # type: ignore[name-defined] # noqa: F821 73NON_ALPHA = re.compile("[^A-Za-z_0-9]") 74IS_LOADED_DEDUP_ERROR_MSG = ( 75 "Failed to import redis, xxhash, rensa, or datasketch. " 76 "Please install the extra dependencies with `pip install 'hojichar[dedup]'`" 77) 78 79 80def char_level_splitter(text: str) -> list[str]: 81 """ 82 Split the text into characters. 83 This is a simple implementation that splits the text into individual characters. 84 """ 85 return list(text) 86 87 88def non_alpha_num_splitter(text: str) -> list[str]: 89 """ 90 Split the text into alphanumeric tokens. 91 This is a simple implementation that splits on non-alphanumeric characters. 
92 """ 93 return [token for token in NON_ALPHA.split(text) if token] 94 95 96def japanese_word_splitter(text: str) -> list[str]: 97 """ 98 Split the text into Japanese words using fugashi. 99 This will import fugashi and instantiate Tagger on first use. 100 """ 101 global _japanese_tagger 102 if _japanese_tagger is None: 103 fugashi = importlib.import_module("fugashi") 104 _japanese_tagger = fugashi.Tagger() 105 return [token.surface for token in _japanese_tagger(text)] 106 107 108class GenerateDedupLSH(Filter): 109 """ 110 Filter that uses MinHash + Locality-Sensitive Hashing (LSH) to assign 111 deduplication keys to documents, allowing fast near-duplicate detection. 112 113 Attributes: 114 num_perm (int): Number of permutations (hash functions) for MinHash. 115 threshold (float): Similarity threshold for tuning LSH parameters. 116 tokenizer (Callable[[str], Iterable[str]]): Function to tokenize text. 117 n_grams (int): n-gram size for token grouping. 118 seed (int): Random seed for MinHash. 119 num_bands (int): Number of LSH bands computed from threshold and num_perm. 120 band_size (int): Number of hashes per band. 121 122 Notes 123 ----- 124 `_optimal_param` searches for the optimal number of **bands** (`b`) and 125 **rows per band** (`r`) that minimise a weighted sum of false positives / 126 false negatives at the specified *threshold*. 127 """ 128 129 _BYTES_PER_U32: Final[int] = 4 130 131 def __init__( 132 self, 133 num_perm: int = 500, 134 threshold: float = 0.8, 135 tokenizer: Callable[[str], Iterable[str]] = char_level_splitter, 136 n_grams: int = 5, 137 seed: int = 42, 138 **kwargs: Any, 139 ) -> None: 140 """ 141 Initialize the deduplication filter with MinHash and LSH settings. 142 143 Args: 144 num_perm: Number of hash permutations for MinHash signature length. 145 threshold: Similarity threshold to decide optimal LSH parameters. 146 tokenizer: Function to split text into tokens. 147 n_grams: Number of tokens per n-gram for MinHash update. 148 seed: Seed for hash permutation consistency. 149 **kwargs: Additional keyword arguments for parent Filter. 150 """ 151 super().__init__(**kwargs) 152 if not is_loaded_dedup: 153 raise ImportError(IS_LOADED_DEDUP_ERROR_MSG) 154 self.num_perm = num_perm 155 self.threshold = threshold 156 self.tokenizer = tokenizer 157 self.n_grams = n_grams 158 self.seed = seed 159 160 # Compute optimal number of bands and band size based on threshold 161 self.num_bands, self.band_size = _optimal_param( 162 threshold=self.threshold, 163 num_perm=self.num_perm, 164 false_negative_weight=0.5, 165 false_positive_weight=0.5, 166 ) 167 168 def calculate_minhash_signature(self, text: str) -> NDArray[np.uint32]: 169 """ 170 Compute MinHash signature of input text as an array of uint32. 171 172 Steps: 173 1. Tokenize text using the provided tokenizer. 174 2. Generate n-gram tokens. 175 3. Update MinHash with n-gram tokens. 176 177 Args: 178 text: Input document text to be hashed. 179 180 Returns: 181 A 1D numpy array of shape (num_perm,) with dtype uint32. 
182 """ 183 tokens = self.tokenizer(text) 184 n_gram_tokens = ngrams(tokens, self.n_grams) 185 # Join tokens into string n-grams for hashing 186 tokens = [" ".join(grams) for grams in n_gram_tokens] 187 # Initialize and update RMinHash 188 minhash = RMinHash(num_perm=self.num_perm, seed=self.seed) 189 minhash.update(tokens) 190 # Convert digest (list of ints) to numpy uint32 array 191 return np.asarray(minhash.digest(), dtype=np.uint32) 192 193 def _sig_bytes_le(self, sig: NDArray[np.uint32]) -> memoryview: 194 """ 195 Return the signature as *little-endian* byte view. 196 Platform‑independent way to get bytes from a numpy array. 197 """ 198 if sys.byteorder == "little": 199 # amd64 / arm64 (little) 200 return memoryview(sig).cast("B") # type: ignore[arg-type] # zero-copy 201 else: 202 # big-endian CPU 203 return memoryview(sig.byteswap()).cast("B") # type: ignore[arg-type] # 1 copy 204 205 def signature_to_lsh_digest( 206 self, signature: NDArray[np.uint32], band_size: int, band_idx: int 207 ) -> int: 208 """ 209 Convert a slice of the MinHash signature into an LSH digest with less memory overhead. 210 211 This method is optimized for speed by avoiding copies: 212 - We view the uint32 array as raw bytes (uint8 view). 213 - We create a memoryview of the byte slice for the specified band. 214 - We compute a 128-bit hash directly on the slice. 215 216 Args: 217 signature: 1D numpy array of uint32 representing MinHash signature. 218 band_size: Number of hashes per LSH band. 219 band_idx: Index of the band to hash (0-based). 220 221 Returns: 222 An integer representing the 128-bit hash digest of the band. 223 224 Raises: 225 AssertionError: If signature shape/dtype or band index is invalid. 226 """ 227 assert signature.dtype == np.uint32 and signature.ndim == 1, ( 228 "signature must be a 1D numpy array of uint32" 229 ) 230 assert 0 <= band_idx < self.num_bands, ( 231 f"band_idx {band_idx} out of range [0, {self.num_bands})" 232 ) 233 assert len(signature) >= band_size * self.num_bands, ( 234 "signature length is too short for given band_size and num_bands" 235 ) 236 237 # Compute byte offsets for the selected band 238 start = band_idx * band_size * self._BYTES_PER_U32 239 stop = start + band_size * self._BYTES_PER_U32 240 241 # View signature as raw bytes without copy. memoryview avoids creating new bytes. 242 mv = self._sig_bytes_le(signature)[start:stop] # slice view 243 244 return xxhash.xxh128_intdigest(mv) 245 246 def _format_lsh_key(self, band_idx: int, digest: int) -> str: 247 """ 248 Format the LSH key for a given band index and digest. 249 """ 250 return f"{band_idx}+{digest:032x}" 251 252 def apply(self, document: Document) -> Document: 253 """ 254 Decorate the document with LSH deduplication keys. 255 256 For each band, compute the digest and format as a hex string: 257 '<band_idx>+<128-bit-digest-hex>'. 258 Keys are stored in document.extras['dedup_lsh']. 259 260 Args: 261 document: Document object with 'text' attribute. 262 263 Returns: 264 The same Document object with 'dedup_lsh' added in extras. 265 """ 266 signature = self.calculate_minhash_signature(document.text) 267 lsh_keys = [ 268 self._format_lsh_key( 269 band_idx, self.signature_to_lsh_digest(signature, self.band_size, band_idx) 270 ) 271 for band_idx in range(self.num_bands) 272 ] 273 274 document.extras["dedup_lsh"] = lsh_keys 275 return document 276 277 278class InlineDeduplicator(Filter): 279 """ 280 Simple in‑memory deduplicator. 281 282 Stores every LSH key in a local :pyclass:`set`. 
If any key of the incoming 283 document is already present, the document is marked as duplicate via 284 `document.is_rejected = True`. 285 286 **Limitations** 287 ------------- 288 *State is per‑process only.* Running multiple workers or machines will *not* 289 share the key set – use :class:`RedisDeduplicator` or 290 :class:`RedisBloomDeduplicator` for distributed setups. 291 """ 292 293 def __init__(self, **kwargs: Any): 294 super().__init__(**kwargs) 295 self.hash_pool: set[str] = set() 296 297 def apply(self, document: Document) -> Document: 298 """ 299 Inline deduplication based on the LSH keys in the document. 300 This filter cannot use in the distributed environment because it uses a local hash pool. 301 """ 302 lsh_keys = document.extras.get("dedup_lsh") 303 if lsh_keys is None: 304 raise ValueError( 305 "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first." 306 ) 307 308 for lsh in lsh_keys: 309 if lsh in self.hash_pool: 310 document.is_rejected = True 311 else: 312 self.hash_pool.add(lsh) 313 return document 314 315 316class RedisDeduplicator(Filter): 317 """ 318 Distributed deduplicator using **plain Redis keys**. 319 You have to run a Redis server and pass its connection parameters. 320 """ 321 322 def __init__( 323 self, 324 *, 325 host: str = "localhost", 326 port: int = 6379, 327 db: int = 0, 328 key_prefix: str = "dedup", 329 **kwargs: Any, 330 ) -> None: 331 """ 332 Initialize the Redis deduplicator. 333 Args: 334 host (str): Redis server hostname. 335 port (int): Redis server port. 336 db (int): Redis database number. 337 key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task. 338 **kwargs: Additional keyword arguments for parent Filter. 339 """ 340 if not is_loaded_dedup: 341 raise ImportError(IS_LOADED_DEDUP_ERROR_MSG) 342 super().__init__(**kwargs) 343 self.rds = redis.Redis(host=host, port=port, db=db, decode_responses=False) 344 self.key_prefix = key_prefix.encode() 345 346 try: 347 self.rds.ping() 348 except redis.exceptions.RedisError as exc: 349 raise RuntimeError(f"Cannot connect to Redis server {host}:{port}/{db}") from exc 350 351 def apply(self, document: Document) -> Document: 352 lsh_keys = document.extras.get("dedup_lsh") 353 if lsh_keys is None: 354 raise ValueError("Apply GenerateDedupLSH first") 355 356 pipe = self.rds.pipeline(transaction=False) 357 for k in lsh_keys: 358 pipe.set(self.key_prefix + b":" + k.encode(), b"1", nx=True) 359 results: list[bool | None] = pipe.execute() # If instance already exists, it returns None 360 361 if any(r is None for r in results): 362 document.is_rejected = True 363 return document 364 365 366class RedisBloomDeduplicator(Filter): 367 """ 368 Distributed deduplicator backed by **RedisBloom scalable Bloom filters**. 369 You can use this filter to store-LSHs with less memory than Redis keys with the risk of false positives. 370 371 Each *band* gets its own scalable Bloom filter on the Redis side: 372 373 ```text 374 BF.RESERVE <prefix>:<band_idx> <error> <capacity> EXPANSION <n> 375 ``` 376 377 """ 378 379 def __init__( 380 self, 381 *, 382 expected_docs: int, 383 host: str = "localhost", 384 port: int = 6379, 385 db: int = 0, 386 key_prefix: str = "bloomdedup", 387 error_rate: float = 1e-7, 388 expansion: int = 2, 389 num_bands: int | None = None, 390 **kwargs: Any, 391 ): 392 """ 393 Initialize the RedisBloom deduplicator. 394 Args: 395 expected_docs (int): Expected number of documents to deduplicate. 
This is used to set the initial capacity of the Bloom filter. 396 host (str): Redis server hostname. 397 port (int): Redis server port. 398 db (int): Redis database number. 399 key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task. 400 error_rate (float): Desired error rate for the Bloom filter. 401 expansion (int): Expansion factor for the Bloom filter. This is used to increase the capacity of the filter dynamically. 402 num_bands (int | None): Number of bands to use for LSH to calculate the capacity of BloomFilter. If None, it will be set to 32. 403 **kwargs: Additional keyword arguments for parent Filter. 404 """ 405 if not is_loaded_dedup: 406 raise ImportError(IS_LOADED_DEDUP_ERROR_MSG) 407 super().__init__(**kwargs) 408 self.rds = redis.Redis(host=host, port=port, db=db) 409 self.key_prefix = key_prefix.encode() 410 411 _num_bands = num_bands if num_bands is not None else 32 412 413 try: 414 self.rds.execute_command( 415 "BF.RESERVE", 416 self.key_prefix, 417 error_rate, 418 expected_docs * _num_bands, 419 "EXPANSION", 420 expansion, 421 ) 422 except redis.ResponseError as e: 423 if "exists" not in str(e): 424 raise 425 426 def apply(self, document: Document) -> Document: 427 lsh_keys: list[str] | None = document.extras.get("dedup_lsh") 428 if lsh_keys is None: 429 raise ValueError( 430 "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first." 431 ) 432 433 key_bytes = [k.encode() for k in lsh_keys] 434 435 # Return value of BF.MADD is [1,0,1,...] (0 = already exists, 1 = insertion successful) 436 flags: Iterable[int] = self.rds.execute_command("BF.MADD", self.key_prefix, *key_bytes) 437 if 0 in flags: 438 document.is_rejected = True 439 return document 440 441 442class InlineDuplicateAnalyzer(Filter): 443 def __init__(self, **kwargs: Any): 444 super().__init__(**kwargs) 445 self.docs: dict[int, Document] = dict() # doc_id -> Document mapping 446 self.hash_pool: dict[str, int] = dict() # LSH key -> doc_id mapping 447 448 self._current_doc_id = 0 449 450 def apply(self, document: Document) -> Document: 451 """ 452 Analyze duplicates inline based on the LSH keys in the document. 453 This filter cannot use in the distributed environment because it uses a local hash pool. 454 """ 455 lsh_keys = document.extras.get("dedup_lsh") 456 if lsh_keys is None: 457 raise ValueError( 458 "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first." 459 ) 460 461 for lsh in lsh_keys: 462 if lsh in self.hash_pool: 463 document.is_rejected = True 464 document.extras["similar_doc"] = self.docs[self.hash_pool[lsh]].text 465 else: 466 self.hash_pool[lsh] = self._current_doc_id 467 self.docs[self._current_doc_id] = document 468 469 self._current_doc_id += 1 470 return document
```python
def char_level_splitter(text: str) -> list[str]:
    """
    Split the text into characters.
    This is a simple implementation that splits the text into individual characters.
    """
    return list(text)
```
Split the text into characters. This is a simple implementation that splits the text into individual characters.
```python
def non_alpha_num_splitter(text: str) -> list[str]:
    """
    Split the text into alphanumeric tokens.
    This is a simple implementation that splits on non-alphanumeric characters.
    """
    return [token for token in NON_ALPHA.split(text) if token]
```
Split the text into alphanumeric tokens. This is a simple implementation that splits on non-alphanumeric characters.
```python
def japanese_word_splitter(text: str) -> list[str]:
    """
    Split the text into Japanese words using fugashi.
    This will import fugashi and instantiate Tagger on first use.
    """
    global _japanese_tagger
    if _japanese_tagger is None:
        fugashi = importlib.import_module("fugashi")
        _japanese_tagger = fugashi.Tagger()
    return [token.surface for token in _japanese_tagger(text)]
```
Split the text into Japanese words using fugashi. This will import fugashi and instantiate Tagger on first use.
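To make the tokenizer choices concrete, a small sketch of the two dependency-free splitters (fugashi is only needed for japanese_word_splitter):

```python
from hojichar.filters.deduplication import char_level_splitter, non_alpha_num_splitter

print(char_level_splitter("abc"))               # ['a', 'b', 'c']
print(non_alpha_num_splitter("Hello, world!"))  # ['Hello', 'world']
```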
109class GenerateDedupLSH(Filter): 110 """ 111 Filter that uses MinHash + Locality-Sensitive Hashing (LSH) to assign 112 deduplication keys to documents, allowing fast near-duplicate detection. 113 114 Attributes: 115 num_perm (int): Number of permutations (hash functions) for MinHash. 116 threshold (float): Similarity threshold for tuning LSH parameters. 117 tokenizer (Callable[[str], Iterable[str]]): Function to tokenize text. 118 n_grams (int): n-gram size for token grouping. 119 seed (int): Random seed for MinHash. 120 num_bands (int): Number of LSH bands computed from threshold and num_perm. 121 band_size (int): Number of hashes per band. 122 123 Notes 124 ----- 125 `_optimal_param` searches for the optimal number of **bands** (`b`) and 126 **rows per band** (`r`) that minimise a weighted sum of false positives / 127 false negatives at the specified *threshold*. 128 """ 129 130 _BYTES_PER_U32: Final[int] = 4 131 132 def __init__( 133 self, 134 num_perm: int = 500, 135 threshold: float = 0.8, 136 tokenizer: Callable[[str], Iterable[str]] = char_level_splitter, 137 n_grams: int = 5, 138 seed: int = 42, 139 **kwargs: Any, 140 ) -> None: 141 """ 142 Initialize the deduplication filter with MinHash and LSH settings. 143 144 Args: 145 num_perm: Number of hash permutations for MinHash signature length. 146 threshold: Similarity threshold to decide optimal LSH parameters. 147 tokenizer: Function to split text into tokens. 148 n_grams: Number of tokens per n-gram for MinHash update. 149 seed: Seed for hash permutation consistency. 150 **kwargs: Additional keyword arguments for parent Filter. 151 """ 152 super().__init__(**kwargs) 153 if not is_loaded_dedup: 154 raise ImportError(IS_LOADED_DEDUP_ERROR_MSG) 155 self.num_perm = num_perm 156 self.threshold = threshold 157 self.tokenizer = tokenizer 158 self.n_grams = n_grams 159 self.seed = seed 160 161 # Compute optimal number of bands and band size based on threshold 162 self.num_bands, self.band_size = _optimal_param( 163 threshold=self.threshold, 164 num_perm=self.num_perm, 165 false_negative_weight=0.5, 166 false_positive_weight=0.5, 167 ) 168 169 def calculate_minhash_signature(self, text: str) -> NDArray[np.uint32]: 170 """ 171 Compute MinHash signature of input text as an array of uint32. 172 173 Steps: 174 1. Tokenize text using the provided tokenizer. 175 2. Generate n-gram tokens. 176 3. Update MinHash with n-gram tokens. 177 178 Args: 179 text: Input document text to be hashed. 180 181 Returns: 182 A 1D numpy array of shape (num_perm,) with dtype uint32. 183 """ 184 tokens = self.tokenizer(text) 185 n_gram_tokens = ngrams(tokens, self.n_grams) 186 # Join tokens into string n-grams for hashing 187 tokens = [" ".join(grams) for grams in n_gram_tokens] 188 # Initialize and update RMinHash 189 minhash = RMinHash(num_perm=self.num_perm, seed=self.seed) 190 minhash.update(tokens) 191 # Convert digest (list of ints) to numpy uint32 array 192 return np.asarray(minhash.digest(), dtype=np.uint32) 193 194 def _sig_bytes_le(self, sig: NDArray[np.uint32]) -> memoryview: 195 """ 196 Return the signature as *little-endian* byte view. 197 Platform‑independent way to get bytes from a numpy array. 
198 """ 199 if sys.byteorder == "little": 200 # amd64 / arm64 (little) 201 return memoryview(sig).cast("B") # type: ignore[arg-type] # zero-copy 202 else: 203 # big-endian CPU 204 return memoryview(sig.byteswap()).cast("B") # type: ignore[arg-type] # 1 copy 205 206 def signature_to_lsh_digest( 207 self, signature: NDArray[np.uint32], band_size: int, band_idx: int 208 ) -> int: 209 """ 210 Convert a slice of the MinHash signature into an LSH digest with less memory overhead. 211 212 This method is optimized for speed by avoiding copies: 213 - We view the uint32 array as raw bytes (uint8 view). 214 - We create a memoryview of the byte slice for the specified band. 215 - We compute a 128-bit hash directly on the slice. 216 217 Args: 218 signature: 1D numpy array of uint32 representing MinHash signature. 219 band_size: Number of hashes per LSH band. 220 band_idx: Index of the band to hash (0-based). 221 222 Returns: 223 An integer representing the 128-bit hash digest of the band. 224 225 Raises: 226 AssertionError: If signature shape/dtype or band index is invalid. 227 """ 228 assert signature.dtype == np.uint32 and signature.ndim == 1, ( 229 "signature must be a 1D numpy array of uint32" 230 ) 231 assert 0 <= band_idx < self.num_bands, ( 232 f"band_idx {band_idx} out of range [0, {self.num_bands})" 233 ) 234 assert len(signature) >= band_size * self.num_bands, ( 235 "signature length is too short for given band_size and num_bands" 236 ) 237 238 # Compute byte offsets for the selected band 239 start = band_idx * band_size * self._BYTES_PER_U32 240 stop = start + band_size * self._BYTES_PER_U32 241 242 # View signature as raw bytes without copy. memoryview avoids creating new bytes. 243 mv = self._sig_bytes_le(signature)[start:stop] # slice view 244 245 return xxhash.xxh128_intdigest(mv) 246 247 def _format_lsh_key(self, band_idx: int, digest: int) -> str: 248 """ 249 Format the LSH key for a given band index and digest. 250 """ 251 return f"{band_idx}+{digest:032x}" 252 253 def apply(self, document: Document) -> Document: 254 """ 255 Decorate the document with LSH deduplication keys. 256 257 For each band, compute the digest and format as a hex string: 258 '<band_idx>+<128-bit-digest-hex>'. 259 Keys are stored in document.extras['dedup_lsh']. 260 261 Args: 262 document: Document object with 'text' attribute. 263 264 Returns: 265 The same Document object with 'dedup_lsh' added in extras. 266 """ 267 signature = self.calculate_minhash_signature(document.text) 268 lsh_keys = [ 269 self._format_lsh_key( 270 band_idx, self.signature_to_lsh_digest(signature, self.band_size, band_idx) 271 ) 272 for band_idx in range(self.num_bands) 273 ] 274 275 document.extras["dedup_lsh"] = lsh_keys 276 return document
Filter that uses MinHash + Locality-Sensitive Hashing (LSH) to assign deduplication keys to documents, allowing fast near-duplicate detection.
Attributes:
- num_perm (int): Number of permutations (hash functions) for MinHash.
- threshold (float): Similarity threshold for tuning LSH parameters.
- tokenizer (Callable[[str], Iterable[str]]): Function to tokenize text.
- n_grams (int): n-gram size for token grouping.
- seed (int): Random seed for MinHash.
- num_bands (int): Number of LSH bands computed from threshold and num_perm.
- band_size (int): Number of hashes per band.
Notes
_optimal_param searches for the optimal number of bands (b) and
rows per band (r) that minimise a weighted sum of false positives /
false negatives at the specified threshold.
132 def __init__( 133 self, 134 num_perm: int = 500, 135 threshold: float = 0.8, 136 tokenizer: Callable[[str], Iterable[str]] = char_level_splitter, 137 n_grams: int = 5, 138 seed: int = 42, 139 **kwargs: Any, 140 ) -> None: 141 """ 142 Initialize the deduplication filter with MinHash and LSH settings. 143 144 Args: 145 num_perm: Number of hash permutations for MinHash signature length. 146 threshold: Similarity threshold to decide optimal LSH parameters. 147 tokenizer: Function to split text into tokens. 148 n_grams: Number of tokens per n-gram for MinHash update. 149 seed: Seed for hash permutation consistency. 150 **kwargs: Additional keyword arguments for parent Filter. 151 """ 152 super().__init__(**kwargs) 153 if not is_loaded_dedup: 154 raise ImportError(IS_LOADED_DEDUP_ERROR_MSG) 155 self.num_perm = num_perm 156 self.threshold = threshold 157 self.tokenizer = tokenizer 158 self.n_grams = n_grams 159 self.seed = seed 160 161 # Compute optimal number of bands and band size based on threshold 162 self.num_bands, self.band_size = _optimal_param( 163 threshold=self.threshold, 164 num_perm=self.num_perm, 165 false_negative_weight=0.5, 166 false_positive_weight=0.5, 167 )
Initialize the deduplication filter with MinHash and LSH settings.
Args:
- num_perm: Number of hash permutations for MinHash signature length.
- threshold: Similarity threshold to decide optimal LSH parameters.
- tokenizer: Function to split text into tokens.
- n_grams: Number of tokens per n-gram for MinHash update.
- seed: Seed for hash permutation consistency.
- **kwargs: Additional keyword arguments for parent Filter.
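For instance, a sketch of constructing the filter with the Japanese word-level tokenizer defined above and inspecting the LSH parameters derived from the threshold:

```python
from hojichar.filters.deduplication import GenerateDedupLSH, japanese_word_splitter

# Sketch: num_bands and band_size are chosen by datasketch's _optimal_param,
# so the exact values depend on threshold and num_perm.
gen = GenerateDedupLSH(num_perm=500, threshold=0.8, tokenizer=japanese_word_splitter, n_grams=5)
print(gen.num_bands, gen.band_size)  # num_perm is roughly num_bands * band_size
```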
```python
def calculate_minhash_signature(self, text: str) -> NDArray[np.uint32]:
    """
    Compute MinHash signature of input text as an array of uint32.

    Steps:
    1. Tokenize text using the provided tokenizer.
    2. Generate n-gram tokens.
    3. Update MinHash with n-gram tokens.

    Args:
        text: Input document text to be hashed.

    Returns:
        A 1D numpy array of shape (num_perm,) with dtype uint32.
    """
    tokens = self.tokenizer(text)
    n_gram_tokens = ngrams(tokens, self.n_grams)
    # Join tokens into string n-grams for hashing
    tokens = [" ".join(grams) for grams in n_gram_tokens]
    # Initialize and update RMinHash
    minhash = RMinHash(num_perm=self.num_perm, seed=self.seed)
    minhash.update(tokens)
    # Convert digest (list of ints) to numpy uint32 array
    return np.asarray(minhash.digest(), dtype=np.uint32)
```
Compute MinHash signature of input text as an array of uint32.
Steps:
1. Tokenize text using the provided tokenizer.
2. Generate n-gram tokens.
3. Update MinHash with n-gram tokens.
Args: text: Input document text to be hashed.
Returns: A 1D numpy array of shape (num_perm,) with dtype uint32.
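As a quick check, a sketch of computing a signature directly:

```python
from hojichar.filters.deduplication import GenerateDedupLSH

# Sketch: a 128-permutation signature of a short string (char-level 5-grams by default).
gen = GenerateDedupLSH(num_perm=128)
sig = gen.calculate_minhash_signature("hello world, hello hojichar")
print(sig.shape, sig.dtype)  # (128,) uint32
```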
206 def signature_to_lsh_digest( 207 self, signature: NDArray[np.uint32], band_size: int, band_idx: int 208 ) -> int: 209 """ 210 Convert a slice of the MinHash signature into an LSH digest with less memory overhead. 211 212 This method is optimized for speed by avoiding copies: 213 - We view the uint32 array as raw bytes (uint8 view). 214 - We create a memoryview of the byte slice for the specified band. 215 - We compute a 128-bit hash directly on the slice. 216 217 Args: 218 signature: 1D numpy array of uint32 representing MinHash signature. 219 band_size: Number of hashes per LSH band. 220 band_idx: Index of the band to hash (0-based). 221 222 Returns: 223 An integer representing the 128-bit hash digest of the band. 224 225 Raises: 226 AssertionError: If signature shape/dtype or band index is invalid. 227 """ 228 assert signature.dtype == np.uint32 and signature.ndim == 1, ( 229 "signature must be a 1D numpy array of uint32" 230 ) 231 assert 0 <= band_idx < self.num_bands, ( 232 f"band_idx {band_idx} out of range [0, {self.num_bands})" 233 ) 234 assert len(signature) >= band_size * self.num_bands, ( 235 "signature length is too short for given band_size and num_bands" 236 ) 237 238 # Compute byte offsets for the selected band 239 start = band_idx * band_size * self._BYTES_PER_U32 240 stop = start + band_size * self._BYTES_PER_U32 241 242 # View signature as raw bytes without copy. memoryview avoids creating new bytes. 243 mv = self._sig_bytes_le(signature)[start:stop] # slice view 244 245 return xxhash.xxh128_intdigest(mv)
Convert a slice of the MinHash signature into an LSH digest with less memory overhead.
This method is optimized for speed by avoiding copies:
- We view the uint32 array as raw bytes (uint8 view).
- We create a memoryview of the byte slice for the specified band.
- We compute a 128-bit hash directly on the slice.
Args:
- signature: 1D numpy array of uint32 representing MinHash signature.
- band_size: Number of hashes per LSH band.
- band_idx: Index of the band to hash (0-based).
Returns: An integer representing the 128-bit hash digest of the band.
Raises: AssertionError: If signature shape/dtype or band index is invalid.
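To illustrate the banding step in isolation, a minimal sketch (independent of this class) of hashing one band of a toy signature with xxhash:

```python
import numpy as np
import xxhash

# Sketch only: hash band 0 of a toy 16-value signature with band_size = 4.
signature = np.arange(16, dtype=np.uint32)    # stands in for a real MinHash signature
band_size, band_idx = 4, 0
band = signature[band_idx * band_size:(band_idx + 1) * band_size]
digest = xxhash.xxh128_intdigest(band.astype("<u4").tobytes())  # little-endian bytes, as in _sig_bytes_le
print(f"{band_idx}+{digest:032x}")            # same key format as GenerateDedupLSH
```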
```python
def apply(self, document: Document) -> Document:
    """
    Decorate the document with LSH deduplication keys.

    For each band, compute the digest and format as a hex string:
    '<band_idx>+<128-bit-digest-hex>'.
    Keys are stored in document.extras['dedup_lsh'].

    Args:
        document: Document object with 'text' attribute.

    Returns:
        The same Document object with 'dedup_lsh' added in extras.
    """
    signature = self.calculate_minhash_signature(document.text)
    lsh_keys = [
        self._format_lsh_key(
            band_idx, self.signature_to_lsh_digest(signature, self.band_size, band_idx)
        )
        for band_idx in range(self.num_bands)
    ]

    document.extras["dedup_lsh"] = lsh_keys
    return document
```
Decorate the document with LSH deduplication keys.
For each band, compute the digest and format it as a hex string:
'<band_idx>+<128-bit-digest-hex>'. Keys are stored in document.extras['dedup_lsh'].
Args: document: Document object with 'text' attribute.
Returns: The same Document object with 'dedup_lsh' added in extras.
279class InlineDeduplicator(Filter): 280 """ 281 Simple in‑memory deduplicator. 282 283 Stores every LSH key in a local :pyclass:`set`. If any key of the incoming 284 document is already present, the document is marked as duplicate via 285 `document.is_rejected = True`. 286 287 **Limitations** 288 ------------- 289 *State is per‑process only.* Running multiple workers or machines will *not* 290 share the key set – use :class:`RedisDeduplicator` or 291 :class:`RedisBloomDeduplicator` for distributed setups. 292 """ 293 294 def __init__(self, **kwargs: Any): 295 super().__init__(**kwargs) 296 self.hash_pool: set[str] = set() 297 298 def apply(self, document: Document) -> Document: 299 """ 300 Inline deduplication based on the LSH keys in the document. 301 This filter cannot use in the distributed environment because it uses a local hash pool. 302 """ 303 lsh_keys = document.extras.get("dedup_lsh") 304 if lsh_keys is None: 305 raise ValueError( 306 "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first." 307 ) 308 309 for lsh in lsh_keys: 310 if lsh in self.hash_pool: 311 document.is_rejected = True 312 else: 313 self.hash_pool.add(lsh) 314 return document
Simple in‑memory deduplicator.
Stores every LSH key in a local set. If any key of the incoming
document is already present, the document is marked as a duplicate via
document.is_rejected = True.
Limitations
State is per‑process only. Running multiple workers or machines will not
share the key set – use RedisDeduplicator or
RedisBloomDeduplicator for distributed setups.
```python
def __init__(self, **kwargs: Any):
    super().__init__(**kwargs)
    self.hash_pool: set[str] = set()
```
Initialize the filter.
Parameters
p : float
The probability of applying the filter.
If p is 1, the filter will always be applied.
skip_rejected : bool
If True, the filter will skip documents that are already rejected.
If you want to apply the filter to all documents (e.g., postprocess), set this to False.
random_state : Optional[Union[int, np.random.Generator]]
Seed for the random number generator.
If None, a new random number generator will be created.
If None and the filter is used inside a Compose object, the random state is shared with that Compose object.
use_batch : bool
If True, the filter will process documents in batches in the apply_stream method.
batch_size : int
The size of the batch to process documents in the apply_stream method.
kwargs : Any
Additional keyword arguments to pass to the filter.
```python
def apply(self, document: Document) -> Document:
    """
    Inline deduplication based on the LSH keys in the document.
    This filter cannot be used in a distributed environment because it uses a local hash pool.
    """
    lsh_keys = document.extras.get("dedup_lsh")
    if lsh_keys is None:
        raise ValueError(
            "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
        )

    for lsh in lsh_keys:
        if lsh in self.hash_pool:
            document.is_rejected = True
        else:
            self.hash_pool.add(lsh)
    return document
```
Inline deduplication based on the LSH keys in the document. This filter cannot be used in a distributed environment because it uses a local hash pool.
317class RedisDeduplicator(Filter): 318 """ 319 Distributed deduplicator using **plain Redis keys**. 320 You have to run a Redis server and pass its connection parameters. 321 """ 322 323 def __init__( 324 self, 325 *, 326 host: str = "localhost", 327 port: int = 6379, 328 db: int = 0, 329 key_prefix: str = "dedup", 330 **kwargs: Any, 331 ) -> None: 332 """ 333 Initialize the Redis deduplicator. 334 Args: 335 host (str): Redis server hostname. 336 port (int): Redis server port. 337 db (int): Redis database number. 338 key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task. 339 **kwargs: Additional keyword arguments for parent Filter. 340 """ 341 if not is_loaded_dedup: 342 raise ImportError(IS_LOADED_DEDUP_ERROR_MSG) 343 super().__init__(**kwargs) 344 self.rds = redis.Redis(host=host, port=port, db=db, decode_responses=False) 345 self.key_prefix = key_prefix.encode() 346 347 try: 348 self.rds.ping() 349 except redis.exceptions.RedisError as exc: 350 raise RuntimeError(f"Cannot connect to Redis server {host}:{port}/{db}") from exc 351 352 def apply(self, document: Document) -> Document: 353 lsh_keys = document.extras.get("dedup_lsh") 354 if lsh_keys is None: 355 raise ValueError("Apply GenerateDedupLSH first") 356 357 pipe = self.rds.pipeline(transaction=False) 358 for k in lsh_keys: 359 pipe.set(self.key_prefix + b":" + k.encode(), b"1", nx=True) 360 results: list[bool | None] = pipe.execute() # If instance already exists, it returns None 361 362 if any(r is None for r in results): 363 document.is_rejected = True 364 return document
Distributed deduplicator using plain Redis keys. You have to run a Redis server and pass its connection parameters.
323 def __init__( 324 self, 325 *, 326 host: str = "localhost", 327 port: int = 6379, 328 db: int = 0, 329 key_prefix: str = "dedup", 330 **kwargs: Any, 331 ) -> None: 332 """ 333 Initialize the Redis deduplicator. 334 Args: 335 host (str): Redis server hostname. 336 port (int): Redis server port. 337 db (int): Redis database number. 338 key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task. 339 **kwargs: Additional keyword arguments for parent Filter. 340 """ 341 if not is_loaded_dedup: 342 raise ImportError(IS_LOADED_DEDUP_ERROR_MSG) 343 super().__init__(**kwargs) 344 self.rds = redis.Redis(host=host, port=port, db=db, decode_responses=False) 345 self.key_prefix = key_prefix.encode() 346 347 try: 348 self.rds.ping() 349 except redis.exceptions.RedisError as exc: 350 raise RuntimeError(f"Cannot connect to Redis server {host}:{port}/{db}") from exc
Initialize the Redis deduplicator.

Args:
- host (str): Redis server hostname.
- port (int): Redis server port.
- db (int): Redis database number.
- key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
- **kwargs: Additional keyword arguments for parent Filter.
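A sketch of a distributed-friendly pipeline, assuming a Redis server is reachable at localhost:6379 (host, port, and prefix values are illustrative):

```python
from hojichar import Compose, Document
from hojichar.filters.deduplication import GenerateDedupLSH, RedisDeduplicator

# Sketch only: every worker that points at the same Redis instance shares the key set.
pipeline = Compose([
    GenerateDedupLSH(),
    RedisDeduplicator(host="localhost", port=6379, db=0, key_prefix="my_dedup_run"),
])
doc = pipeline.apply(Document("Some web page text ..."))
print(doc.is_rejected)
```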
```python
def apply(self, document: Document) -> Document:
    lsh_keys = document.extras.get("dedup_lsh")
    if lsh_keys is None:
        raise ValueError("Apply GenerateDedupLSH first")

    pipe = self.rds.pipeline(transaction=False)
    for k in lsh_keys:
        pipe.set(self.key_prefix + b":" + k.encode(), b"1", nx=True)
    results: list[bool | None] = pipe.execute()  # SET with NX returns None when the key already exists

    if any(r is None for r in results):
        document.is_rejected = True
    return document
```
Definition of filter behavior.
The document must implement the TextContent protocol;
in most cases it is a hojichar.Document instance.
In this method, the filter will modify document.text or
document.extras and set document.is_rejected = True to discard the document.
Parameters
document : Document
    Input document
Returns
Document
    Processed document
367class RedisBloomDeduplicator(Filter): 368 """ 369 Distributed deduplicator backed by **RedisBloom scalable Bloom filters**. 370 You can use this filter to store-LSHs with less memory than Redis keys with the risk of false positives. 371 372 Each *band* gets its own scalable Bloom filter on the Redis side: 373 374 ```text 375 BF.RESERVE <prefix>:<band_idx> <error> <capacity> EXPANSION <n> 376 ``` 377 378 """ 379 380 def __init__( 381 self, 382 *, 383 expected_docs: int, 384 host: str = "localhost", 385 port: int = 6379, 386 db: int = 0, 387 key_prefix: str = "bloomdedup", 388 error_rate: float = 1e-7, 389 expansion: int = 2, 390 num_bands: int | None = None, 391 **kwargs: Any, 392 ): 393 """ 394 Initialize the RedisBloom deduplicator. 395 Args: 396 expected_docs (int): Expected number of documents to deduplicate. This is used to set the initial capacity of the Bloom filter. 397 host (str): Redis server hostname. 398 port (int): Redis server port. 399 db (int): Redis database number. 400 key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task. 401 error_rate (float): Desired error rate for the Bloom filter. 402 expansion (int): Expansion factor for the Bloom filter. This is used to increase the capacity of the filter dynamically. 403 num_bands (int | None): Number of bands to use for LSH to calculate the capacity of BloomFilter. If None, it will be set to 32. 404 **kwargs: Additional keyword arguments for parent Filter. 405 """ 406 if not is_loaded_dedup: 407 raise ImportError(IS_LOADED_DEDUP_ERROR_MSG) 408 super().__init__(**kwargs) 409 self.rds = redis.Redis(host=host, port=port, db=db) 410 self.key_prefix = key_prefix.encode() 411 412 _num_bands = num_bands if num_bands is not None else 32 413 414 try: 415 self.rds.execute_command( 416 "BF.RESERVE", 417 self.key_prefix, 418 error_rate, 419 expected_docs * _num_bands, 420 "EXPANSION", 421 expansion, 422 ) 423 except redis.ResponseError as e: 424 if "exists" not in str(e): 425 raise 426 427 def apply(self, document: Document) -> Document: 428 lsh_keys: list[str] | None = document.extras.get("dedup_lsh") 429 if lsh_keys is None: 430 raise ValueError( 431 "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first." 432 ) 433 434 key_bytes = [k.encode() for k in lsh_keys] 435 436 # Return value of BF.MADD is [1,0,1,...] (0 = already exists, 1 = insertion successful) 437 flags: Iterable[int] = self.rds.execute_command("BF.MADD", self.key_prefix, *key_bytes) 438 if 0 in flags: 439 document.is_rejected = True 440 return document
Distributed deduplicator backed by RedisBloom scalable Bloom filters. You can use this filter to store LSH keys with less memory than plain Redis keys, at the cost of a small false-positive risk.
All band keys are inserted into a single scalable Bloom filter on the Redis side, reserved as:
BF.RESERVE <prefix> <error_rate> <expected_docs * num_bands> EXPANSION <expansion>
380 def __init__( 381 self, 382 *, 383 expected_docs: int, 384 host: str = "localhost", 385 port: int = 6379, 386 db: int = 0, 387 key_prefix: str = "bloomdedup", 388 error_rate: float = 1e-7, 389 expansion: int = 2, 390 num_bands: int | None = None, 391 **kwargs: Any, 392 ): 393 """ 394 Initialize the RedisBloom deduplicator. 395 Args: 396 expected_docs (int): Expected number of documents to deduplicate. This is used to set the initial capacity of the Bloom filter. 397 host (str): Redis server hostname. 398 port (int): Redis server port. 399 db (int): Redis database number. 400 key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task. 401 error_rate (float): Desired error rate for the Bloom filter. 402 expansion (int): Expansion factor for the Bloom filter. This is used to increase the capacity of the filter dynamically. 403 num_bands (int | None): Number of bands to use for LSH to calculate the capacity of BloomFilter. If None, it will be set to 32. 404 **kwargs: Additional keyword arguments for parent Filter. 405 """ 406 if not is_loaded_dedup: 407 raise ImportError(IS_LOADED_DEDUP_ERROR_MSG) 408 super().__init__(**kwargs) 409 self.rds = redis.Redis(host=host, port=port, db=db) 410 self.key_prefix = key_prefix.encode() 411 412 _num_bands = num_bands if num_bands is not None else 32 413 414 try: 415 self.rds.execute_command( 416 "BF.RESERVE", 417 self.key_prefix, 418 error_rate, 419 expected_docs * _num_bands, 420 "EXPANSION", 421 expansion, 422 ) 423 except redis.ResponseError as e: 424 if "exists" not in str(e): 425 raise
Initialize the RedisBloom deduplicator.

Args:
- expected_docs (int): Expected number of documents to deduplicate. This is used to set the initial capacity of the Bloom filter.
- host (str): Redis server hostname.
- port (int): Redis server port.
- db (int): Redis database number.
- key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
- error_rate (float): Desired error rate for the Bloom filter.
- expansion (int): Expansion factor for the Bloom filter. This is used to increase the capacity of the filter dynamically.
- num_bands (int | None): Number of LSH bands, used to calculate the capacity of the Bloom filter. If None, it will be set to 32.
- **kwargs: Additional keyword arguments for parent Filter.
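A sketch of wiring this filter into a pipeline, assuming a Redis server with the RedisBloom module loaded (the capacity and prefix values are illustrative):

```python
from hojichar import Compose, Document
from hojichar.filters.deduplication import GenerateDedupLSH, RedisBloomDeduplicator

# Sketch only: size the Bloom filter for roughly 10 million documents and pass
# the generator's num_bands so the reserved capacity matches the key count.
gen = GenerateDedupLSH()
pipeline = Compose([
    gen,
    RedisBloomDeduplicator(
        expected_docs=10_000_000,
        num_bands=gen.num_bands,
        key_prefix="bloomdedup_run1",
    ),
])
doc = pipeline.apply(Document("Some web page text ..."))
print(doc.is_rejected)
```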
```python
def apply(self, document: Document) -> Document:
    lsh_keys: list[str] | None = document.extras.get("dedup_lsh")
    if lsh_keys is None:
        raise ValueError(
            "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
        )

    key_bytes = [k.encode() for k in lsh_keys]

    # Return value of BF.MADD is [1, 0, 1, ...] (0 = already exists, 1 = insertion successful)
    flags: Iterable[int] = self.rds.execute_command("BF.MADD", self.key_prefix, *key_bytes)
    if 0 in flags:
        document.is_rejected = True
    return document
```
Definition of filter behavior.
The document must implement the TextContent protocol;
in most cases it is a hojichar.Document instance.
In this method, the filter will modify document.text or
document.extras and set document.is_rejected = True to discard the document.
Parameters
document : Document
    Input document
Returns
Document
    Processed document
443class InlineDuplicateAnalyzer(Filter): 444 def __init__(self, **kwargs: Any): 445 super().__init__(**kwargs) 446 self.docs: dict[int, Document] = dict() # doc_id -> Document mapping 447 self.hash_pool: dict[str, int] = dict() # LSH key -> doc_id mapping 448 449 self._current_doc_id = 0 450 451 def apply(self, document: Document) -> Document: 452 """ 453 Analyze duplicates inline based on the LSH keys in the document. 454 This filter cannot use in the distributed environment because it uses a local hash pool. 455 """ 456 lsh_keys = document.extras.get("dedup_lsh") 457 if lsh_keys is None: 458 raise ValueError( 459 "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first." 460 ) 461 462 for lsh in lsh_keys: 463 if lsh in self.hash_pool: 464 document.is_rejected = True 465 document.extras["similar_doc"] = self.docs[self.hash_pool[lsh]].text 466 else: 467 self.hash_pool[lsh] = self._current_doc_id 468 self.docs[self._current_doc_id] = document 469 470 self._current_doc_id += 1 471 return document
Base class for all filters. Document-level filters must inherit from this class.
The definition of text processing is in the apply method.
If you define a new filter, override that method.
When an instance of this class is called, the filter is applied from string to string.
With a context manager, you can use the filter as follows:
```python
with YourFilter(p=0.5) as filt:
    text = filt("This is a sample text.")
```
```python
def __init__(self, **kwargs: Any):
    super().__init__(**kwargs)
    self.docs: dict[int, Document] = dict()  # doc_id -> Document mapping
    self.hash_pool: dict[str, int] = dict()  # LSH key -> doc_id mapping

    self._current_doc_id = 0
```
Initialize the filter.
Parameters
p : float
The probability of applying the filter.
If p is 1, the filter will always be applied.
skip_rejected : bool
If True, the filter will skip documents that are already rejected.
If you want to apply the filter to all documents (e.g., postprocess), set this to False.
random_state : Optional[Union[int, np.random.Generator]]
Seed for the random number generator.
If None, a new random number generator will be created.
If None and the filter is used inside a Compose object, the random state is shared with that Compose object.
use_batch : bool
If True, the filter will process documents in batches in the apply_stream method.
batch_size : int
The size of the batch to process documents in the apply_stream method.
kwargs : Any
Additional keyword arguments to pass to the filter.
```python
def apply(self, document: Document) -> Document:
    """
    Analyze duplicates inline based on the LSH keys in the document.
    This filter cannot be used in a distributed environment because it uses a local hash pool.
    """
    lsh_keys = document.extras.get("dedup_lsh")
    if lsh_keys is None:
        raise ValueError(
            "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
        )

    for lsh in lsh_keys:
        if lsh in self.hash_pool:
            document.is_rejected = True
            document.extras["similar_doc"] = self.docs[self.hash_pool[lsh]].text
        else:
            self.hash_pool[lsh] = self._current_doc_id
            self.docs[self._current_doc_id] = document

    self._current_doc_id += 1
    return document
```
Analyze duplicates inline based on the LSH keys in the document. This filter cannot be used in a distributed environment because it uses a local hash pool.
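A sketch of using this analyzer to inspect which earlier document a rejected one resembles (texts are illustrative):

```python
from hojichar import Compose, Document
from hojichar.filters.deduplication import GenerateDedupLSH, InlineDuplicateAnalyzer

# Sketch only: a rejected document carries the text of the first similar
# document in extras['similar_doc'].
pipeline = Compose([GenerateDedupLSH(), InlineDuplicateAnalyzer()])
first = pipeline.apply(Document("HojiChar provides text-cleaning filters for LLM corpora."))
second = pipeline.apply(Document("HojiChar provides text cleaning filters for LLM corpora!"))
if second.is_rejected:
    print("similar to:", second.extras["similar_doc"])
```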