# hojichar.filters.deduplication

This module provides filters for deduplication using MinHash and Locality-Sensitive Hashing (LSH).
If you want to use this module, install hojichar via `pip install 'hojichar[dedup]'`.

## What is MinHash LSH?

*A gentle introduction to MinHash LSH for first-time learners*

### 1 What problem does this solve?

When you have **millions of documents** it is too expensive to compare every pair directly. **MinHash + Locality-Sensitive Hashing (LSH)** lets you

- estimate Jaccard similarity extremely fast and memory-efficiently, and
- retrieve **near-duplicates** in sub-linear time.

In practice you can keep a single `set` or Redis index of the generated **LSH keys** and ask:
"Does any existing document share *at least one* LSH key with mine?"

If the answer is yes, the two documents are almost certainly similar; if no, they are very likely different.
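To make the estimate concrete, here is a minimal sketch of the idea, using `rensa.RMinHash` (the MinHash implementation this module relies on) and NumPy; the texts and the `num_perm` value are arbitrary examples:

```python
import numpy as np
from rensa import RMinHash


def signature(tokens: list[str], num_perm: int = 128, seed: int = 42) -> np.ndarray:
    m = RMinHash(num_perm=num_perm, seed=seed)
    m.update(tokens)
    return np.asarray(m.digest(), dtype=np.uint32)


a = signature("the quick brown fox jumps over the lazy dog".split())
b = signature("the quick brown fox jumped over the lazy dog".split())

# The fraction of positions where two signatures agree is an estimate
# of the Jaccard similarity of the underlying token sets.
print(np.mean(a == b))
```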
### How the pipeline works

The `GenerateDedupLSH` filter generates LSH keys for each document:

1. Tokenize
   - Split text into tokens. (Default: character-level, but you can plug in any callable.)
2. n-grams
   - Group tokens into n-grams (`n_grams=5`) to capture context.
3. MinHash
   - Hash each n-gram with `num_perm` independent permutations and keep only the minimum value. The resulting **signature** is a vector of `num_perm` 32-bit integers.
   - This module uses `rensa.RMinHash`, a fast MinHash implementation written in Rust.
4. Banding and compression
   - Split the signature into `b` bands, each containing `r` integers (num_perm ≈ b×r).
   - Treat the `r` integers as raw bytes, hash them with xxhash-128, and format the key as `<band_idx>+<digest32hex>` (see the probability sketch after this list).
5. Output
   - Store all band keys in `document.extras['dedup_lsh']` as a list of strings.
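Why banding makes near-duplicates collide: a pair with Jaccard similarity `s` matches on one band of `r` rows with probability `s^r`, so it shares at least one of the `b` band keys with probability `1 - (1 - s^r)^b`. A small illustrative sketch; the values of `b` and `r` below are arbitrary examples, not necessarily what `_optimal_param` chooses for the default settings:

```python
def collision_probability(s: float, b: int, r: int) -> float:
    """Probability that two documents with Jaccard similarity `s`
    share at least one of `b` band keys of `r` rows each."""
    return 1.0 - (1.0 - s**r) ** b


# Example: 25 bands x 20 rows (num_perm = 500) gives a steep S-curve around s ~= 0.8.
for s in (0.5, 0.7, 0.8, 0.9):
    print(s, round(collision_probability(s, b=25, r=20), 4))
```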
The `InlineDeduplicator`, `RedisDeduplicator`, and `RedisBloomDeduplicator` filters use the generated LSH keys to mark documents as duplicates.

- `InlineDeduplicator` stores LSH keys in a local set, so it works only in a single process.
- `RedisDeduplicator` stores LSH keys in Redis, so it works in a distributed environment.
- `RedisBloomDeduplicator` stores LSH keys in RedisBloom, which is a scalable Bloom filter. It uses less memory than Redis keys but may return false positives.
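A minimal end-to-end sketch of how these filters are combined; the pipeline, parameters, and texts below are illustrative assumptions, not part of the module:

```python
from hojichar import Document
from hojichar.filters.deduplication import GenerateDedupLSH, InlineDeduplicator

gen = GenerateDedupLSH(num_perm=500, threshold=0.8)
dedup = InlineDeduplicator()

texts = [
    "The quick brown fox jumps over the lazy dog.",
    "The quick brown fox jumps over the lazy dog!",  # near-duplicate
    "A completely different sentence.",
]

for text in texts:
    # GenerateDedupLSH attaches the band keys; InlineDeduplicator checks them.
    doc = dedup.apply(gen.apply(Document(text)))
    print(doc.is_rejected, doc.extras["dedup_lsh"][:2])
```

In practice the two filters are typically placed in a hojichar `Compose` pipeline rather than applied by hand.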
1""" 2This module provides filters for deduplication using MinHash and Locality-Sensitive Hashing (LSH). 3If you want to use this module, install hojichar via `pip install 'hojichar[dedup]'` 4 5## What is MinHash LSH? 6*A gentle introduction for first‑time leaner for MinHash LSH* 7 8--- 9 10### 1 What problem does this solve? 11 12When you have **millions of documents** it is too expensive to compare every pair directly. 13**MinHash + Locality‑Sensitive Hashing (LSH)** lets you 14 15- estimate Jaccard similarity extremely fast, and memory‑efficiently 16- retrieve **near‑duplicates** in sub‑linear time. 17 18In practice you can keep a single `set` or Redis index of the generated **LSH keys** and ask: 19"Does any existing document share *at least one* LSH key with mine?" 20 21If the answer is yes the two documents are almost certainly similar; if no they are very likely different. 22 23### How the pipeline works 24 25`GenerateDedupLSH` filter generates LSH keys for each document. 26 271. Tokenize 28 - Split text into tokens. (Default: character‑level, but you can plug in any callable.) 292. n-grams 30 - Group tokens into n‑grams (n_grams=5) to capture context. 313. MinHash 32 - Hash each n‑gram with `num_perm` independent permutations and keep only the minimum value. The resulting **signature** is a vector of num_perm 32‑bit integers. 33 - This module uses `rensa.RMinHash` which is a fast MinHash implementation by Rust language. 344. Banding and compression 35 - Split the signature into b bands, each containing `r` integers (num_perm ≈ b×r). 36 - Treat the `r` integers as raw bytes, hash them with xxhash‑128, and format as <band_idx>+<digest32hex>. 375. Output 38 - Store all band keys in `document.extras['dedup_lsh']` as a list of strings. 39 40`InlineDeduplicator`, `RedisDeduplicator`, and `RedisBloomDeduplicator` filters use the generated LSH keys to mark documents as duplicates. 41 42- `InlineDeduplicator` stores LSH keys in a local set, so it works only in a single process. 43- `RedisDeduplicator` stores LSH keys in Redis, so it works in a distributed environment. 44- `RedisBloomDeduplicator` stores LSH keys in RedisBloom, which is a scalable Bloom filter. It uses less memory than Redis keys but may return false positives. 45 46""" 47 48from __future__ import annotations 49 50import importlib 51import re 52import sys 53from typing import Any, Callable, Final, Iterable, Optional 54 55import numpy as np 56import redis 57import xxhash 58from datasketch.lsh import _optimal_param # type: ignore 59from nltk.util import ngrams # type: ignore 60from numpy.typing import NDArray 61from rensa import RMinHash # type: ignore 62 63from hojichar import Document, Filter 64 65_japanese_tagger: Optional["fugashi.Tagger"] = None # type: ignore[name-defined] # noqa: F821 66NON_ALPHA = re.compile("[^A-Za-z_0-9]") 67 68 69def char_level_splitter(text: str) -> list[str]: 70 """ 71 Split the text into characters. 72 This is a simple implementation that splits the text into individual characters. 73 """ 74 return list(text) 75 76 77def non_alpha_num_splitter(text: str) -> list[str]: 78 """ 79 Split the text into alphanumeric tokens. 80 This is a simple implementation that splits on non-alphanumeric characters. 81 """ 82 return [token for token in NON_ALPHA.split(text) if token] 83 84 85def japanese_word_splitter(text: str) -> list[str]: 86 """ 87 Split the text into Japanese words using fugashi. 88 This will import fugashi and instantiate Tagger on first use. 
89 """ 90 global _japanese_tagger 91 if _japanese_tagger is None: 92 fugashi = importlib.import_module("fugashi") 93 _japanese_tagger = fugashi.Tagger() 94 return [token.surface for token in _japanese_tagger(text)] 95 96 97class GenerateDedupLSH(Filter): 98 """ 99 Filter that uses MinHash + Locality-Sensitive Hashing (LSH) to assign 100 deduplication keys to documents, allowing fast near-duplicate detection. 101 102 Attributes: 103 num_perm (int): Number of permutations (hash functions) for MinHash. 104 threshold (float): Similarity threshold for tuning LSH parameters. 105 tokenizer (Callable[[str], Iterable[str]]): Function to tokenize text. 106 n_grams (int): n-gram size for token grouping. 107 seed (int): Random seed for MinHash. 108 num_bands (int): Number of LSH bands computed from threshold and num_perm. 109 band_size (int): Number of hashes per band. 110 111 Notes 112 ----- 113 `_optimal_param` searches for the optimal number of **bands** (`b`) and 114 **rows per band** (`r`) that minimise a weighted sum of false positives / 115 false negatives at the specified *threshold*. 116 """ 117 118 _BYTES_PER_U32: Final[int] = 4 119 120 def __init__( 121 self, 122 num_perm: int = 500, 123 threshold: float = 0.8, 124 tokenizer: Callable[[str], Iterable[str]] = char_level_splitter, 125 n_grams: int = 5, 126 seed: int = 42, 127 **kwargs: Any, 128 ) -> None: 129 """ 130 Initialize the deduplication filter with MinHash and LSH settings. 131 132 Args: 133 num_perm: Number of hash permutations for MinHash signature length. 134 threshold: Similarity threshold to decide optimal LSH parameters. 135 tokenizer: Function to split text into tokens. 136 n_grams: Number of tokens per n-gram for MinHash update. 137 seed: Seed for hash permutation consistency. 138 **kwargs: Additional keyword arguments for parent Filter. 139 """ 140 super().__init__(**kwargs) 141 self.num_perm = num_perm 142 self.threshold = threshold 143 self.tokenizer = tokenizer 144 self.n_grams = n_grams 145 self.seed = seed 146 147 # Compute optimal number of bands and band size based on threshold 148 self.num_bands, self.band_size = _optimal_param( 149 threshold=self.threshold, 150 num_perm=self.num_perm, 151 false_negative_weight=0.5, 152 false_positive_weight=0.5, 153 ) 154 155 def calculate_minhash_signature(self, text: str) -> NDArray[np.uint32]: 156 """ 157 Compute MinHash signature of input text as an array of uint32. 158 159 Steps: 160 1. Tokenize text using the provided tokenizer. 161 2. Generate n-gram tokens. 162 3. Update MinHash with n-gram tokens. 163 164 Args: 165 text: Input document text to be hashed. 166 167 Returns: 168 A 1D numpy array of shape (num_perm,) with dtype uint32. 169 """ 170 tokens = self.tokenizer(text) 171 n_gram_tokens = ngrams(tokens, self.n_grams) 172 # Join tokens into string n-grams for hashing 173 tokens = [" ".join(grams) for grams in n_gram_tokens] 174 # Initialize and update RMinHash 175 minhash = RMinHash(num_perm=self.num_perm, seed=self.seed) 176 minhash.update(tokens) 177 # Convert digest (list of ints) to numpy uint32 array 178 return np.asarray(minhash.digest(), dtype=np.uint32) 179 180 def _sig_bytes_le(self, sig: NDArray[np.uint32]) -> memoryview: 181 """ 182 Return the signature as *little-endian* byte view. 183 Platform‑independent way to get bytes from a numpy array. 
184 """ 185 if sys.byteorder == "little": 186 # amd64 / arm64 (little) 187 return memoryview(sig).cast("B") # type: ignore[arg-type] # zero-copy 188 else: 189 # big-endian CPU 190 return memoryview(sig.byteswap()).cast("B") # type: ignore[arg-type] # 1 copy 191 192 def signature_to_lsh_digest( 193 self, signature: NDArray[np.uint32], band_size: int, band_idx: int 194 ) -> int: 195 """ 196 Convert a slice of the MinHash signature into an LSH digest with less memory overhead. 197 198 This method is optimized for speed by avoiding copies: 199 - We view the uint32 array as raw bytes (uint8 view). 200 - We create a memoryview of the byte slice for the specified band. 201 - We compute a 128-bit hash directly on the slice. 202 203 Args: 204 signature: 1D numpy array of uint32 representing MinHash signature. 205 band_size: Number of hashes per LSH band. 206 band_idx: Index of the band to hash (0-based). 207 208 Returns: 209 An integer representing the 128-bit hash digest of the band. 210 211 Raises: 212 AssertionError: If signature shape/dtype or band index is invalid. 213 """ 214 assert signature.dtype == np.uint32 and signature.ndim == 1, ( 215 "signature must be a 1D numpy array of uint32" 216 ) 217 assert 0 <= band_idx < self.num_bands, ( 218 f"band_idx {band_idx} out of range [0, {self.num_bands})" 219 ) 220 assert len(signature) >= band_size * self.num_bands, ( 221 "signature length is too short for given band_size and num_bands" 222 ) 223 224 # Compute byte offsets for the selected band 225 start = band_idx * band_size * self._BYTES_PER_U32 226 stop = start + band_size * self._BYTES_PER_U32 227 228 # View signature as raw bytes without copy. memoryview avoids creating new bytes. 229 mv = self._sig_bytes_le(signature)[start:stop] # slice view 230 231 return xxhash.xxh128_intdigest(mv) 232 233 def _format_lsh_key(self, band_idx: int, digest: int) -> str: 234 """ 235 Format the LSH key for a given band index and digest. 236 """ 237 return f"{band_idx}+{digest:032x}" 238 239 def apply(self, document: Document) -> Document: 240 """ 241 Decorate the document with LSH deduplication keys. 242 243 For each band, compute the digest and format as a hex string: 244 '<band_idx>+<128-bit-digest-hex>'. 245 Keys are stored in document.extras['dedup_lsh']. 246 247 Args: 248 document: Document object with 'text' attribute. 249 250 Returns: 251 The same Document object with 'dedup_lsh' added in extras. 252 """ 253 signature = self.calculate_minhash_signature(document.text) 254 lsh_keys = [ 255 self._format_lsh_key( 256 band_idx, self.signature_to_lsh_digest(signature, self.band_size, band_idx) 257 ) 258 for band_idx in range(self.num_bands) 259 ] 260 261 document.extras["dedup_lsh"] = lsh_keys 262 return document 263 264 265class InlineDeduplicator(Filter): 266 """ 267 Simple in‑memory deduplicator. 268 269 Stores every LSH key in a local :pyclass:`set`. If any key of the incoming 270 document is already present, the document is marked as duplicate via 271 `document.is_rejected = True`. 272 273 **Limitations** 274 ------------- 275 *State is per‑process only.* Running multiple workers or machines will *not* 276 share the key set – use :class:`RedisDeduplicator` or 277 :class:`RedisBloomDeduplicator` for distributed setups. 278 """ 279 280 def __init__(self, **kwargs: Any): 281 super().__init__(**kwargs) 282 self.hash_pool: set[str] = set() 283 284 def apply(self, document: Document) -> Document: 285 """ 286 Inline deduplication based on the LSH keys in the document. 
287 This filter cannot use in the distributed environment because it uses a local hash pool. 288 """ 289 lsh_keys = document.extras.get("dedup_lsh") 290 if lsh_keys is None: 291 raise ValueError( 292 "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first." 293 ) 294 295 for lsh in lsh_keys: 296 if lsh in self.hash_pool: 297 document.is_rejected = True 298 else: 299 self.hash_pool.add(lsh) 300 return document 301 302 303class RedisDeduplicator(Filter): 304 """ 305 Distributed deduplicator using **plain Redis keys**. 306 You have to run a Redis server and pass its connection parameters. 307 """ 308 309 def __init__( 310 self, 311 *, 312 host: str = "localhost", 313 port: int = 6379, 314 db: int = 0, 315 key_prefix: str = "dedup", 316 **kwargs: Any, 317 ) -> None: 318 """ 319 Initialize the Redis deduplicator. 320 Args: 321 host (str): Redis server hostname. 322 port (int): Redis server port. 323 db (int): Redis database number. 324 key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task. 325 **kwargs: Additional keyword arguments for parent Filter. 326 """ 327 super().__init__(**kwargs) 328 self.rds = redis.Redis(host=host, port=port, db=db, decode_responses=False) 329 self.key_prefix = key_prefix.encode() 330 331 try: 332 self.rds.ping() 333 except redis.exceptions.RedisError as exc: 334 raise RuntimeError(f"Cannot connect to Redis server {host}:{port}/{db}") from exc 335 336 def apply(self, document: Document) -> Document: 337 lsh_keys = document.extras.get("dedup_lsh") 338 if lsh_keys is None: 339 raise ValueError("Apply GenerateDedupLSH first") 340 341 pipe = self.rds.pipeline(transaction=False) 342 for k in lsh_keys: 343 pipe.set(self.key_prefix + b":" + k.encode(), b"1", nx=True) 344 results: list[bool | None] = pipe.execute() # If instance already exists, it returns None 345 346 if any(r is None for r in results): 347 document.is_rejected = True 348 return document 349 350 351class RedisBloomDeduplicator(Filter): 352 """ 353 Distributed deduplicator backed by **RedisBloom scalable Bloom filters**. 354 You can use this filter to store-LSHs with less memory than Redis keys with the risk of false positives. 355 356 Each *band* gets its own scalable Bloom filter on the Redis side: 357 358 ```text 359 BF.RESERVE <prefix>:<band_idx> <error> <capacity> EXPANSION <n> 360 ``` 361 362 """ 363 364 def __init__( 365 self, 366 *, 367 expected_docs: int, 368 host: str = "localhost", 369 port: int = 6379, 370 db: int = 0, 371 key_prefix: str = "bloomdedup", 372 error_rate: float = 1e-7, 373 expansion: int = 2, 374 num_bands: int | None = None, 375 **kwargs: Any, 376 ): 377 """ 378 Initialize the RedisBloom deduplicator. 379 Args: 380 expected_docs (int): Expected number of documents to deduplicate. This is used to set the initial capacity of the Bloom filter. 381 host (str): Redis server hostname. 382 port (int): Redis server port. 383 db (int): Redis database number. 384 key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task. 385 error_rate (float): Desired error rate for the Bloom filter. 386 expansion (int): Expansion factor for the Bloom filter. This is used to increase the capacity of the filter dynamically. 387 num_bands (int | None): Number of bands to use for LSH to calculate the capacity of BloomFilter. If None, it will be set to 32. 388 **kwargs: Additional keyword arguments for parent Filter. 
389 """ 390 super().__init__(**kwargs) 391 self.rds = redis.Redis(host=host, port=port, db=db) 392 self.key_prefix = key_prefix.encode() 393 394 _num_bands = num_bands if num_bands is not None else 32 395 396 try: 397 self.rds.execute_command( 398 "BF.RESERVE", 399 self.key_prefix, 400 error_rate, 401 expected_docs * _num_bands, 402 "EXPANSION", 403 expansion, 404 ) 405 except redis.ResponseError as e: 406 if "exists" not in str(e): 407 raise 408 409 def apply(self, document: Document) -> Document: 410 lsh_keys: list[str] | None = document.extras.get("dedup_lsh") 411 if lsh_keys is None: 412 raise ValueError( 413 "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first." 414 ) 415 416 key_bytes = [k.encode() for k in lsh_keys] 417 418 # Return value of BF.MADD is [1,0,1,...] (0 = already exists, 1 = insertion successful) 419 flags: Iterable[int] = self.rds.execute_command("BF.MADD", self.key_prefix, *key_bytes) 420 if 0 in flags: 421 document.is_rejected = True 422 return document 423 424 425class InlineDuplicateAnalyzer(Filter): 426 def __init__(self, **kwargs: Any): 427 super().__init__(**kwargs) 428 self.docs: dict[int, Document] = dict() # doc_id -> Document mapping 429 self.hash_pool: dict[str, int] = dict() # LSH key -> doc_id mapping 430 431 self._current_doc_id = 0 432 433 def apply(self, document: Document) -> Document: 434 """ 435 Analyze duplicates inline based on the LSH keys in the document. 436 This filter cannot use in the distributed environment because it uses a local hash pool. 437 """ 438 lsh_keys = document.extras.get("dedup_lsh") 439 if lsh_keys is None: 440 raise ValueError( 441 "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first." 442 ) 443 444 for lsh in lsh_keys: 445 if lsh in self.hash_pool: 446 document.is_rejected = True 447 document.extras["similar_doc"] = self.docs[self.hash_pool[lsh]].text 448 else: 449 self.hash_pool[lsh] = self._current_doc_id 450 self.docs[self._current_doc_id] = document 451 452 self._current_doc_id += 1 453 return document
```python
def char_level_splitter(text: str) -> list[str]:
    """
    Split the text into characters.
    This is a simple implementation that splits the text into individual characters.
    """
    return list(text)
```
Split the text into characters. This is a simple implementation that splits the text into individual characters.
```python
def non_alpha_num_splitter(text: str) -> list[str]:
    """
    Split the text into alphanumeric tokens.
    This is a simple implementation that splits on non-alphanumeric characters.
    """
    return [token for token in NON_ALPHA.split(text) if token]
```
Split the text into alphanumeric tokens. This is a simple implementation that splits on non-alphanumeric characters.
```python
def japanese_word_splitter(text: str) -> list[str]:
    """
    Split the text into Japanese words using fugashi.
    This will import fugashi and instantiate Tagger on first use.
    """
    global _japanese_tagger
    if _japanese_tagger is None:
        fugashi = importlib.import_module("fugashi")
        _japanese_tagger = fugashi.Tagger()
    return [token.surface for token in _japanese_tagger(text)]
```
Split the text into Japanese words using fugashi. This will import fugashi and instantiate Tagger on first use.
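Any of these splitters, or your own callable, can be passed to `GenerateDedupLSH` as the tokenizer. A hedged example for Japanese text; it assumes fugashi and a dictionary are installed, and `n_grams=3` is an arbitrary choice:

```python
from hojichar.filters.deduplication import GenerateDedupLSH, japanese_word_splitter

# Word-level shingles instead of the default character-level ones.
gen_ja = GenerateDedupLSH(tokenizer=japanese_word_splitter, n_grams=3)
```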
```python
class GenerateDedupLSH(Filter):

    _BYTES_PER_U32: Final[int] = 4

    def _sig_bytes_le(self, sig: NDArray[np.uint32]) -> memoryview:
        """
        Return the signature as *little-endian* byte view.
        Platform-independent way to get bytes from a numpy array.
        """
        if sys.byteorder == "little":
            # amd64 / arm64 (little)
            return memoryview(sig).cast("B")  # type: ignore[arg-type]  # zero-copy
        else:
            # big-endian CPU
            return memoryview(sig.byteswap()).cast("B")  # type: ignore[arg-type]  # 1 copy

    def _format_lsh_key(self, band_idx: int, digest: int) -> str:
        """
        Format the LSH key for a given band index and digest.
        """
        return f"{band_idx}+{digest:032x}"
```
Filter that uses MinHash + Locality-Sensitive Hashing (LSH) to assign deduplication keys to documents, allowing fast near-duplicate detection.
Attributes:
- num_perm (int): Number of permutations (hash functions) for MinHash.
- threshold (float): Similarity threshold for tuning LSH parameters.
- tokenizer (Callable[[str], Iterable[str]]): Function to tokenize text.
- n_grams (int): n-gram size for token grouping.
- seed (int): Random seed for MinHash.
- num_bands (int): Number of LSH bands computed from threshold and num_perm.
- band_size (int): Number of hashes per band.

Notes: `_optimal_param` searches for the optimal number of **bands** (`b`) and **rows per band** (`r`) that minimise a weighted sum of false positives / false negatives at the specified *threshold*.
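As a quick check, the banding parameters chosen by `_optimal_param` can be inspected on an instance; the actual numbers depend on datasketch and are not fixed by this module:

```python
from hojichar.filters.deduplication import GenerateDedupLSH

f = GenerateDedupLSH(num_perm=500, threshold=0.8)
# num_bands * band_size is at most num_perm.
print(f.num_bands, f.band_size, f.num_bands * f.band_size)
```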
```python
    def __init__(
        self,
        num_perm: int = 500,
        threshold: float = 0.8,
        tokenizer: Callable[[str], Iterable[str]] = char_level_splitter,
        n_grams: int = 5,
        seed: int = 42,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the deduplication filter with MinHash and LSH settings.

        Args:
            num_perm: Number of hash permutations for MinHash signature length.
            threshold: Similarity threshold to decide optimal LSH parameters.
            tokenizer: Function to split text into tokens.
            n_grams: Number of tokens per n-gram for MinHash update.
            seed: Seed for hash permutation consistency.
            **kwargs: Additional keyword arguments for parent Filter.
        """
        super().__init__(**kwargs)
        self.num_perm = num_perm
        self.threshold = threshold
        self.tokenizer = tokenizer
        self.n_grams = n_grams
        self.seed = seed

        # Compute optimal number of bands and band size based on threshold
        self.num_bands, self.band_size = _optimal_param(
            threshold=self.threshold,
            num_perm=self.num_perm,
            false_negative_weight=0.5,
            false_positive_weight=0.5,
        )
```
Initialize the deduplication filter with MinHash and LSH settings.
Args:
- num_perm: Number of hash permutations for MinHash signature length.
- threshold: Similarity threshold to decide optimal LSH parameters.
- tokenizer: Function to split text into tokens.
- n_grams: Number of tokens per n-gram for MinHash update.
- seed: Seed for hash permutation consistency.
- **kwargs: Additional keyword arguments for parent Filter.
```python
    def calculate_minhash_signature(self, text: str) -> NDArray[np.uint32]:
        """
        Compute MinHash signature of input text as an array of uint32.

        Steps:
          1. Tokenize text using the provided tokenizer.
          2. Generate n-gram tokens.
          3. Update MinHash with n-gram tokens.

        Args:
            text: Input document text to be hashed.

        Returns:
            A 1D numpy array of shape (num_perm,) with dtype uint32.
        """
        tokens = self.tokenizer(text)
        n_gram_tokens = ngrams(tokens, self.n_grams)
        # Join tokens into string n-grams for hashing
        tokens = [" ".join(grams) for grams in n_gram_tokens]
        # Initialize and update RMinHash
        minhash = RMinHash(num_perm=self.num_perm, seed=self.seed)
        minhash.update(tokens)
        # Convert digest (list of ints) to numpy uint32 array
        return np.asarray(minhash.digest(), dtype=np.uint32)
```
Compute MinHash signature of input text as an array of uint32.
Steps:
1. Tokenize text using the provided tokenizer.
2. Generate n-gram tokens.
3. Update MinHash with n-gram tokens.
Args: text: Input document text to be hashed.
Returns: A 1D numpy array of shape (num_perm,) with dtype uint32.
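A small sketch of the resulting signature; the input text is an arbitrary example:

```python
from hojichar.filters.deduplication import GenerateDedupLSH

f = GenerateDedupLSH(num_perm=500)
sig = f.calculate_minhash_signature("The quick brown fox jumps over the lazy dog.")
print(sig.dtype, sig.shape)  # uint32 (500,)
```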
```python
    def signature_to_lsh_digest(
        self, signature: NDArray[np.uint32], band_size: int, band_idx: int
    ) -> int:
        """
        Convert a slice of the MinHash signature into an LSH digest with less memory overhead.

        This method is optimized for speed by avoiding copies:
        - We view the uint32 array as raw bytes (uint8 view).
        - We create a memoryview of the byte slice for the specified band.
        - We compute a 128-bit hash directly on the slice.

        Args:
            signature: 1D numpy array of uint32 representing MinHash signature.
            band_size: Number of hashes per LSH band.
            band_idx: Index of the band to hash (0-based).

        Returns:
            An integer representing the 128-bit hash digest of the band.

        Raises:
            AssertionError: If signature shape/dtype or band index is invalid.
        """
        assert signature.dtype == np.uint32 and signature.ndim == 1, (
            "signature must be a 1D numpy array of uint32"
        )
        assert 0 <= band_idx < self.num_bands, (
            f"band_idx {band_idx} out of range [0, {self.num_bands})"
        )
        assert len(signature) >= band_size * self.num_bands, (
            "signature length is too short for given band_size and num_bands"
        )

        # Compute byte offsets for the selected band
        start = band_idx * band_size * self._BYTES_PER_U32
        stop = start + band_size * self._BYTES_PER_U32

        # View signature as raw bytes without copy. memoryview avoids creating new bytes.
        mv = self._sig_bytes_le(signature)[start:stop]  # slice view

        return xxhash.xxh128_intdigest(mv)
```
Convert a slice of the MinHash signature into an LSH digest with less memory overhead.
This method is optimized for speed by avoiding copies:
- We view the uint32 array as raw bytes (uint8 view).
- We create a memoryview of the byte slice for the specified band.
- We compute a 128-bit hash directly on the slice.
Args:
- signature: 1D numpy array of uint32 representing MinHash signature.
- band_size: Number of hashes per LSH band.
- band_idx: Index of the band to hash (0-based).
Returns: An integer representing the 128-bit hash digest of the band.
Raises: AssertionError: If signature shape/dtype or band index is invalid.
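The digest is simply xxhash-128 over the little-endian bytes of the selected band. A sketch of that equivalence for band 0, assuming the little-endian layout produced by `_sig_bytes_le`; the input text is arbitrary:

```python
import numpy as np
import xxhash

from hojichar.filters.deduplication import GenerateDedupLSH

f = GenerateDedupLSH(num_perm=500, threshold=0.8)
sig = f.calculate_minhash_signature("The quick brown fox jumps over the lazy dog.")

digest = f.signature_to_lsh_digest(sig, f.band_size, band_idx=0)
# Same computation done by hand: little-endian bytes of the first band, hashed with xxhash-128.
manual = xxhash.xxh128_intdigest(np.ascontiguousarray(sig[: f.band_size], dtype="<u4").tobytes())
assert digest == manual
```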
```python
    def apply(self, document: Document) -> Document:
        """
        Decorate the document with LSH deduplication keys.

        For each band, compute the digest and format as a hex string:
        '<band_idx>+<128-bit-digest-hex>'.
        Keys are stored in document.extras['dedup_lsh'].

        Args:
            document: Document object with 'text' attribute.

        Returns:
            The same Document object with 'dedup_lsh' added in extras.
        """
        signature = self.calculate_minhash_signature(document.text)
        lsh_keys = [
            self._format_lsh_key(
                band_idx, self.signature_to_lsh_digest(signature, self.band_size, band_idx)
            )
            for band_idx in range(self.num_bands)
        ]

        document.extras["dedup_lsh"] = lsh_keys
        return document
```
Decorate the document with LSH deduplication keys.
For each band, compute the digest and format it as a hex string: `<band_idx>+<128-bit-digest-hex>`. Keys are stored in `document.extras['dedup_lsh']`.
Args: document: Document object with 'text' attribute.
Returns: The same Document object with 'dedup_lsh' added in extras.
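After `apply`, the extras hold one key per band, each of the form `<band_idx>+<32 hex digits>`; a quick illustrative check:

```python
from hojichar import Document
from hojichar.filters.deduplication import GenerateDedupLSH

doc = GenerateDedupLSH().apply(Document("The quick brown fox jumps over the lazy dog."))
print(len(doc.extras["dedup_lsh"]))  # one key per band
print(doc.extras["dedup_lsh"][0])    # e.g. '0+<32 hex digits>'
```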
```python
class InlineDeduplicator(Filter):
```
Simple in-memory deduplicator.

Stores every LSH key in a local `set`. If any key of the incoming document is already present, the document is marked as a duplicate via `document.is_rejected = True`.

Limitations: state is per-process only. Running multiple workers or machines will not share the key set; use `RedisDeduplicator` or `RedisBloomDeduplicator` for distributed setups.
```python
    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)
        self.hash_pool: set[str] = set()
```
Initialize the filter.

Parameters:
- p (float): The probability of applying the filter. If `p` is 1, the filter will always be applied.
- skip_rejected (bool): If `True`, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to `False`.
- random_state (Optional[Union[int, np.random.Generator]]): Seed for the random number generator. If `None`, a new random number generator will be created. If `None` and the filter is used inside a `Compose`, the random state is shared with the `Compose` object.
- use_batch (bool): If `True`, the filter will process documents in batches in the `apply_stream` method.
- batch_size (int): The size of the batch to process documents in the `apply_stream` method.
- kwargs (Any): Additional keyword arguments to pass to the filter.
```python
    def apply(self, document: Document) -> Document:
        """
        Inline deduplication based on the LSH keys in the document.
        This filter cannot use in the distributed environment because it uses a local hash pool.
        """
        lsh_keys = document.extras.get("dedup_lsh")
        if lsh_keys is None:
            raise ValueError(
                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
            )

        for lsh in lsh_keys:
            if lsh in self.hash_pool:
                document.is_rejected = True
            else:
                self.hash_pool.add(lsh)
        return document
```
Inline deduplication based on the LSH keys in the document. This filter cannot be used in a distributed environment because it uses a local hash pool.
```python
class RedisDeduplicator(Filter):
```
Distributed deduplicator using plain Redis keys. You have to run a Redis server and pass its connection parameters.
```python
    def __init__(
        self,
        *,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        key_prefix: str = "dedup",
        **kwargs: Any,
    ) -> None:
        """
        Initialize the Redis deduplicator.

        Args:
            host (str): Redis server hostname.
            port (int): Redis server port.
            db (int): Redis database number.
            key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
            **kwargs: Additional keyword arguments for parent Filter.
        """
        super().__init__(**kwargs)
        self.rds = redis.Redis(host=host, port=port, db=db, decode_responses=False)
        self.key_prefix = key_prefix.encode()

        try:
            self.rds.ping()
        except redis.exceptions.RedisError as exc:
            raise RuntimeError(f"Cannot connect to Redis server {host}:{port}/{db}") from exc
```
Initialize the Redis deduplicator.

Args:
- host (str): Redis server hostname.
- port (int): Redis server port.
- db (int): Redis database number.
- key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
- **kwargs: Additional keyword arguments for parent Filter.
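A hedged usage sketch; the host and key prefix are placeholders and a reachable Redis server is assumed:

```python
from hojichar import Document
from hojichar.filters.deduplication import GenerateDedupLSH, RedisDeduplicator

gen = GenerateDedupLSH()
dedup = RedisDeduplicator(
    host="redis.example.internal",  # placeholder hostname
    port=6379,
    db=0,
    key_prefix="dedup-run-2024",    # unique prefix per deduplication task
)

doc = dedup.apply(gen.apply(Document("some text to check")))
if doc.is_rejected:
    pass  # an equivalent document was already registered by some worker
```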
```python
    def apply(self, document: Document) -> Document:
        lsh_keys = document.extras.get("dedup_lsh")
        if lsh_keys is None:
            raise ValueError("Apply GenerateDedupLSH first")

        pipe = self.rds.pipeline(transaction=False)
        for k in lsh_keys:
            pipe.set(self.key_prefix + b":" + k.encode(), b"1", nx=True)
        results: list[bool | None] = pipe.execute()  # If instance already exists, it returns None

        if any(r is None for r in results):
            document.is_rejected = True
        return document
```
Definition of filter behavior.

The document must implement the `TextContent` protocol; in most cases it is a `hojichar.Document`. In this method, the filter modifies `document.text` or `document.extras`, and may set `document.is_rejected = True` to discard the document.

Parameters:
- document (Document): Input document

Returns:
- Document: Processed document
```python
class RedisBloomDeduplicator(Filter):
```
Distributed deduplicator backed by RedisBloom scalable Bloom filters. You can use this filter to store LSH keys with less memory than plain Redis keys, at the cost of a small false-positive rate.

On initialization a scalable Bloom filter is reserved on the Redis side, sized for roughly `expected_docs × num_bands` keys:

BF.RESERVE <key_prefix> <error_rate> <capacity> EXPANSION <expansion>
```python
    def __init__(
        self,
        *,
        expected_docs: int,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        key_prefix: str = "bloomdedup",
        error_rate: float = 1e-7,
        expansion: int = 2,
        num_bands: int | None = None,
        **kwargs: Any,
    ):
        """
        Initialize the RedisBloom deduplicator.

        Args:
            expected_docs (int): Expected number of documents to deduplicate. This is used to set the initial capacity of the Bloom filter.
            host (str): Redis server hostname.
            port (int): Redis server port.
            db (int): Redis database number.
            key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
            error_rate (float): Desired error rate for the Bloom filter.
            expansion (int): Expansion factor for the Bloom filter. This is used to increase the capacity of the filter dynamically.
            num_bands (int | None): Number of bands to use for LSH to calculate the capacity of BloomFilter. If None, it will be set to 32.
            **kwargs: Additional keyword arguments for parent Filter.
        """
        super().__init__(**kwargs)
        self.rds = redis.Redis(host=host, port=port, db=db)
        self.key_prefix = key_prefix.encode()

        _num_bands = num_bands if num_bands is not None else 32

        try:
            self.rds.execute_command(
                "BF.RESERVE",
                self.key_prefix,
                error_rate,
                expected_docs * _num_bands,
                "EXPANSION",
                expansion,
            )
        except redis.ResponseError as e:
            if "exists" not in str(e):
                raise
```
Initialize the RedisBloom deduplicator.

Args:
- expected_docs (int): Expected number of documents to deduplicate. This is used to set the initial capacity of the Bloom filter.
- host (str): Redis server hostname.
- port (int): Redis server port.
- db (int): Redis database number.
- key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
- error_rate (float): Desired error rate for the Bloom filter.
- expansion (int): Expansion factor for the Bloom filter. This is used to increase the capacity of the filter dynamically.
- num_bands (int | None): Number of bands to use for LSH to calculate the capacity of the Bloom filter. If None, it will be set to 32.
- **kwargs: Additional keyword arguments for parent Filter.
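A rough construction sketch; all values below are illustrative and a Redis server with the RedisBloom module is assumed:

```python
from hojichar.filters.deduplication import RedisBloomDeduplicator

dedup = RedisBloomDeduplicator(
    expected_docs=10_000_000,            # planned corpus size (illustrative)
    host="redis.example.internal",       # placeholder hostname
    key_prefix="bloomdedup-run-2024",    # unique prefix per deduplication task
    error_rate=1e-7,
    num_bands=25,                        # pass the value used by GenerateDedupLSH if known
)
```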
```python
    def apply(self, document: Document) -> Document:
        lsh_keys: list[str] | None = document.extras.get("dedup_lsh")
        if lsh_keys is None:
            raise ValueError(
                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
            )

        key_bytes = [k.encode() for k in lsh_keys]

        # Return value of BF.MADD is [1,0,1,...] (0 = already exists, 1 = insertion successful)
        flags: Iterable[int] = self.rds.execute_command("BF.MADD", self.key_prefix, *key_bytes)
        if 0 in flags:
            document.is_rejected = True
        return document
```
Definition of filter behavior.

The document must implement the `TextContent` protocol; in most cases it is a `hojichar.Document`. In this method, the filter modifies `document.text` or `document.extras`, and may set `document.is_rejected = True` to discard the document.

Parameters:
- document (Document): Input document

Returns:
- Document: Processed document
```python
class InlineDuplicateAnalyzer(Filter):
```
Base class for all filters. Document-level filters must inherit from this class.

The definition of text processing is in the `apply` method. If you define a new filter, override that method.

When this class is called, it applies the filter from string to string. With a context manager, you can use the filter as follows:

    with YourFilter(p=0.5) as filt:
        text = filt("This is a sample text.")
```python
    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)
        self.docs: dict[int, Document] = dict()  # doc_id -> Document mapping
        self.hash_pool: dict[str, int] = dict()  # LSH key -> doc_id mapping

        self._current_doc_id = 0
```
Initialize the filter.

Parameters:
- p (float): The probability of applying the filter. If `p` is 1, the filter will always be applied.
- skip_rejected (bool): If `True`, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to `False`.
- random_state (Optional[Union[int, np.random.Generator]]): Seed for the random number generator. If `None`, a new random number generator will be created. If `None` and the filter is used inside a `Compose`, the random state is shared with the `Compose` object.
- use_batch (bool): If `True`, the filter will process documents in batches in the `apply_stream` method.
- batch_size (int): The size of the batch to process documents in the `apply_stream` method.
- kwargs (Any): Additional keyword arguments to pass to the filter.
```python
    def apply(self, document: Document) -> Document:
        """
        Analyze duplicates inline based on the LSH keys in the document.
        This filter cannot use in the distributed environment because it uses a local hash pool.
        """
        lsh_keys = document.extras.get("dedup_lsh")
        if lsh_keys is None:
            raise ValueError(
                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
            )

        for lsh in lsh_keys:
            if lsh in self.hash_pool:
                document.is_rejected = True
                document.extras["similar_doc"] = self.docs[self.hash_pool[lsh]].text
            else:
                self.hash_pool[lsh] = self._current_doc_id
                self.docs[self._current_doc_id] = document

        self._current_doc_id += 1
        return document
```
Analyze duplicates inline based on the LSH keys in the document. This filter cannot be used in a distributed environment because it uses a local hash pool.
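A small sketch of how the analyzer surfaces the matching text via `extras['similar_doc']`; the documents are illustrative:

```python
from hojichar import Document
from hojichar.filters.deduplication import GenerateDedupLSH, InlineDuplicateAnalyzer

gen = GenerateDedupLSH()
analyzer = InlineDuplicateAnalyzer()

first = analyzer.apply(gen.apply(Document("The quick brown fox jumps over the lazy dog.")))
second = analyzer.apply(gen.apply(Document("The quick brown fox jumps over the lazy dog!")))

if second.is_rejected:
    # The text of the earlier, similar document is attached for inspection.
    print("duplicate of:", second.extras["similar_doc"])
```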