hojichar.filters.deduplication

This module provides filters for deduplication using MinHash and Locality-Sensitive Hashing (LSH). If you want to use this module, install hojichar via pip install 'hojichar[dedup]'

What is MinHash LSH?

A gentle introduction to MinHash LSH for first-time learners


1 What problem does this solve?

When you have millions of documents, it is too expensive to compare every pair directly. MinHash + Locality‑Sensitive Hashing (LSH) lets you

  • estimate Jaccard similarity quickly and memory‑efficiently, and
  • retrieve near‑duplicates in sub‑linear time.

In practice you can keep a single set or Redis index of the generated LSH keys and ask: "Does any existing document share at least one LSH key with mine?"

If the answer is yes, the two documents are almost certainly similar; if no, they are very likely different.
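Under the standard MinHash LSH analysis, if two documents have Jaccard similarity s and the signature is split into b bands of r rows each, the probability that they share at least one band key is:

```latex
P(\text{at least one shared band key}) = 1 - \left(1 - s^{r}\right)^{b}
```

This S-shaped curve stays close to 0 well below the chosen threshold and close to 1 above it, which is why a single shared key is strong evidence of near-duplication.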

How the pipeline works

The GenerateDedupLSH filter generates LSH keys for each document (a short example follows the steps below).

  1. Tokenize
    • Split text into tokens. (Default: character‑level, but you can plug in any callable.)
  2. n-grams
    • Group tokens into n‑grams (default n_grams=5) to capture local context.
  3. MinHash
    • Hash each n‑gram with num_perm independent permutations and keep only the minimum value. The resulting signature is a vector of num_perm 32‑bit integers.
    • This module uses rensa.RMinHash, a fast MinHash implementation written in Rust.
  4. Banding and compression
    • Split the signature into b bands, each containing r integers (num_perm ≈ b×r).
    • Treat the r integers as raw bytes, hash them with xxhash‑128, and format the key as <band_idx>+<digest in 32 hex chars>.
  5. Output
    • Store all band keys in document.extras['dedup_lsh'] as a list of strings.
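A minimal sketch of generating keys for one document (the sample text is illustrative; hojichar must be installed with the dedup extra):

```python
from hojichar import Document
from hojichar.filters.deduplication import GenerateDedupLSH

# Defaults: num_perm=500, threshold=0.8, character-level tokens, 5-grams.
gen = GenerateDedupLSH()
doc = Document("MinHash LSH lets you find near-duplicate documents quickly.")
gen.apply(doc)

print(len(doc.extras["dedup_lsh"]))   # one key per band (num_bands)
print(doc.extras["dedup_lsh"][0])     # e.g. '0+<32 hex chars>'
```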

The InlineDeduplicator, RedisDeduplicator, and RedisBloomDeduplicator filters use the generated LSH keys to mark documents as duplicates (see the example after this list).

  • InlineDeduplicator stores LSH keys in a local set, so it works only in a single process.
  • RedisDeduplicator stores LSH keys in Redis, so it works in a distributed environment.
  • RedisBloomDeduplicator stores LSH keys in RedisBloom, which is a scalable Bloom filter. It uses less memory than Redis keys but may return false positives.
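A minimal end-to-end sketch for a single process (assuming hojichar.Compose applies filters to a Document as elsewhere in the library; the texts are illustrative):

```python
from hojichar import Compose, Document
from hojichar.filters.deduplication import GenerateDedupLSH, InlineDeduplicator

# Generate band keys, then reject documents whose keys were already seen.
pipeline = Compose([GenerateDedupLSH(), InlineDeduplicator()])

docs = [
    Document("The quick brown fox jumps over the lazy dog."),
    Document("The quick brown fox jumps over the lazy dog!"),   # near-duplicate
    Document("A completely different sentence about corpus cleaning."),
]
for doc in docs:
    pipeline.apply(doc)
    print(doc.is_rejected)   # expected: False, True, False
```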
  1"""
  2This module provides filters for deduplication using MinHash and Locality-Sensitive Hashing (LSH).
  3If you want to use this module, install hojichar via `pip install 'hojichar[dedup]'`
  4
  5## What is MinHash LSH?
  6*A gentle introduction to MinHash LSH for first-time learners*
  7
  8---
  9
 10### 1 What problem does this solve?
 11
 12When you have **millions of documents** it is too expensive to compare every pair directly.
 13**MinHash + Locality‑Sensitive Hashing (LSH)** lets you
 14
 15- estimate Jaccard similarity extremely fast, and memory‑efficiently
 16- retrieve **near‑duplicates** in sub‑linear time.
 17
 18In practice you can keep a single `set` or Redis index of the generated **LSH keys** and ask:
 19"Does any existing document share *at least one* LSH key with mine?"
 20
 21If the answer is yes the two documents are almost certainly similar; if no they are very likely different.
 22
 23### How the pipeline works
 24
 25`GenerateDedupLSH` filter generates LSH keys for each document.
 26
 271. Tokenize
 28  - Split text into tokens. (Default: character‑level, but you can plug in any callable.)
 292. n-grams
 30  - Group tokens into n‑grams (n_grams=5) to capture context.
 313. MinHash
 32  - Hash each n‑gram with `num_perm` independent permutations and keep only the minimum value. The resulting **signature** is a vector of num_perm 32‑bit integers.
 33  - This module uses `rensa.RMinHash`, a fast MinHash implementation written in Rust.
 344. Banding and compression
 35  - Split the signature into b bands, each containing `r` integers (num_perm ≈ b×r).
 36  - Treat the `r` integers as raw bytes, hash them with xxhash‑128, and format as <band_idx>+<digest32hex>.
 375. Output
 38  - Store all band keys in `document.extras['dedup_lsh']` as a list of strings.
 39
 40`InlineDeduplicator`, `RedisDeduplicator`, and `RedisBloomDeduplicator` filters use the generated LSH keys to mark documents as duplicates.
 41
 42- `InlineDeduplicator` stores LSH keys in a local set, so it works only in a single process.
 43- `RedisDeduplicator` stores LSH keys in Redis, so it works in a distributed environment.
 44- `RedisBloomDeduplicator` stores LSH keys in RedisBloom, which is a scalable Bloom filter. It uses less memory than Redis keys but may return false positives.
 45
 46"""
 47
 48from __future__ import annotations
 49
 50import importlib
 51import re
 52import sys
 53from typing import Any, Callable, Final, Iterable, Optional
 54
 55import numpy as np
 56import redis
 57import xxhash
 58from datasketch.lsh import _optimal_param  # type: ignore
 59from nltk.util import ngrams  # type: ignore
 60from numpy.typing import NDArray
 61from rensa import RMinHash  # type: ignore
 62
 63from hojichar import Document, Filter
 64
 65_japanese_tagger: Optional["fugashi.Tagger"] = None  # type: ignore[name-defined] # noqa: F821
 66NON_ALPHA = re.compile("[^A-Za-z_0-9]")
 67
 68
 69def char_level_splitter(text: str) -> list[str]:
 70    """
 71    Split the text into characters.
 72    This is a simple implementation that splits the text into individual characters.
 73    """
 74    return list(text)
 75
 76
 77def non_alpha_num_splitter(text: str) -> list[str]:
 78    """
 79    Split the text into alphanumeric tokens.
 80    This is a simple implementation that splits on non-alphanumeric characters.
 81    """
 82    return [token for token in NON_ALPHA.split(text) if token]
 83
 84
 85def japanese_word_splitter(text: str) -> list[str]:
 86    """
 87    Split the text into Japanese words using fugashi.
 88    This will import fugashi and instantiate Tagger on first use.
 89    """
 90    global _japanese_tagger
 91    if _japanese_tagger is None:
 92        fugashi = importlib.import_module("fugashi")
 93        _japanese_tagger = fugashi.Tagger()
 94    return [token.surface for token in _japanese_tagger(text)]
 95
 96
 97class GenerateDedupLSH(Filter):
 98    """
 99    Filter that uses MinHash + Locality-Sensitive Hashing (LSH) to assign
100    deduplication keys to documents, allowing fast near-duplicate detection.
101
102    Attributes:
103        num_perm (int): Number of permutations (hash functions) for MinHash.
104        threshold (float): Similarity threshold for tuning LSH parameters.
105        tokenizer (Callable[[str], Iterable[str]]): Function to tokenize text.
106        n_grams (int): n-gram size for token grouping.
107        seed (int): Random seed for MinHash.
108        num_bands (int): Number of LSH bands computed from threshold and num_perm.
109        band_size (int): Number of hashes per band.
110
111    Notes
112    -----
113    `_optimal_param` searches for the optimal number of **bands** (`b`) and
114    **rows per band** (`r`) that minimise a weighted sum of false positives /
115    false negatives at the specified *threshold*.
116    """
117
118    _BYTES_PER_U32: Final[int] = 4
119
120    def __init__(
121        self,
122        num_perm: int = 500,
123        threshold: float = 0.8,
124        tokenizer: Callable[[str], Iterable[str]] = char_level_splitter,
125        n_grams: int = 5,
126        seed: int = 42,
127        **kwargs: Any,
128    ) -> None:
129        """
130        Initialize the deduplication filter with MinHash and LSH settings.
131
132        Args:
133            num_perm: Number of hash permutations for MinHash signature length.
134            threshold: Similarity threshold to decide optimal LSH parameters.
135            tokenizer: Function to split text into tokens.
136            n_grams: Number of tokens per n-gram for MinHash update.
137            seed: Seed for hash permutation consistency.
138            **kwargs: Additional keyword arguments for parent Filter.
139        """
140        super().__init__(**kwargs)
141        self.num_perm = num_perm
142        self.threshold = threshold
143        self.tokenizer = tokenizer
144        self.n_grams = n_grams
145        self.seed = seed
146
147        # Compute optimal number of bands and band size based on threshold
148        self.num_bands, self.band_size = _optimal_param(
149            threshold=self.threshold,
150            num_perm=self.num_perm,
151            false_negative_weight=0.5,
152            false_positive_weight=0.5,
153        )
154
155    def calculate_minhash_signature(self, text: str) -> NDArray[np.uint32]:
156        """
157        Compute MinHash signature of input text as an array of uint32.
158
159        Steps:
160            1. Tokenize text using the provided tokenizer.
161            2. Generate n-gram tokens.
162            3. Update MinHash with n-gram tokens.
163
164        Args:
165            text: Input document text to be hashed.
166
167        Returns:
168            A 1D numpy array of shape (num_perm,) with dtype uint32.
169        """
170        tokens = self.tokenizer(text)
171        n_gram_tokens = ngrams(tokens, self.n_grams)
172        # Join tokens into string n-grams for hashing
173        tokens = [" ".join(grams) for grams in n_gram_tokens]
174        # Initialize and update RMinHash
175        minhash = RMinHash(num_perm=self.num_perm, seed=self.seed)
176        minhash.update(tokens)
177        # Convert digest (list of ints) to numpy uint32 array
178        return np.asarray(minhash.digest(), dtype=np.uint32)
179
180    def _sig_bytes_le(self, sig: NDArray[np.uint32]) -> memoryview:
181        """
182        Return the signature as *little-endian* byte view.
183        Platform‑independent way to get bytes from a numpy array.
184        """
185        if sys.byteorder == "little":
186            # amd64 / arm64 (little)
187            return memoryview(sig).cast("B")  # type: ignore[arg-type]  # zero-copy
188        else:
189            # big-endian CPU
190            return memoryview(sig.byteswap()).cast("B")  # type: ignore[arg-type] # 1 copy
191
192    def signature_to_lsh_digest(
193        self, signature: NDArray[np.uint32], band_size: int, band_idx: int
194    ) -> int:
195        """
196        Convert a slice of the MinHash signature into an LSH digest with less memory overhead.
197
198        This method is optimized for speed by avoiding copies:
199        - We view the uint32 array as raw bytes (uint8 view).
200        - We create a memoryview of the byte slice for the specified band.
201        - We compute a 128-bit hash directly on the slice.
202
203        Args:
204            signature: 1D numpy array of uint32 representing MinHash signature.
205            band_size: Number of hashes per LSH band.
206            band_idx: Index of the band to hash (0-based).
207
208        Returns:
209            An integer representing the 128-bit hash digest of the band.
210
211        Raises:
212            AssertionError: If signature shape/dtype or band index is invalid.
213        """
214        assert signature.dtype == np.uint32 and signature.ndim == 1, (
215            "signature must be a 1D numpy array of uint32"
216        )
217        assert 0 <= band_idx < self.num_bands, (
218            f"band_idx {band_idx} out of range [0, {self.num_bands})"
219        )
220        assert len(signature) >= band_size * self.num_bands, (
221            "signature length is too short for given band_size and num_bands"
222        )
223
224        # Compute byte offsets for the selected band
225        start = band_idx * band_size * self._BYTES_PER_U32
226        stop = start + band_size * self._BYTES_PER_U32
227
228        # View signature as raw bytes without copy. memoryview avoids creating new bytes.
229        mv = self._sig_bytes_le(signature)[start:stop]  # slice view
230
231        return xxhash.xxh128_intdigest(mv)
232
233    def _format_lsh_key(self, band_idx: int, digest: int) -> str:
234        """
235        Format the LSH key for a given band index and digest.
236        """
237        return f"{band_idx}+{digest:032x}"
238
239    def apply(self, document: Document) -> Document:
240        """
241        Decorate the document with LSH deduplication keys.
242
243        For each band, compute the digest and format as a hex string:
244            '<band_idx>+<128-bit-digest-hex>'.
245        Keys are stored in document.extras['dedup_lsh'].
246
247        Args:
248            document: Document object with 'text' attribute.
249
250        Returns:
251            The same Document object with 'dedup_lsh' added in extras.
252        """
253        signature = self.calculate_minhash_signature(document.text)
254        lsh_keys = [
255            self._format_lsh_key(
256                band_idx, self.signature_to_lsh_digest(signature, self.band_size, band_idx)
257            )
258            for band_idx in range(self.num_bands)
259        ]
260
261        document.extras["dedup_lsh"] = lsh_keys
262        return document
263
264
265class InlineDeduplicator(Filter):
266    """
267    Simple in‑memory deduplicator.
268
269    Stores every LSH key in a local :pyclass:`set`. If any key of the incoming
270    document is already present, the document is marked as duplicate via
271    `document.is_rejected = True`.
272
273    **Limitations**
274    -------------
275    *State is per‑process only.* Running multiple workers or machines will *not*
276    share the key set – use :class:`RedisDeduplicator` or
277    :class:`RedisBloomDeduplicator` for distributed setups.
278    """
279
280    def __init__(self, **kwargs: Any):
281        super().__init__(**kwargs)
282        self.hash_pool: set[str] = set()
283
284    def apply(self, document: Document) -> Document:
285        """
286        Inline deduplication based on the LSH keys in the document.
287        This filter cannot use in the distributed environment because it uses a local hash pool.
288        """
289        lsh_keys = document.extras.get("dedup_lsh")
290        if lsh_keys is None:
291            raise ValueError(
292                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
293            )
294
295        for lsh in lsh_keys:
296            if lsh in self.hash_pool:
297                document.is_rejected = True
298            else:
299                self.hash_pool.add(lsh)
300        return document
301
302
303class RedisDeduplicator(Filter):
304    """
305    Distributed deduplicator using **plain Redis keys**.
306    You have to run a Redis server and pass its connection parameters.
307    """
308
309    def __init__(
310        self,
311        *,
312        host: str = "localhost",
313        port: int = 6379,
314        db: int = 0,
315        key_prefix: str = "dedup",
316        **kwargs: Any,
317    ) -> None:
318        """
319        Initialize the Redis deduplicator.
320        Args:
321            host (str): Redis server hostname.
322            port (int): Redis server port.
323            db (int): Redis database number.
324            key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
325            **kwargs: Additional keyword arguments for parent Filter.
326        """
327        super().__init__(**kwargs)
328        self.rds = redis.Redis(host=host, port=port, db=db, decode_responses=False)
329        self.key_prefix = key_prefix.encode()
330
331        try:
332            self.rds.ping()
333        except redis.exceptions.RedisError as exc:
334            raise RuntimeError(f"Cannot connect to Redis server {host}:{port}/{db}") from exc
335
336    def apply(self, document: Document) -> Document:
337        lsh_keys = document.extras.get("dedup_lsh")
338        if lsh_keys is None:
339            raise ValueError("Apply GenerateDedupLSH first")
340
341        pipe = self.rds.pipeline(transaction=False)
342        for k in lsh_keys:
343            pipe.set(self.key_prefix + b":" + k.encode(), b"1", nx=True)
344        results: list[bool | None] = pipe.execute()  # If instance already exists, it returns None
345
346        if any(r is None for r in results):
347            document.is_rejected = True
348        return document
349
350
351class RedisBloomDeduplicator(Filter):
352    """
353    Distributed deduplicator backed by **RedisBloom scalable Bloom filters**.
354    You can use this filter to store-LSHs with less memory than Redis keys with the risk of false positives.
355
356    Each *band* gets its own scalable Bloom filter on the Redis side:
357
358    ```text
359    BF.RESERVE <prefix>:<band_idx> <error> <capacity> EXPANSION <n>
360    ```
361
362    """
363
364    def __init__(
365        self,
366        *,
367        expected_docs: int,
368        host: str = "localhost",
369        port: int = 6379,
370        db: int = 0,
371        key_prefix: str = "bloomdedup",
372        error_rate: float = 1e-7,
373        expansion: int = 2,
374        num_bands: int | None = None,
375        **kwargs: Any,
376    ):
377        """
378        Initialize the RedisBloom deduplicator.
379        Args:
380            expected_docs (int): Expected number of documents to deduplicate. This is used to set the initial capacity of the Bloom filter.
381            host (str): Redis server hostname.
382            port (int): Redis server port.
383            db (int): Redis database number.
384            key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
385            error_rate (float): Desired error rate for the Bloom filter.
386            expansion (int): Expansion factor for the Bloom filter. This is used to increase the capacity of the filter dynamically.
387            num_bands (int | None): Number of bands to use for LSH to calculate the capacity of BloomFilter. If None, it will be set to 32.
388            **kwargs: Additional keyword arguments for parent Filter.
389        """
390        super().__init__(**kwargs)
391        self.rds = redis.Redis(host=host, port=port, db=db)
392        self.key_prefix = key_prefix.encode()
393
394        _num_bands = num_bands if num_bands is not None else 32
395
396        try:
397            self.rds.execute_command(
398                "BF.RESERVE",
399                self.key_prefix,
400                error_rate,
401                expected_docs * _num_bands,
402                "EXPANSION",
403                expansion,
404            )
405        except redis.ResponseError as e:
406            if "exists" not in str(e):
407                raise
408
409    def apply(self, document: Document) -> Document:
410        lsh_keys: list[str] | None = document.extras.get("dedup_lsh")
411        if lsh_keys is None:
412            raise ValueError(
413                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
414            )
415
416        key_bytes = [k.encode() for k in lsh_keys]
417
418        # Return value of BF.MADD is [1,0,1,...] (0 = already exists, 1 = insertion successful)
419        flags: Iterable[int] = self.rds.execute_command("BF.MADD", self.key_prefix, *key_bytes)
420        if 0 in flags:
421            document.is_rejected = True
422        return document
423
424
425class InlineDuplicateAnalyzer(Filter):
426    def __init__(self, **kwargs: Any):
427        super().__init__(**kwargs)
428        self.docs: dict[int, Document] = dict()  # doc_id -> Document mapping
429        self.hash_pool: dict[str, int] = dict()  # LSH key -> doc_id mapping
430
431        self._current_doc_id = 0
432
433    def apply(self, document: Document) -> Document:
434        """
435        Analyze duplicates inline based on the LSH keys in the document.
436        This filter cannot use in the distributed environment because it uses a local hash pool.
437        """
438        lsh_keys = document.extras.get("dedup_lsh")
439        if lsh_keys is None:
440            raise ValueError(
441                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
442            )
443
444        for lsh in lsh_keys:
445            if lsh in self.hash_pool:
446                document.is_rejected = True
447                document.extras["similar_doc"] = self.docs[self.hash_pool[lsh]].text
448            else:
449                self.hash_pool[lsh] = self._current_doc_id
450                self.docs[self._current_doc_id] = document
451
452        self._current_doc_id += 1
453        return document
def char_level_splitter(text: str) -> list[str]:
70def char_level_splitter(text: str) -> list[str]:
71    """
72    Split the text into characters.
73    This is a simple implementation that splits the text into individual characters.
74    """
75    return list(text)

Split the text into characters. This is a simple implementation that splits the text into individual characters.

def non_alpha_num_splitter(text: str) -> list[str]:
78def non_alpha_num_splitter(text: str) -> list[str]:
79    """
80    Split the text into alphanumeric tokens.
81    This is a simple implementation that splits on non-alphanumeric characters.
82    """
83    return [token for token in NON_ALPHA.split(text) if token]

Split the text into alphanumeric tokens. This is a simple implementation that splits on non-alphanumeric characters.
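A quick illustration of the two simple splitters (the outputs in the comments follow directly from the implementations above):

```python
from hojichar.filters.deduplication import char_level_splitter, non_alpha_num_splitter

print(char_level_splitter("abc"))                    # ['a', 'b', 'c']
print(non_alpha_num_splitter("Hello, world! 123"))   # ['Hello', 'world', '123']
```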

def japanese_word_splitter(text: str) -> list[str]:
86def japanese_word_splitter(text: str) -> list[str]:
87    """
88    Split the text into Japanese words using fugashi.
89    This will import fugashi and instantiate Tagger on first use.
90    """
91    global _japanese_tagger
92    if _japanese_tagger is None:
93        fugashi = importlib.import_module("fugashi")
94        _japanese_tagger = fugashi.Tagger()
95    return [token.surface for token in _japanese_tagger(text)]

Split the text into Japanese words using fugashi. This will import fugashi and instantiate Tagger on first use.
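A hedged sketch of plugging this splitter into the key generator (assumes fugashi and a MeCab dictionary such as unidic-lite are installed; n_grams=3 is an illustrative choice):

```python
from hojichar.filters.deduplication import GenerateDedupLSH, japanese_word_splitter

# Word-level tokens instead of the default character-level splitter.
gen = GenerateDedupLSH(tokenizer=japanese_word_splitter, n_grams=3)
```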

class GenerateDedupLSH(hojichar.core.filter_interface.Filter):
 98class GenerateDedupLSH(Filter):
 99    """
100    Filter that uses MinHash + Locality-Sensitive Hashing (LSH) to assign
101    deduplication keys to documents, allowing fast near-duplicate detection.
102
103    Attributes:
104        num_perm (int): Number of permutations (hash functions) for MinHash.
105        threshold (float): Similarity threshold for tuning LSH parameters.
106        tokenizer (Callable[[str], Iterable[str]]): Function to tokenize text.
107        n_grams (int): n-gram size for token grouping.
108        seed (int): Random seed for MinHash.
109        num_bands (int): Number of LSH bands computed from threshold and num_perm.
110        band_size (int): Number of hashes per band.
111
112    Notes
113    -----
114    `_optimal_param` searches for the optimal number of **bands** (`b`) and
115    **rows per band** (`r`) that minimise a weighted sum of false positives /
116    false negatives at the specified *threshold*.
117    """
118
119    _BYTES_PER_U32: Final[int] = 4
120
121    def __init__(
122        self,
123        num_perm: int = 500,
124        threshold: float = 0.8,
125        tokenizer: Callable[[str], Iterable[str]] = char_level_splitter,
126        n_grams: int = 5,
127        seed: int = 42,
128        **kwargs: Any,
129    ) -> None:
130        """
131        Initialize the deduplication filter with MinHash and LSH settings.
132
133        Args:
134            num_perm: Number of hash permutations for MinHash signature length.
135            threshold: Similarity threshold to decide optimal LSH parameters.
136            tokenizer: Function to split text into tokens.
137            n_grams: Number of tokens per n-gram for MinHash update.
138            seed: Seed for hash permutation consistency.
139            **kwargs: Additional keyword arguments for parent Filter.
140        """
141        super().__init__(**kwargs)
142        self.num_perm = num_perm
143        self.threshold = threshold
144        self.tokenizer = tokenizer
145        self.n_grams = n_grams
146        self.seed = seed
147
148        # Compute optimal number of bands and band size based on threshold
149        self.num_bands, self.band_size = _optimal_param(
150            threshold=self.threshold,
151            num_perm=self.num_perm,
152            false_negative_weight=0.5,
153            false_positive_weight=0.5,
154        )
155
156    def calculate_minhash_signature(self, text: str) -> NDArray[np.uint32]:
157        """
158        Compute MinHash signature of input text as an array of uint32.
159
160        Steps:
161            1. Tokenize text using the provided tokenizer.
162            2. Generate n-gram tokens.
163            3. Update MinHash with n-gram tokens.
164
165        Args:
166            text: Input document text to be hashed.
167
168        Returns:
169            A 1D numpy array of shape (num_perm,) with dtype uint32.
170        """
171        tokens = self.tokenizer(text)
172        n_gram_tokens = ngrams(tokens, self.n_grams)
173        # Join tokens into string n-grams for hashing
174        tokens = [" ".join(grams) for grams in n_gram_tokens]
175        # Initialize and update RMinHash
176        minhash = RMinHash(num_perm=self.num_perm, seed=self.seed)
177        minhash.update(tokens)
178        # Convert digest (list of ints) to numpy uint32 array
179        return np.asarray(minhash.digest(), dtype=np.uint32)
180
181    def _sig_bytes_le(self, sig: NDArray[np.uint32]) -> memoryview:
182        """
183        Return the signature as *little-endian* byte view.
184        Platform‑independent way to get bytes from a numpy array.
185        """
186        if sys.byteorder == "little":
187            # amd64 / arm64 (little)
188            return memoryview(sig).cast("B")  # type: ignore[arg-type]  # zero-copy
189        else:
190            # big-endian CPU
191            return memoryview(sig.byteswap()).cast("B")  # type: ignore[arg-type] # 1 copy
192
193    def signature_to_lsh_digest(
194        self, signature: NDArray[np.uint32], band_size: int, band_idx: int
195    ) -> int:
196        """
197        Convert a slice of the MinHash signature into an LSH digest with less memory overhead.
198
199        This method is optimized for speed by avoiding copies:
200        - We view the uint32 array as raw bytes (uint8 view).
201        - We create a memoryview of the byte slice for the specified band.
202        - We compute a 128-bit hash directly on the slice.
203
204        Args:
205            signature: 1D numpy array of uint32 representing MinHash signature.
206            band_size: Number of hashes per LSH band.
207            band_idx: Index of the band to hash (0-based).
208
209        Returns:
210            An integer representing the 128-bit hash digest of the band.
211
212        Raises:
213            AssertionError: If signature shape/dtype or band index is invalid.
214        """
215        assert signature.dtype == np.uint32 and signature.ndim == 1, (
216            "signature must be a 1D numpy array of uint32"
217        )
218        assert 0 <= band_idx < self.num_bands, (
219            f"band_idx {band_idx} out of range [0, {self.num_bands})"
220        )
221        assert len(signature) >= band_size * self.num_bands, (
222            "signature length is too short for given band_size and num_bands"
223        )
224
225        # Compute byte offsets for the selected band
226        start = band_idx * band_size * self._BYTES_PER_U32
227        stop = start + band_size * self._BYTES_PER_U32
228
229        # View signature as raw bytes without copy. memoryview avoids creating new bytes.
230        mv = self._sig_bytes_le(signature)[start:stop]  # slice view
231
232        return xxhash.xxh128_intdigest(mv)
233
234    def _format_lsh_key(self, band_idx: int, digest: int) -> str:
235        """
236        Format the LSH key for a given band index and digest.
237        """
238        return f"{band_idx}+{digest:032x}"
239
240    def apply(self, document: Document) -> Document:
241        """
242        Decorate the document with LSH deduplication keys.
243
244        For each band, compute the digest and format as a hex string:
245            '<band_idx>+<128-bit-digest-hex>'.
246        Keys are stored in document.extras['dedup_lsh'].
247
248        Args:
249            document: Document object with 'text' attribute.
250
251        Returns:
252            The same Document object with 'dedup_lsh' added in extras.
253        """
254        signature = self.calculate_minhash_signature(document.text)
255        lsh_keys = [
256            self._format_lsh_key(
257                band_idx, self.signature_to_lsh_digest(signature, self.band_size, band_idx)
258            )
259            for band_idx in range(self.num_bands)
260        ]
261
262        document.extras["dedup_lsh"] = lsh_keys
263        return document

Filter that uses MinHash + Locality-Sensitive Hashing (LSH) to assign deduplication keys to documents, allowing fast near-duplicate detection.

Attributes:
  num_perm (int): Number of permutations (hash functions) for MinHash.
  threshold (float): Similarity threshold for tuning LSH parameters.
  tokenizer (Callable[[str], Iterable[str]]): Function to tokenize text.
  n_grams (int): n-gram size for token grouping.
  seed (int): Random seed for MinHash.
  num_bands (int): Number of LSH bands computed from threshold and num_perm.
  band_size (int): Number of hashes per band.

Notes

_optimal_param searches for the optimal number of bands (b) and rows per band (r) that minimise a weighted sum of false positives / false negatives at the specified threshold.
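For instance, the derived parameters can be inspected after construction (a sketch; the exact values depend on datasketch's `_optimal_param` search):

```python
from hojichar.filters.deduplication import GenerateDedupLSH

gen = GenerateDedupLSH(num_perm=500, threshold=0.8)
print(gen.num_bands, gen.band_size)                    # b and r chosen by _optimal_param
assert gen.num_bands * gen.band_size <= gen.num_perm   # only the first b*r hashes are banded
```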

GenerateDedupLSH( num_perm: int = 500, threshold: float = 0.8, tokenizer: Callable[[str], Iterable[str]] = <function char_level_splitter>, n_grams: int = 5, seed: int = 42, **kwargs: Any)
121    def __init__(
122        self,
123        num_perm: int = 500,
124        threshold: float = 0.8,
125        tokenizer: Callable[[str], Iterable[str]] = char_level_splitter,
126        n_grams: int = 5,
127        seed: int = 42,
128        **kwargs: Any,
129    ) -> None:
130        """
131        Initialize the deduplication filter with MinHash and LSH settings.
132
133        Args:
134            num_perm: Number of hash permutations for MinHash signature length.
135            threshold: Similarity threshold to decide optimal LSH parameters.
136            tokenizer: Function to split text into tokens.
137            n_grams: Number of tokens per n-gram for MinHash update.
138            seed: Seed for hash permutation consistency.
139            **kwargs: Additional keyword arguments for parent Filter.
140        """
141        super().__init__(**kwargs)
142        self.num_perm = num_perm
143        self.threshold = threshold
144        self.tokenizer = tokenizer
145        self.n_grams = n_grams
146        self.seed = seed
147
148        # Compute optimal number of bands and band size based on threshold
149        self.num_bands, self.band_size = _optimal_param(
150            threshold=self.threshold,
151            num_perm=self.num_perm,
152            false_negative_weight=0.5,
153            false_positive_weight=0.5,
154        )

Initialize the deduplication filter with MinHash and LSH settings.

Args:
  num_perm: Number of hash permutations for MinHash signature length.
  threshold: Similarity threshold to decide optimal LSH parameters.
  tokenizer: Function to split text into tokens.
  n_grams: Number of tokens per n-gram for MinHash update.
  seed: Seed for hash permutation consistency.
  **kwargs: Additional keyword arguments for parent Filter.

def calculate_minhash_signature(self, text: str) -> numpy.ndarray[typing.Any, numpy.dtype[numpy.uint32]]:
156    def calculate_minhash_signature(self, text: str) -> NDArray[np.uint32]:
157        """
158        Compute MinHash signature of input text as an array of uint32.
159
160        Steps:
161            1. Tokenize text using the provided tokenizer.
162            2. Generate n-gram tokens.
163            3. Update MinHash with n-gram tokens.
164
165        Args:
166            text: Input document text to be hashed.
167
168        Returns:
169            A 1D numpy array of shape (num_perm,) with dtype uint32.
170        """
171        tokens = self.tokenizer(text)
172        n_gram_tokens = ngrams(tokens, self.n_grams)
173        # Join tokens into string n-grams for hashing
174        tokens = [" ".join(grams) for grams in n_gram_tokens]
175        # Initialize and update RMinHash
176        minhash = RMinHash(num_perm=self.num_perm, seed=self.seed)
177        minhash.update(tokens)
178        # Convert digest (list of ints) to numpy uint32 array
179        return np.asarray(minhash.digest(), dtype=np.uint32)

Compute MinHash signature of input text as an array of uint32.

Steps:
  1. Tokenize text using the provided tokenizer.
  2. Generate n-gram tokens.
  3. Update MinHash with n-gram tokens.

Args: text: Input document text to be hashed.

Returns: A 1D numpy array of shape (num_perm,) with dtype uint32.
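A small usage sketch (the sample text is illustrative):

```python
from hojichar.filters.deduplication import GenerateDedupLSH

gen = GenerateDedupLSH(num_perm=128)
sig = gen.calculate_minhash_signature("minhash signatures summarize shingle sets compactly")
print(sig.shape, sig.dtype)   # (128,) uint32
```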

def signature_to_lsh_digest( self, signature: numpy.ndarray[typing.Any, numpy.dtype[numpy.uint32]], band_size: int, band_idx: int) -> int:
193    def signature_to_lsh_digest(
194        self, signature: NDArray[np.uint32], band_size: int, band_idx: int
195    ) -> int:
196        """
197        Convert a slice of the MinHash signature into an LSH digest with less memory overhead.
198
199        This method is optimized for speed by avoiding copies:
200        - We view the uint32 array as raw bytes (uint8 view).
201        - We create a memoryview of the byte slice for the specified band.
202        - We compute a 128-bit hash directly on the slice.
203
204        Args:
205            signature: 1D numpy array of uint32 representing MinHash signature.
206            band_size: Number of hashes per LSH band.
207            band_idx: Index of the band to hash (0-based).
208
209        Returns:
210            An integer representing the 128-bit hash digest of the band.
211
212        Raises:
213            AssertionError: If signature shape/dtype or band index is invalid.
214        """
215        assert signature.dtype == np.uint32 and signature.ndim == 1, (
216            "signature must be a 1D numpy array of uint32"
217        )
218        assert 0 <= band_idx < self.num_bands, (
219            f"band_idx {band_idx} out of range [0, {self.num_bands})"
220        )
221        assert len(signature) >= band_size * self.num_bands, (
222            "signature length is too short for given band_size and num_bands"
223        )
224
225        # Compute byte offsets for the selected band
226        start = band_idx * band_size * self._BYTES_PER_U32
227        stop = start + band_size * self._BYTES_PER_U32
228
229        # View signature as raw bytes without copy. memoryview avoids creating new bytes.
230        mv = self._sig_bytes_le(signature)[start:stop]  # slice view
231
232        return xxhash.xxh128_intdigest(mv)

Convert a slice of the MinHash signature into an LSH digest with less memory overhead.

This method is optimized for speed by avoiding copies:

  • We view the uint32 array as raw bytes (uint8 view).
  • We create a memoryview of the byte slice for the specified band.
  • We compute a 128-bit hash directly on the slice.

Args:
  signature: 1D numpy array of uint32 representing MinHash signature.
  band_size: Number of hashes per LSH band.
  band_idx: Index of the band to hash (0-based).

Returns: An integer representing the 128-bit hash digest of the band.

Raises: AssertionError: If signature shape/dtype or band index is invalid.
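For example, the first stored key can be reproduced by hand (a sketch; the key format mirrors the private `_format_lsh_key` helper):

```python
from hojichar.filters.deduplication import GenerateDedupLSH

gen = GenerateDedupLSH()
sig = gen.calculate_minhash_signature("some document text to hash for deduplication")
digest = gen.signature_to_lsh_digest(sig, gen.band_size, band_idx=0)
print(f"0+{digest:032x}")   # matches extras['dedup_lsh'][0] for the same text
```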

def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:
240    def apply(self, document: Document) -> Document:
241        """
242        Decorate the document with LSH deduplication keys.
243
244        For each band, compute the digest and format as a hex string:
245            '<band_idx>+<128-bit-digest-hex>'.
246        Keys are stored in document.extras['dedup_lsh'].
247
248        Args:
249            document: Document object with 'text' attribute.
250
251        Returns:
252            The same Document object with 'dedup_lsh' added in extras.
253        """
254        signature = self.calculate_minhash_signature(document.text)
255        lsh_keys = [
256            self._format_lsh_key(
257                band_idx, self.signature_to_lsh_digest(signature, self.band_size, band_idx)
258            )
259            for band_idx in range(self.num_bands)
260        ]
261
262        document.extras["dedup_lsh"] = lsh_keys
263        return document

Decorate the document with LSH deduplication keys.

For each band, compute the digest and format it as a hex string: '<band_idx>+<128-bit-digest-hex>'. Keys are stored in document.extras['dedup_lsh'].

Args: document: Document object with 'text' attribute.

Returns: The same Document object with 'dedup_lsh' added in extras.

class InlineDeduplicator(hojichar.core.filter_interface.Filter):
266class InlineDeduplicator(Filter):
267    """
268    Simple in‑memory deduplicator.
269
270    Stores every LSH key in a local :pyclass:`set`. If any key of the incoming
271    document is already present, the document is marked as duplicate via
272    `document.is_rejected = True`.
273
274    **Limitations**
275    -------------
276    *State is per‑process only.* Running multiple workers or machines will *not*
277    share the key set – use :class:`RedisDeduplicator` or
278    :class:`RedisBloomDeduplicator` for distributed setups.
279    """
280
281    def __init__(self, **kwargs: Any):
282        super().__init__(**kwargs)
283        self.hash_pool: set[str] = set()
284
285    def apply(self, document: Document) -> Document:
286        """
287        Inline deduplication based on the LSH keys in the document.
288        This filter cannot use in the distributed environment because it uses a local hash pool.
289        """
290        lsh_keys = document.extras.get("dedup_lsh")
291        if lsh_keys is None:
292            raise ValueError(
293                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
294            )
295
296        for lsh in lsh_keys:
297            if lsh in self.hash_pool:
298                document.is_rejected = True
299            else:
300                self.hash_pool.add(lsh)
301        return document

Simple in‑memory deduplicator.

Stores every LSH key in a local set. If any key of the incoming document is already present, the document is marked as a duplicate via document.is_rejected = True.

Limitations

State is per‑process only. Running multiple workers or machines will not share the key set – use RedisDeduplicator or RedisBloomDeduplicator for distributed setups.

InlineDeduplicator(**kwargs: Any)
281    def __init__(self, **kwargs: Any):
282        super().__init__(**kwargs)
283        self.hash_pool: set[str] = set()

Initialize the filter.

Parameters

p : float
    The probability of applying the filter. If p is 1, the filter will always be applied.
skip_rejected : bool
    If True, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to False.
random_state : Optional[Union[int, np.random.Generator]]
    Seed for the random number generator. If None, a new random number generator will be created. If None and used in the Compose class, the random state is shared with the Compose object.
use_batch : bool
    If True, the filter will process documents in batches in the apply_stream method.
batch_size : int
    The size of the batch to process documents in the apply_stream method.
kwargs : Any
    Additional keyword arguments to pass to the filter.

def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:
285    def apply(self, document: Document) -> Document:
286        """
287        Inline deduplication based on the LSH keys in the document.
288        This filter cannot use in the distributed environment because it uses a local hash pool.
289        """
290        lsh_keys = document.extras.get("dedup_lsh")
291        if lsh_keys is None:
292            raise ValueError(
293                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
294            )
295
296        for lsh in lsh_keys:
297            if lsh in self.hash_pool:
298                document.is_rejected = True
299            else:
300                self.hash_pool.add(lsh)
301        return document

Inline deduplication based on the LSH keys in the document. This filter cannot be used in a distributed environment because it uses a local hash pool.

class RedisDeduplicator(hojichar.core.filter_interface.Filter):
304class RedisDeduplicator(Filter):
305    """
306    Distributed deduplicator using **plain Redis keys**.
307    You have to run a Redis server and pass its connection parameters.
308    """
309
310    def __init__(
311        self,
312        *,
313        host: str = "localhost",
314        port: int = 6379,
315        db: int = 0,
316        key_prefix: str = "dedup",
317        **kwargs: Any,
318    ) -> None:
319        """
320        Initialize the Redis deduplicator.
321        Args:
322            host (str): Redis server hostname.
323            port (int): Redis server port.
324            db (int): Redis database number.
325            key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
326            **kwargs: Additional keyword arguments for parent Filter.
327        """
328        super().__init__(**kwargs)
329        self.rds = redis.Redis(host=host, port=port, db=db, decode_responses=False)
330        self.key_prefix = key_prefix.encode()
331
332        try:
333            self.rds.ping()
334        except redis.exceptions.RedisError as exc:
335            raise RuntimeError(f"Cannot connect to Redis server {host}:{port}/{db}") from exc
336
337    def apply(self, document: Document) -> Document:
338        lsh_keys = document.extras.get("dedup_lsh")
339        if lsh_keys is None:
340            raise ValueError("Apply GenerateDedupLSH first")
341
342        pipe = self.rds.pipeline(transaction=False)
343        for k in lsh_keys:
344            pipe.set(self.key_prefix + b":" + k.encode(), b"1", nx=True)
345        results: list[bool | None] = pipe.execute()  # If instance already exists, it returns None
346
347        if any(r is None for r in results):
348            document.is_rejected = True
349        return document

Distributed deduplicator using plain Redis keys. You have to run a Redis server and pass its connection parameters.
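A construction sketch (assumes a Redis server reachable on localhost:6379; the key prefix is illustrative):

```python
from hojichar import Compose
from hojichar.filters.deduplication import GenerateDedupLSH, RedisDeduplicator

# Workers pointing at the same Redis instance share one deduplication key space.
pipeline = Compose([
    GenerateDedupLSH(),
    RedisDeduplicator(host="localhost", port=6379, db=0, key_prefix="my_corpus_v1"),
])
```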

RedisDeduplicator( *, host: str = 'localhost', port: int = 6379, db: int = 0, key_prefix: str = 'dedup', **kwargs: Any)
310    def __init__(
311        self,
312        *,
313        host: str = "localhost",
314        port: int = 6379,
315        db: int = 0,
316        key_prefix: str = "dedup",
317        **kwargs: Any,
318    ) -> None:
319        """
320        Initialize the Redis deduplicator.
321        Args:
322            host (str): Redis server hostname.
323            port (int): Redis server port.
324            db (int): Redis database number.
325            key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
326            **kwargs: Additional keyword arguments for parent Filter.
327        """
328        super().__init__(**kwargs)
329        self.rds = redis.Redis(host=host, port=port, db=db, decode_responses=False)
330        self.key_prefix = key_prefix.encode()
331
332        try:
333            self.rds.ping()
334        except redis.exceptions.RedisError as exc:
335            raise RuntimeError(f"Cannot connect to Redis server {host}:{port}/{db}") from exc

Initialize the Redis deduplicator.

Args:
  host (str): Redis server hostname.
  port (int): Redis server port.
  db (int): Redis database number.
  key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
  **kwargs: Additional keyword arguments for parent Filter.

def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:
337    def apply(self, document: Document) -> Document:
338        lsh_keys = document.extras.get("dedup_lsh")
339        if lsh_keys is None:
340            raise ValueError("Apply GenerateDedupLSH first")
341
342        pipe = self.rds.pipeline(transaction=False)
343        for k in lsh_keys:
344            pipe.set(self.key_prefix + b":" + k.encode(), b"1", nx=True)
345        results: list[bool | None] = pipe.execute()  # If instance already exists, it returns None
346
347        if any(r is None for r in results):
348            document.is_rejected = True
349        return document

Definition of filter behavior.

The document must implement the TextContent protocol; in most cases a hojichar.Document instance is used.

In this method, the filter will modify document.text or document.extras and set document.is_rejected = True to discard the document.

Parameters

document : Document
    Input document

Returns

Document
    Processed Document

class RedisBloomDeduplicator(hojichar.core.filter_interface.Filter):
352class RedisBloomDeduplicator(Filter):
353    """
354    Distributed deduplicator backed by **RedisBloom scalable Bloom filters**.
355    You can use this filter to store-LSHs with less memory than Redis keys with the risk of false positives.
356
357    Each *band* gets its own scalable Bloom filter on the Redis side:
358
359    ```text
360    BF.RESERVE <prefix>:<band_idx> <error> <capacity> EXPANSION <n>
361    ```
362
363    """
364
365    def __init__(
366        self,
367        *,
368        expected_docs: int,
369        host: str = "localhost",
370        port: int = 6379,
371        db: int = 0,
372        key_prefix: str = "bloomdedup",
373        error_rate: float = 1e-7,
374        expansion: int = 2,
375        num_bands: int | None = None,
376        **kwargs: Any,
377    ):
378        """
379        Initialize the RedisBloom deduplicator.
380        Args:
381            expected_docs (int): Expected number of documents to deduplicate. This is used to set the initial capacity of the Bloom filter.
382            host (str): Redis server hostname.
383            port (int): Redis server port.
384            db (int): Redis database number.
385            key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
386            error_rate (float): Desired error rate for the Bloom filter.
387            expansion (int): Expansion factor for the Bloom filter. This is used to increase the capacity of the filter dynamically.
388            num_bands (int | None): Number of bands to use for LSH to calculate the capacity of BloomFilter. If None, it will be set to 32.
389            **kwargs: Additional keyword arguments for parent Filter.
390        """
391        super().__init__(**kwargs)
392        self.rds = redis.Redis(host=host, port=port, db=db)
393        self.key_prefix = key_prefix.encode()
394
395        _num_bands = num_bands if num_bands is not None else 32
396
397        try:
398            self.rds.execute_command(
399                "BF.RESERVE",
400                self.key_prefix,
401                error_rate,
402                expected_docs * _num_bands,
403                "EXPANSION",
404                expansion,
405            )
406        except redis.ResponseError as e:
407            if "exists" not in str(e):
408                raise
409
410    def apply(self, document: Document) -> Document:
411        lsh_keys: list[str] | None = document.extras.get("dedup_lsh")
412        if lsh_keys is None:
413            raise ValueError(
414                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
415            )
416
417        key_bytes = [k.encode() for k in lsh_keys]
418
419        # Return value of BF.MADD is [1,0,1,...] (0 = already exists, 1 = insertion successful)
420        flags: Iterable[int] = self.rds.execute_command("BF.MADD", self.key_prefix, *key_bytes)
421        if 0 in flags:
422            document.is_rejected = True
423        return document

Distributed deduplicator backed by RedisBloom scalable Bloom filters. You can use this filter to store LSH keys with less memory than plain Redis keys, at the risk of false positives.

All band keys are stored in a single scalable Bloom filter on the Redis side, reserved with a capacity of expected_docs × num_bands:

BF.RESERVE <prefix> <error_rate> <expected_docs * num_bands> EXPANSION <expansion>
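A construction sketch (assumes a Redis server with the RedisBloom module loaded; the capacity figure and prefix are illustrative):

```python
from hojichar import Compose
from hojichar.filters.deduplication import GenerateDedupLSH, RedisBloomDeduplicator

gen = GenerateDedupLSH()
pipeline = Compose([
    gen,
    RedisBloomDeduplicator(
        expected_docs=10_000_000,        # rough corpus size estimate
        key_prefix="my_corpus_bloom_v1",
        num_bands=gen.num_bands,         # align Bloom capacity with the number of keys per doc
    ),
])
```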
RedisBloomDeduplicator( *, expected_docs: int, host: str = 'localhost', port: int = 6379, db: int = 0, key_prefix: str = 'bloomdedup', error_rate: float = 1e-07, expansion: int = 2, num_bands: int | None = None, **kwargs: Any)
365    def __init__(
366        self,
367        *,
368        expected_docs: int,
369        host: str = "localhost",
370        port: int = 6379,
371        db: int = 0,
372        key_prefix: str = "bloomdedup",
373        error_rate: float = 1e-7,
374        expansion: int = 2,
375        num_bands: int | None = None,
376        **kwargs: Any,
377    ):
378        """
379        Initialize the RedisBloom deduplicator.
380        Args:
381            expected_docs (int): Expected number of documents to deduplicate. This is used to set the initial capacity of the Bloom filter.
382            host (str): Redis server hostname.
383            port (int): Redis server port.
384            db (int): Redis database number.
385            key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
386            error_rate (float): Desired error rate for the Bloom filter.
387            expansion (int): Expansion factor for the Bloom filter. This is used to increase the capacity of the filter dynamically.
388            num_bands (int | None): Number of bands to use for LSH to calculate the capacity of BloomFilter. If None, it will be set to 32.
389            **kwargs: Additional keyword arguments for parent Filter.
390        """
391        super().__init__(**kwargs)
392        self.rds = redis.Redis(host=host, port=port, db=db)
393        self.key_prefix = key_prefix.encode()
394
395        _num_bands = num_bands if num_bands is not None else 32
396
397        try:
398            self.rds.execute_command(
399                "BF.RESERVE",
400                self.key_prefix,
401                error_rate,
402                expected_docs * _num_bands,
403                "EXPANSION",
404                expansion,
405            )
406        except redis.ResponseError as e:
407            if "exists" not in str(e):
408                raise

Initialize the RedisBloom deduplicator.

Args:
  expected_docs (int): Expected number of documents to deduplicate. This is used to set the initial capacity of the Bloom filter.
  host (str): Redis server hostname.
  port (int): Redis server port.
  db (int): Redis database number.
  key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
  error_rate (float): Desired error rate for the Bloom filter.
  expansion (int): Expansion factor for the Bloom filter. This is used to increase the capacity of the filter dynamically.
  num_bands (int | None): Number of bands to use for LSH to calculate the capacity of the Bloom filter. If None, it will be set to 32.
  **kwargs: Additional keyword arguments for parent Filter.

def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:
410    def apply(self, document: Document) -> Document:
411        lsh_keys: list[str] | None = document.extras.get("dedup_lsh")
412        if lsh_keys is None:
413            raise ValueError(
414                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
415            )
416
417        key_bytes = [k.encode() for k in lsh_keys]
418
419        # Return value of BF.MADD is [1,0,1,...] (0 = already exists, 1 = insertion successful)
420        flags: Iterable[int] = self.rds.execute_command("BF.MADD", self.key_prefix, *key_bytes)
421        if 0 in flags:
422            document.is_rejected = True
423        return document

Definition of filter behavior.

The document must implement the TextContent protocol; in most cases a hojichar.Document instance is used.

In this method, the filter will modify document.text or document.extras and set document.is_rejected = True to discard the document.

Parameters

document : Document
    Input document

Returns

Document
    Processed Document

class InlineDuplicateAnalyzer(hojichar.core.filter_interface.Filter):
426class InlineDuplicateAnalyzer(Filter):
427    def __init__(self, **kwargs: Any):
428        super().__init__(**kwargs)
429        self.docs: dict[int, Document] = dict()  # doc_id -> Document mapping
430        self.hash_pool: dict[str, int] = dict()  # LSH key -> doc_id mapping
431
432        self._current_doc_id = 0
433
434    def apply(self, document: Document) -> Document:
435        """
436        Analyze duplicates inline based on the LSH keys in the document.
437        This filter cannot use in the distributed environment because it uses a local hash pool.
438        """
439        lsh_keys = document.extras.get("dedup_lsh")
440        if lsh_keys is None:
441            raise ValueError(
442                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
443            )
444
445        for lsh in lsh_keys:
446            if lsh in self.hash_pool:
447                document.is_rejected = True
448                document.extras["similar_doc"] = self.docs[self.hash_pool[lsh]].text
449            else:
450                self.hash_pool[lsh] = self._current_doc_id
451                self.docs[self._current_doc_id] = document
452
453        self._current_doc_id += 1
454        return document

Base class for all filters. Document-level filters must inherit from this class.

The definition of text processing is in apply method. If you define a new filter, override the method.

When this class is called, apply the filter from string to string.

With context manager, you can use the filter as follows:

with YourFilter(p=0.5) as filt:
    text = filt("This is a sample text.")
InlineDuplicateAnalyzer(**kwargs: Any)
427    def __init__(self, **kwargs: Any):
428        super().__init__(**kwargs)
429        self.docs: dict[int, Document] = dict()  # doc_id -> Document mapping
430        self.hash_pool: dict[str, int] = dict()  # LSH key -> doc_id mapping
431
432        self._current_doc_id = 0

Initialize the filter.

Parameters

p : float
    The probability of applying the filter. If p is 1, the filter will always be applied.
skip_rejected : bool
    If True, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to False.
random_state : Optional[Union[int, np.random.Generator]]
    Seed for the random number generator. If None, a new random number generator will be created. If None and used in the Compose class, the random state is shared with the Compose object.
use_batch : bool
    If True, the filter will process documents in batches in the apply_stream method.
batch_size : int
    The size of the batch to process documents in the apply_stream method.
kwargs : Any
    Additional keyword arguments to pass to the filter.

def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:
434    def apply(self, document: Document) -> Document:
435        """
436        Analyze duplicates inline based on the LSH keys in the document.
437        This filter cannot use in the distributed environment because it uses a local hash pool.
438        """
439        lsh_keys = document.extras.get("dedup_lsh")
440        if lsh_keys is None:
441            raise ValueError(
442                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
443            )
444
445        for lsh in lsh_keys:
446            if lsh in self.hash_pool:
447                document.is_rejected = True
448                document.extras["similar_doc"] = self.docs[self.hash_pool[lsh]].text
449            else:
450                self.hash_pool[lsh] = self._current_doc_id
451                self.docs[self._current_doc_id] = document
452
453        self._current_doc_id += 1
454        return document

Analyze duplicates inline based on the LSH keys in the document. This filter cannot be used in a distributed environment because it uses a local hash pool.
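A short sketch of inspecting what a rejected document collided with (the texts are illustrative; Compose usage is as in the earlier sketches):

```python
from hojichar import Compose, Document
from hojichar.filters.deduplication import GenerateDedupLSH, InlineDuplicateAnalyzer

pipeline = Compose([GenerateDedupLSH(), InlineDuplicateAnalyzer()])

first = Document("All work and no play makes Jack a dull boy.")
second = Document("All work and no play makes Jack a dull boy!")
pipeline.apply(first)
pipeline.apply(second)

print(second.is_rejected)                 # True when the two share a band key
print(second.extras.get("similar_doc"))   # text of the earlier, similar document
```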