hojichar.filters.deduplication

This module provides filters for deduplication using MinHash and Locality-Sensitive Hashing (LSH). To use this module, install hojichar with the dedup extras: pip install 'hojichar[dedup]'.

What is MinHash LSH?

A gentle introduction to MinHash LSH for first‑time learners


1 What problem does this solve?

When you have millions of documents, it is too expensive to compare every pair directly. MinHash + Locality‑Sensitive Hashing (LSH) lets you

  • estimate Jaccard similarity extremely fast and memory‑efficiently, and
  • retrieve near‑duplicates in sub‑linear time.

In practice you can keep a single set or Redis index of the generated LSH keys and ask: "Does any existing document share at least one LSH key with mine?"

If the answer is yes, the two documents are almost certainly similar; if not, they are very likely different.
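
The standard back-of-the-envelope analysis behind this test (general MinHash LSH theory, not code in this module): if two documents have Jaccard similarity s, each MinHash value matches with probability s, a band of r hashes matches with probability s^r, and with b bands the probability of sharing at least one LSH key is 1 − (1 − s^r)^b. This S-shaped curve stays near 0 for dissimilar pairs and rises steeply towards 1 once s passes the configured threshold.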

How the pipeline works

GenerateDedupLSH filter generates LSH keys for each document.

  1. Tokenize
    • Split text into tokens. (Default: character‑level, but you can plug in any callable.)
  2. n-grams
    • Group tokens into n‑grams (n_grams=5) to capture context.
  3. MinHash
    • Hash each n‑gram with num_perm independent permutations and keep only the minimum value. The resulting signature is a vector of num_perm 32‑bit integers.
    • This module uses rensa.RMinHash, a fast MinHash implementation written in Rust.
  4. Banding and compression
    • Split the signature into b bands, each containing r integers (num_perm ≈ b×r).
    • Treat the r integers as raw bytes, hash them with xxhash‑128, and format the key as <band_idx>+<digest32hex>.
  5. Output
    • Store all band keys in document.extras['dedup_lsh'] as a list of strings.

InlineDeduplicator, RedisDeduplicator, and RedisBloomDeduplicator filters use the generated LSH keys to mark documents as duplicates.

  • InlineDeduplicator stores LSH keys in a local set, so it works only in a single process.
  • RedisDeduplicator stores LSH keys in Redis, so it works in a distributed environment.
  • RedisBloomDeduplicator stores LSH keys in a RedisBloom scalable Bloom filter. It uses less memory than plain Redis keys but may return false positives.
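
A minimal usage sketch of the pipeline described above, assuming hojichar's Compose API; the sample texts are illustrative:

```python
from hojichar import Compose, Document
from hojichar.filters.deduplication import GenerateDedupLSH, InlineDeduplicator

# First generate LSH keys, then reject documents whose keys have already been seen.
pipeline = Compose([GenerateDedupLSH(), InlineDeduplicator()])

docs = [
    Document("HojiChar is a text processing pipeline for LLM corpora."),
    Document("HojiChar is a text processing pipeline for LLM corpora."),  # duplicate
    Document("A completely unrelated sentence about something else."),
]
for doc in docs:
    processed = pipeline.apply(doc)
    print(processed.is_rejected)  # False, True, False -- the duplicate shares every LSH key
```

Sufficiently similar (not only identical) documents are also caught, with probability governed by the threshold parameter of GenerateDedupLSH.
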
  1"""
  2This module provides filters for deduplication using MinHash and Locality-Sensitive Hashing (LSH).
  3If you want to use this module, install hojichar via `pip install 'hojichar[dedup]'`
  4
  5## What is MinHash LSH?
  6*A gentle introduction to MinHash LSH for first‑time learners*
  7
  8---
  9
 10### 1 What problem does this solve?
 11
 12When you have **millions of documents** it is too expensive to compare every pair directly.
 13**MinHash + Locality‑Sensitive Hashing (LSH)** lets you
 14
 15- estimate Jaccard similarity extremely fast, and memory‑efficiently
 16- retrieve **near‑duplicates** in sub‑linear time.
 17
 18In practice you can keep a single `set` or Redis index of the generated **LSH keys** and ask:
 19"Does any existing document share *at least one* LSH key with mine?"
 20
 21If the answer is yes the two documents are almost certainly similar; if no they are very likely different.
 22
 23### How the pipeline works
 24
 25`GenerateDedupLSH` filter generates LSH keys for each document.
 26
 271. Tokenize
 28  - Split text into tokens. (Default: character‑level, but you can plug in any callable.)
 292. n-grams
 30  - Group tokens into n‑grams (n_grams=5) to capture context.
 313. MinHash
 32  - Hash each n‑gram with `num_perm` independent permutations and keep only the minimum value. The resulting **signature** is a vector of num_perm 32‑bit integers.
 33  - This module uses `rensa.RMinHash`, a fast MinHash implementation written in Rust.
 344. Banding and compression
 35  - Split the signature into b bands, each containing `r` integers (num_perm ≈ b×r).
 36  - Treat the `r` integers as raw bytes, hash them with xxhash‑128, and format as <band_idx>+<digest32hex>.
 375. Output
 38  - Store all band keys in `document.extras['dedup_lsh']` as a list of strings.
 39
 40`InlineDeduplicator`, `RedisDeduplicator`, and `RedisBloomDeduplicator` filters use the generated LSH keys to mark documents as duplicates.
 41
 42- `InlineDeduplicator` stores LSH keys in a local set, so it works only in a single process.
 43- `RedisDeduplicator` stores LSH keys in Redis, so it works in a distributed environment.
 44- `RedisBloomDeduplicator` stores LSH keys in a RedisBloom scalable Bloom filter. It uses less memory than plain Redis keys but may return false positives.
 45
 46"""
 47
 48from __future__ import annotations
 49
 50import importlib
 51import re
 52import sys
 53from typing import Any, Callable, Final, Iterable, Optional
 54
 55import numpy as np
 56from numpy.typing import NDArray
 57
 58try:
 59    import redis
 60    import xxhash
 61    from datasketch.lsh import _optimal_param  # type: ignore
 62    from nltk.util import ngrams  # type: ignore
 63    from rensa import RMinHash  # type: ignore
 64
 65    is_loaded_dedup = True
 66except ImportError:
 67    is_loaded_dedup = False
 68
 69
 70from hojichar import Document, Filter
 71
 72_japanese_tagger: Optional["fugashi.Tagger"] = None  # type: ignore[name-defined] # noqa: F821
 73NON_ALPHA = re.compile("[^A-Za-z_0-9]")
 74IS_LOADED_DEDUP_ERROR_MSG = (
 75    "Failed to import redis, xxhash, rensa, or datasketch. "
 76    "Please install the extra dependencies with `pip install 'hojichar[dedup]'`"
 77)
 78
 79
 80def char_level_splitter(text: str) -> list[str]:
 81    """
 82    Split the text into characters.
 83    This is a simple implementation that splits the text into individual characters.
 84    """
 85    return list(text)
 86
 87
 88def non_alpha_num_splitter(text: str) -> list[str]:
 89    """
 90    Split the text into alphanumeric tokens.
 91    This is a simple implementation that splits on non-alphanumeric characters.
 92    """
 93    return [token for token in NON_ALPHA.split(text) if token]
 94
 95
 96def japanese_word_splitter(text: str) -> list[str]:
 97    """
 98    Split the text into Japanese words using fugashi.
 99    This will import fugashi and instantiate Tagger on first use.
100    """
101    global _japanese_tagger
102    if _japanese_tagger is None:
103        fugashi = importlib.import_module("fugashi")
104        _japanese_tagger = fugashi.Tagger()
105    return [token.surface for token in _japanese_tagger(text)]
106
107
108class GenerateDedupLSH(Filter):
109    """
110    Filter that uses MinHash + Locality-Sensitive Hashing (LSH) to assign
111    deduplication keys to documents, allowing fast near-duplicate detection.
112
113    Attributes:
114        num_perm (int): Number of permutations (hash functions) for MinHash.
115        threshold (float): Similarity threshold for tuning LSH parameters.
116        tokenizer (Callable[[str], Iterable[str]]): Function to tokenize text.
117        n_grams (int): n-gram size for token grouping.
118        seed (int): Random seed for MinHash.
119        num_bands (int): Number of LSH bands computed from threshold and num_perm.
120        band_size (int): Number of hashes per band.
121
122    Notes
123    -----
124    `_optimal_param` searches for the optimal number of **bands** (`b`) and
125    **rows per band** (`r`) that minimise a weighted sum of false positives /
126    false negatives at the specified *threshold*.
127    """
128
129    _BYTES_PER_U32: Final[int] = 4
130
131    def __init__(
132        self,
133        num_perm: int = 500,
134        threshold: float = 0.8,
135        tokenizer: Callable[[str], Iterable[str]] = char_level_splitter,
136        n_grams: int = 5,
137        seed: int = 42,
138        **kwargs: Any,
139    ) -> None:
140        """
141        Initialize the deduplication filter with MinHash and LSH settings.
142
143        Args:
144            num_perm: Number of hash permutations for MinHash signature length.
145            threshold: Similarity threshold to decide optimal LSH parameters.
146            tokenizer: Function to split text into tokens.
147            n_grams: Number of tokens per n-gram for MinHash update.
148            seed: Seed for hash permutation consistency.
149            **kwargs: Additional keyword arguments for parent Filter.
150        """
151        super().__init__(**kwargs)
152        if not is_loaded_dedup:
153            raise ImportError(IS_LOADED_DEDUP_ERROR_MSG)
154        self.num_perm = num_perm
155        self.threshold = threshold
156        self.tokenizer = tokenizer
157        self.n_grams = n_grams
158        self.seed = seed
159
160        # Compute optimal number of bands and band size based on threshold
161        self.num_bands, self.band_size = _optimal_param(
162            threshold=self.threshold,
163            num_perm=self.num_perm,
164            false_negative_weight=0.5,
165            false_positive_weight=0.5,
166        )
167
168    def calculate_minhash_signature(self, text: str) -> NDArray[np.uint32]:
169        """
170        Compute MinHash signature of input text as an array of uint32.
171
172        Steps:
173            1. Tokenize text using the provided tokenizer.
174            2. Generate n-gram tokens.
175            3. Update MinHash with n-gram tokens.
176
177        Args:
178            text: Input document text to be hashed.
179
180        Returns:
181            A 1D numpy array of shape (num_perm,) with dtype uint32.
182        """
183        tokens = self.tokenizer(text)
184        n_gram_tokens = ngrams(tokens, self.n_grams)
185        # Join tokens into string n-grams for hashing
186        tokens = [" ".join(grams) for grams in n_gram_tokens]
187        # Initialize and update RMinHash
188        minhash = RMinHash(num_perm=self.num_perm, seed=self.seed)
189        minhash.update(tokens)
190        # Convert digest (list of ints) to numpy uint32 array
191        return np.asarray(minhash.digest(), dtype=np.uint32)
192
193    def _sig_bytes_le(self, sig: NDArray[np.uint32]) -> memoryview:
194        """
195        Return the signature as *little-endian* byte view.
196        Platform‑independent way to get bytes from a numpy array.
197        """
198        if sys.byteorder == "little":
199            # amd64 / arm64 (little)
200            return memoryview(sig).cast("B")  # type: ignore[arg-type]  # zero-copy
201        else:
202            # big-endian CPU
203            return memoryview(sig.byteswap()).cast("B")  # type: ignore[arg-type] # 1 copy
204
205    def signature_to_lsh_digest(
206        self, signature: NDArray[np.uint32], band_size: int, band_idx: int
207    ) -> int:
208        """
209        Convert a slice of the MinHash signature into an LSH digest with less memory overhead.
210
211        This method is optimized for speed by avoiding copies:
212        - We view the uint32 array as raw bytes (uint8 view).
213        - We create a memoryview of the byte slice for the specified band.
214        - We compute a 128-bit hash directly on the slice.
215
216        Args:
217            signature: 1D numpy array of uint32 representing MinHash signature.
218            band_size: Number of hashes per LSH band.
219            band_idx: Index of the band to hash (0-based).
220
221        Returns:
222            An integer representing the 128-bit hash digest of the band.
223
224        Raises:
225            AssertionError: If signature shape/dtype or band index is invalid.
226        """
227        assert signature.dtype == np.uint32 and signature.ndim == 1, (
228            "signature must be a 1D numpy array of uint32"
229        )
230        assert 0 <= band_idx < self.num_bands, (
231            f"band_idx {band_idx} out of range [0, {self.num_bands})"
232        )
233        assert len(signature) >= band_size * self.num_bands, (
234            "signature length is too short for given band_size and num_bands"
235        )
236
237        # Compute byte offsets for the selected band
238        start = band_idx * band_size * self._BYTES_PER_U32
239        stop = start + band_size * self._BYTES_PER_U32
240
241        # View signature as raw bytes without copy. memoryview avoids creating new bytes.
242        mv = self._sig_bytes_le(signature)[start:stop]  # slice view
243
244        return xxhash.xxh128_intdigest(mv)
245
246    def _format_lsh_key(self, band_idx: int, digest: int) -> str:
247        """
248        Format the LSH key for a given band index and digest.
249        """
250        return f"{band_idx}+{digest:032x}"
251
252    def apply(self, document: Document) -> Document:
253        """
254        Decorate the document with LSH deduplication keys.
255
256        For each band, compute the digest and format as a hex string:
257            '<band_idx>+<128-bit-digest-hex>'.
258        Keys are stored in document.extras['dedup_lsh'].
259
260        Args:
261            document: Document object with 'text' attribute.
262
263        Returns:
264            The same Document object with 'dedup_lsh' added in extras.
265        """
266        signature = self.calculate_minhash_signature(document.text)
267        lsh_keys = [
268            self._format_lsh_key(
269                band_idx, self.signature_to_lsh_digest(signature, self.band_size, band_idx)
270            )
271            for band_idx in range(self.num_bands)
272        ]
273
274        document.extras["dedup_lsh"] = lsh_keys
275        return document
276
277
278class InlineDeduplicator(Filter):
279    """
280    Simple in‑memory deduplicator.
281
282    Stores every LSH key in a local :pyclass:`set`. If any key of the incoming
283    document is already present, the document is marked as duplicate via
284    `document.is_rejected = True`.
285
286    **Limitations**
287    -------------
288    *State is per‑process only.* Running multiple workers or machines will *not*
289    share the key set – use :class:`RedisDeduplicator` or
290    :class:`RedisBloomDeduplicator` for distributed setups.
291    """
292
293    def __init__(self, **kwargs: Any):
294        super().__init__(**kwargs)
295        self.hash_pool: set[str] = set()
296
297    def apply(self, document: Document) -> Document:
298        """
299        Inline deduplication based on the LSH keys in the document.
300        This filter cannot be used in a distributed environment because it uses a local hash pool.
301        """
302        lsh_keys = document.extras.get("dedup_lsh")
303        if lsh_keys is None:
304            raise ValueError(
305                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
306            )
307
308        for lsh in lsh_keys:
309            if lsh in self.hash_pool:
310                document.is_rejected = True
311            else:
312                self.hash_pool.add(lsh)
313        return document
314
315
316class RedisDeduplicator(Filter):
317    """
318    Distributed deduplicator using **plain Redis keys**.
319    You have to run a Redis server and pass its connection parameters.
320    """
321
322    def __init__(
323        self,
324        *,
325        host: str = "localhost",
326        port: int = 6379,
327        db: int = 0,
328        key_prefix: str = "dedup",
329        **kwargs: Any,
330    ) -> None:
331        """
332        Initialize the Redis deduplicator.
333        Args:
334            host (str): Redis server hostname.
335            port (int): Redis server port.
336            db (int): Redis database number.
337            key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
338            **kwargs: Additional keyword arguments for parent Filter.
339        """
340        if not is_loaded_dedup:
341            raise ImportError(IS_LOADED_DEDUP_ERROR_MSG)
342        super().__init__(**kwargs)
343        self.rds = redis.Redis(host=host, port=port, db=db, decode_responses=False)
344        self.key_prefix = key_prefix.encode()
345
346        try:
347            self.rds.ping()
348        except redis.exceptions.RedisError as exc:
349            raise RuntimeError(f"Cannot connect to Redis server {host}:{port}/{db}") from exc
350
351    def apply(self, document: Document) -> Document:
352        lsh_keys = document.extras.get("dedup_lsh")
353        if lsh_keys is None:
354            raise ValueError("Apply GenerateDedupLSH first")
355
356        pipe = self.rds.pipeline(transaction=False)
357        for k in lsh_keys:
358            pipe.set(self.key_prefix + b":" + k.encode(), b"1", nx=True)
359        results: list[bool | None] = pipe.execute()  # If instance already exists, it returns None
360
361        if any(r is None for r in results):
362            document.is_rejected = True
363        return document
364
365
366class RedisBloomDeduplicator(Filter):
367    """
368    Distributed deduplicator backed by **RedisBloom scalable Bloom filters**.
369    You can use this filter to store LSH keys with less memory than plain Redis keys, at the risk of false positives.
370
371    All LSH keys are stored in one scalable Bloom filter on the Redis side, reserved as:
372
373    ```text
374    BF.RESERVE <key_prefix> <error_rate> <expected_docs * num_bands> EXPANSION <expansion>
375    ```
376
377    """
378
379    def __init__(
380        self,
381        *,
382        expected_docs: int,
383        host: str = "localhost",
384        port: int = 6379,
385        db: int = 0,
386        key_prefix: str = "bloomdedup",
387        error_rate: float = 1e-7,
388        expansion: int = 2,
389        num_bands: int | None = None,
390        **kwargs: Any,
391    ):
392        """
393        Initialize the RedisBloom deduplicator.
394        Args:
395            expected_docs (int): Expected number of documents to deduplicate. This is used to set the initial capacity of the Bloom filter.
396            host (str): Redis server hostname.
397            port (int): Redis server port.
398            db (int): Redis database number.
399            key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
400            error_rate (float): Desired error rate for the Bloom filter.
401            expansion (int): Expansion factor for the Bloom filter. This is used to increase the capacity of the filter dynamically.
402            num_bands (int | None): Number of bands to use for LSH to calculate the capacity of BloomFilter. If None, it will be set to 32.
403            **kwargs: Additional keyword arguments for parent Filter.
404        """
405        if not is_loaded_dedup:
406            raise ImportError(IS_LOADED_DEDUP_ERROR_MSG)
407        super().__init__(**kwargs)
408        self.rds = redis.Redis(host=host, port=port, db=db)
409        self.key_prefix = key_prefix.encode()
410
411        _num_bands = num_bands if num_bands is not None else 32
412
413        try:
414            self.rds.execute_command(
415                "BF.RESERVE",
416                self.key_prefix,
417                error_rate,
418                expected_docs * _num_bands,
419                "EXPANSION",
420                expansion,
421            )
422        except redis.ResponseError as e:
423            if "exists" not in str(e):
424                raise
425
426    def apply(self, document: Document) -> Document:
427        lsh_keys: list[str] | None = document.extras.get("dedup_lsh")
428        if lsh_keys is None:
429            raise ValueError(
430                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
431            )
432
433        key_bytes = [k.encode() for k in lsh_keys]
434
435        # Return value of BF.MADD is [1,0,1,...] (0 = already exists, 1 = insertion successful)
436        flags: Iterable[int] = self.rds.execute_command("BF.MADD", self.key_prefix, *key_bytes)
437        if 0 in flags:
438            document.is_rejected = True
439        return document
440
441
442class InlineDuplicateAnalyzer(Filter):
443    def __init__(self, **kwargs: Any):
444        super().__init__(**kwargs)
445        self.docs: dict[int, Document] = dict()  # doc_id -> Document mapping
446        self.hash_pool: dict[str, int] = dict()  # LSH key -> doc_id mapping
447
448        self._current_doc_id = 0
449
450    def apply(self, document: Document) -> Document:
451        """
452        Analyze duplicates inline based on the LSH keys in the document.
453        This filter cannot be used in a distributed environment because it uses a local hash pool.
454        """
455        lsh_keys = document.extras.get("dedup_lsh")
456        if lsh_keys is None:
457            raise ValueError(
458                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
459            )
460
461        for lsh in lsh_keys:
462            if lsh in self.hash_pool:
463                document.is_rejected = True
464                document.extras["similar_doc"] = self.docs[self.hash_pool[lsh]].text
465            else:
466                self.hash_pool[lsh] = self._current_doc_id
467                self.docs[self._current_doc_id] = document
468
469        self._current_doc_id += 1
470        return document
def char_level_splitter(text: str) -> list[str]:
81def char_level_splitter(text: str) -> list[str]:
82    """
83    Split the text into characters.
84    This is a simple implementation that splits the text into individual characters.
85    """
86    return list(text)

Split the text into characters. This is a simple implementation that splits the text into individual characters.

def non_alpha_num_splitter(text: str) -> list[str]:
89def non_alpha_num_splitter(text: str) -> list[str]:
90    """
91    Split the text into alphanumeric tokens.
92    This is a simple implementation that splits on non-alphanumeric characters.
93    """
94    return [token for token in NON_ALPHA.split(text) if token]

Split the text into alphanumeric tokens. This is a simple implementation that splits on non-alphanumeric characters.
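
For reference, a quick illustration of what these two simple splitters return (outputs follow directly from the implementations above):

```python
from hojichar.filters.deduplication import char_level_splitter, non_alpha_num_splitter

print(char_level_splitter("abc"))                  # ['a', 'b', 'c']
print(non_alpha_num_splitter("Hello, world_1!"))   # ['Hello', 'world_1']
```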

def japanese_word_splitter(text: str) -> list[str]:
 97def japanese_word_splitter(text: str) -> list[str]:
 98    """
 99    Split the text into Japanese words using fugashi.
100    This will import fugashi and instantiate Tagger on first use.
101    """
102    global _japanese_tagger
103    if _japanese_tagger is None:
104        fugashi = importlib.import_module("fugashi")
105        _japanese_tagger = fugashi.Tagger()
106    return [token.surface for token in _japanese_tagger(text)]

Split the text into Japanese words using fugashi. This will import fugashi and instantiate Tagger on first use.
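
A usage sketch; fugashi also needs a MeCab dictionary (for example pip install 'fugashi[unidic-lite]'), and the exact segmentation below depends on the installed dictionary:

```python
from hojichar.filters.deduplication import japanese_word_splitter

print(japanese_word_splitter("吾輩は猫である。"))  # e.g. ['吾輩', 'は', '猫', 'で', 'ある', '。']
```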

class GenerateDedupLSH(hojichar.core.filter_interface.Filter):
109class GenerateDedupLSH(Filter):
110    """
111    Filter that uses MinHash + Locality-Sensitive Hashing (LSH) to assign
112    deduplication keys to documents, allowing fast near-duplicate detection.
113
114    Attributes:
115        num_perm (int): Number of permutations (hash functions) for MinHash.
116        threshold (float): Similarity threshold for tuning LSH parameters.
117        tokenizer (Callable[[str], Iterable[str]]): Function to tokenize text.
118        n_grams (int): n-gram size for token grouping.
119        seed (int): Random seed for MinHash.
120        num_bands (int): Number of LSH bands computed from threshold and num_perm.
121        band_size (int): Number of hashes per band.
122
123    Notes
124    -----
125    `_optimal_param` searches for the optimal number of **bands** (`b`) and
126    **rows per band** (`r`) that minimise a weighted sum of false positives /
127    false negatives at the specified *threshold*.
128    """
129
130    _BYTES_PER_U32: Final[int] = 4
131
132    def __init__(
133        self,
134        num_perm: int = 500,
135        threshold: float = 0.8,
136        tokenizer: Callable[[str], Iterable[str]] = char_level_splitter,
137        n_grams: int = 5,
138        seed: int = 42,
139        **kwargs: Any,
140    ) -> None:
141        """
142        Initialize the deduplication filter with MinHash and LSH settings.
143
144        Args:
145            num_perm: Number of hash permutations for MinHash signature length.
146            threshold: Similarity threshold to decide optimal LSH parameters.
147            tokenizer: Function to split text into tokens.
148            n_grams: Number of tokens per n-gram for MinHash update.
149            seed: Seed for hash permutation consistency.
150            **kwargs: Additional keyword arguments for parent Filter.
151        """
152        super().__init__(**kwargs)
153        if not is_loaded_dedup:
154            raise ImportError(IS_LOADED_DEDUP_ERROR_MSG)
155        self.num_perm = num_perm
156        self.threshold = threshold
157        self.tokenizer = tokenizer
158        self.n_grams = n_grams
159        self.seed = seed
160
161        # Compute optimal number of bands and band size based on threshold
162        self.num_bands, self.band_size = _optimal_param(
163            threshold=self.threshold,
164            num_perm=self.num_perm,
165            false_negative_weight=0.5,
166            false_positive_weight=0.5,
167        )
168
169    def calculate_minhash_signature(self, text: str) -> NDArray[np.uint32]:
170        """
171        Compute MinHash signature of input text as an array of uint32.
172
173        Steps:
174            1. Tokenize text using the provided tokenizer.
175            2. Generate n-gram tokens.
176            3. Update MinHash with n-gram tokens.
177
178        Args:
179            text: Input document text to be hashed.
180
181        Returns:
182            A 1D numpy array of shape (num_perm,) with dtype uint32.
183        """
184        tokens = self.tokenizer(text)
185        n_gram_tokens = ngrams(tokens, self.n_grams)
186        # Join tokens into string n-grams for hashing
187        tokens = [" ".join(grams) for grams in n_gram_tokens]
188        # Initialize and update RMinHash
189        minhash = RMinHash(num_perm=self.num_perm, seed=self.seed)
190        minhash.update(tokens)
191        # Convert digest (list of ints) to numpy uint32 array
192        return np.asarray(minhash.digest(), dtype=np.uint32)
193
194    def _sig_bytes_le(self, sig: NDArray[np.uint32]) -> memoryview:
195        """
196        Return the signature as *little-endian* byte view.
197        Platform‑independent way to get bytes from a numpy array.
198        """
199        if sys.byteorder == "little":
200            # amd64 / arm64 (little)
201            return memoryview(sig).cast("B")  # type: ignore[arg-type]  # zero-copy
202        else:
203            # big-endian CPU
204            return memoryview(sig.byteswap()).cast("B")  # type: ignore[arg-type] # 1 copy
205
206    def signature_to_lsh_digest(
207        self, signature: NDArray[np.uint32], band_size: int, band_idx: int
208    ) -> int:
209        """
210        Convert a slice of the MinHash signature into an LSH digest with less memory overhead.
211
212        This method is optimized for speed by avoiding copies:
213        - We view the uint32 array as raw bytes (uint8 view).
214        - We create a memoryview of the byte slice for the specified band.
215        - We compute a 128-bit hash directly on the slice.
216
217        Args:
218            signature: 1D numpy array of uint32 representing MinHash signature.
219            band_size: Number of hashes per LSH band.
220            band_idx: Index of the band to hash (0-based).
221
222        Returns:
223            An integer representing the 128-bit hash digest of the band.
224
225        Raises:
226            AssertionError: If signature shape/dtype or band index is invalid.
227        """
228        assert signature.dtype == np.uint32 and signature.ndim == 1, (
229            "signature must be a 1D numpy array of uint32"
230        )
231        assert 0 <= band_idx < self.num_bands, (
232            f"band_idx {band_idx} out of range [0, {self.num_bands})"
233        )
234        assert len(signature) >= band_size * self.num_bands, (
235            "signature length is too short for given band_size and num_bands"
236        )
237
238        # Compute byte offsets for the selected band
239        start = band_idx * band_size * self._BYTES_PER_U32
240        stop = start + band_size * self._BYTES_PER_U32
241
242        # View signature as raw bytes without copy. memoryview avoids creating new bytes.
243        mv = self._sig_bytes_le(signature)[start:stop]  # slice view
244
245        return xxhash.xxh128_intdigest(mv)
246
247    def _format_lsh_key(self, band_idx: int, digest: int) -> str:
248        """
249        Format the LSH key for a given band index and digest.
250        """
251        return f"{band_idx}+{digest:032x}"
252
253    def apply(self, document: Document) -> Document:
254        """
255        Decorate the document with LSH deduplication keys.
256
257        For each band, compute the digest and format as a hex string:
258            '<band_idx>+<128-bit-digest-hex>'.
259        Keys are stored in document.extras['dedup_lsh'].
260
261        Args:
262            document: Document object with 'text' attribute.
263
264        Returns:
265            The same Document object with 'dedup_lsh' added in extras.
266        """
267        signature = self.calculate_minhash_signature(document.text)
268        lsh_keys = [
269            self._format_lsh_key(
270                band_idx, self.signature_to_lsh_digest(signature, self.band_size, band_idx)
271            )
272            for band_idx in range(self.num_bands)
273        ]
274
275        document.extras["dedup_lsh"] = lsh_keys
276        return document

Filter that uses MinHash + Locality-Sensitive Hashing (LSH) to assign deduplication keys to documents, allowing fast near-duplicate detection.

Attributes:
  • num_perm (int): Number of permutations (hash functions) for MinHash.
  • threshold (float): Similarity threshold for tuning LSH parameters.
  • tokenizer (Callable[[str], Iterable[str]]): Function to tokenize text.
  • n_grams (int): n-gram size for token grouping.
  • seed (int): Random seed for MinHash.
  • num_bands (int): Number of LSH bands computed from threshold and num_perm.
  • band_size (int): Number of hashes per band.

Notes

_optimal_param searches for the optimal number of bands (b) and rows per band (r) that minimise a weighted sum of false positives / false negatives at the specified threshold.

GenerateDedupLSH( num_perm: int = 500, threshold: float = 0.8, tokenizer: Callable[[str], Iterable[str]] = <function char_level_splitter>, n_grams: int = 5, seed: int = 42, **kwargs: Any)
132    def __init__(
133        self,
134        num_perm: int = 500,
135        threshold: float = 0.8,
136        tokenizer: Callable[[str], Iterable[str]] = char_level_splitter,
137        n_grams: int = 5,
138        seed: int = 42,
139        **kwargs: Any,
140    ) -> None:
141        """
142        Initialize the deduplication filter with MinHash and LSH settings.
143
144        Args:
145            num_perm: Number of hash permutations for MinHash signature length.
146            threshold: Similarity threshold to decide optimal LSH parameters.
147            tokenizer: Function to split text into tokens.
148            n_grams: Number of tokens per n-gram for MinHash update.
149            seed: Seed for hash permutation consistency.
150            **kwargs: Additional keyword arguments for parent Filter.
151        """
152        super().__init__(**kwargs)
153        if not is_loaded_dedup:
154            raise ImportError(IS_LOADED_DEDUP_ERROR_MSG)
155        self.num_perm = num_perm
156        self.threshold = threshold
157        self.tokenizer = tokenizer
158        self.n_grams = n_grams
159        self.seed = seed
160
161        # Compute optimal number of bands and band size based on threshold
162        self.num_bands, self.band_size = _optimal_param(
163            threshold=self.threshold,
164            num_perm=self.num_perm,
165            false_negative_weight=0.5,
166            false_positive_weight=0.5,
167        )

Initialize the deduplication filter with MinHash and LSH settings.

Args:
  • num_perm: Number of hash permutations for MinHash signature length.
  • threshold: Similarity threshold to decide optimal LSH parameters.
  • tokenizer: Function to split text into tokens.
  • n_grams: Number of tokens per n-gram for MinHash update.
  • seed: Seed for hash permutation consistency.
  • **kwargs: Additional keyword arguments for parent Filter.
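
For example, a Japanese corpus might use word-level tokens instead of the default character-level splitter (a sketch; requires fugashi, and the resulting band layout is whatever _optimal_param picks for the given threshold and num_perm):

```python
from hojichar.filters.deduplication import GenerateDedupLSH, japanese_word_splitter

lsh_gen = GenerateDedupLSH(
    num_perm=256,
    threshold=0.9,
    tokenizer=japanese_word_splitter,
    n_grams=3,
)
print(lsh_gen.num_bands, lsh_gen.band_size)  # num_bands * band_size <= num_perm
```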

def calculate_minhash_signature(self, text: str) -> numpy.ndarray[typing.Any, numpy.dtype[numpy.uint32]]:
169    def calculate_minhash_signature(self, text: str) -> NDArray[np.uint32]:
170        """
171        Compute MinHash signature of input text as an array of uint32.
172
173        Steps:
174            1. Tokenize text using the provided tokenizer.
175            2. Generate n-gram tokens.
176            3. Update MinHash with n-gram tokens.
177
178        Args:
179            text: Input document text to be hashed.
180
181        Returns:
182            A 1D numpy array of shape (num_perm,) with dtype uint32.
183        """
184        tokens = self.tokenizer(text)
185        n_gram_tokens = ngrams(tokens, self.n_grams)
186        # Join tokens into string n-grams for hashing
187        tokens = [" ".join(grams) for grams in n_gram_tokens]
188        # Initialize and update RMinHash
189        minhash = RMinHash(num_perm=self.num_perm, seed=self.seed)
190        minhash.update(tokens)
191        # Convert digest (list of ints) to numpy uint32 array
192        return np.asarray(minhash.digest(), dtype=np.uint32)

Compute MinHash signature of input text as an array of uint32.

Steps:
  1. Tokenize text using the provided tokenizer.
  2. Generate n-gram tokens.
  3. Update MinHash with the n-gram tokens.

Args:
  • text: Input document text to be hashed.

Returns:
  A 1D numpy array of shape (num_perm,) with dtype uint32.
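
A quick, illustrative check of the returned signature and of the similarity estimate it supports:

```python
from hojichar.filters.deduplication import GenerateDedupLSH

filt = GenerateDedupLSH(num_perm=128)
sig_a = filt.calculate_minhash_signature("the quick brown fox jumps over the lazy dog")
sig_b = filt.calculate_minhash_signature("the quick brown fox jumped over the lazy dog")

print(sig_a.shape, sig_a.dtype)  # (128,) uint32
# The fraction of matching MinHash values estimates the Jaccard similarity
# of the two documents' n-gram sets.
print(float((sig_a == sig_b).mean()))
```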

def signature_to_lsh_digest( self, signature: numpy.ndarray[typing.Any, numpy.dtype[numpy.uint32]], band_size: int, band_idx: int) -> int:
206    def signature_to_lsh_digest(
207        self, signature: NDArray[np.uint32], band_size: int, band_idx: int
208    ) -> int:
209        """
210        Convert a slice of the MinHash signature into an LSH digest with less memory overhead.
211
212        This method is optimized for speed by avoiding copies:
213        - We view the uint32 array as raw bytes (uint8 view).
214        - We create a memoryview of the byte slice for the specified band.
215        - We compute a 128-bit hash directly on the slice.
216
217        Args:
218            signature: 1D numpy array of uint32 representing MinHash signature.
219            band_size: Number of hashes per LSH band.
220            band_idx: Index of the band to hash (0-based).
221
222        Returns:
223            An integer representing the 128-bit hash digest of the band.
224
225        Raises:
226            AssertionError: If signature shape/dtype or band index is invalid.
227        """
228        assert signature.dtype == np.uint32 and signature.ndim == 1, (
229            "signature must be a 1D numpy array of uint32"
230        )
231        assert 0 <= band_idx < self.num_bands, (
232            f"band_idx {band_idx} out of range [0, {self.num_bands})"
233        )
234        assert len(signature) >= band_size * self.num_bands, (
235            "signature length is too short for given band_size and num_bands"
236        )
237
238        # Compute byte offsets for the selected band
239        start = band_idx * band_size * self._BYTES_PER_U32
240        stop = start + band_size * self._BYTES_PER_U32
241
242        # View signature as raw bytes without copy. memoryview avoids creating new bytes.
243        mv = self._sig_bytes_le(signature)[start:stop]  # slice view
244
245        return xxhash.xxh128_intdigest(mv)

Convert a slice of the MinHash signature into an LSH digest with less memory overhead.

This method is optimized for speed by avoiding copies:

  • We view the uint32 array as raw bytes (uint8 view).
  • We create a memoryview of the byte slice for the specified band.
  • We compute a 128-bit hash directly on the slice.

Args:
  • signature: 1D numpy array of uint32 representing the MinHash signature.
  • band_size: Number of hashes per LSH band.
  • band_idx: Index of the band to hash (0-based).

Returns:
  An integer representing the 128-bit hash digest of the band.

Raises:
  AssertionError: If the signature shape/dtype or band index is invalid.
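
A small sanity check of what the digest is: it is xxhash-128 over the band's little-endian bytes, so the same value can be reproduced manually (a sketch, with the little-endian byte order forced explicitly):

```python
import xxhash

from hojichar.filters.deduplication import GenerateDedupLSH

filt = GenerateDedupLSH()
sig = filt.calculate_minhash_signature("a sufficiently long example sentence for hashing")

band_idx = 0
start, stop = band_idx * filt.band_size, (band_idx + 1) * filt.band_size
manual = xxhash.xxh128_intdigest(sig[start:stop].astype("<u4").tobytes())

assert manual == filt.signature_to_lsh_digest(sig, filt.band_size, band_idx)
```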

def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:
253    def apply(self, document: Document) -> Document:
254        """
255        Decorate the document with LSH deduplication keys.
256
257        For each band, compute the digest and format as a hex string:
258            '<band_idx>+<128-bit-digest-hex>'.
259        Keys are stored in document.extras['dedup_lsh'].
260
261        Args:
262            document: Document object with 'text' attribute.
263
264        Returns:
265            The same Document object with 'dedup_lsh' added in extras.
266        """
267        signature = self.calculate_minhash_signature(document.text)
268        lsh_keys = [
269            self._format_lsh_key(
270                band_idx, self.signature_to_lsh_digest(signature, self.band_size, band_idx)
271            )
272            for band_idx in range(self.num_bands)
273        ]
274
275        document.extras["dedup_lsh"] = lsh_keys
276        return document

Decorate the document with LSH deduplication keys.

For each band, compute the digest and format it as a hex string: '<band_idx>+<128-bit-digest-hex>'. Keys are stored in document.extras['dedup_lsh'].

Args: document: Document object with 'text' attribute.

Returns: The same Document object with 'dedup_lsh' added in extras.
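
What the stored keys look like in practice (the digests shown are illustrative placeholders):

```python
from hojichar import Document
from hojichar.filters.deduplication import GenerateDedupLSH

filt = GenerateDedupLSH()
doc = filt.apply(Document("some reasonably long text to deduplicate"))

print(len(doc.extras["dedup_lsh"]))  # == filt.num_bands
print(doc.extras["dedup_lsh"][:2])   # e.g. ['0+06b2...', '1+e41c...'] -- band index + 32-hex-digit digest
```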

class InlineDeduplicator(hojichar.core.filter_interface.Filter):
279class InlineDeduplicator(Filter):
280    """
281    Simple in‑memory deduplicator.
282
283    Stores every LSH key in a local :pyclass:`set`. If any key of the incoming
284    document is already present, the document is marked as duplicate via
285    `document.is_rejected = True`.
286
287    **Limitations**
288    -------------
289    *State is per‑process only.* Running multiple workers or machines will *not*
290    share the key set – use :class:`RedisDeduplicator` or
291    :class:`RedisBloomDeduplicator` for distributed setups.
292    """
293
294    def __init__(self, **kwargs: Any):
295        super().__init__(**kwargs)
296        self.hash_pool: set[str] = set()
297
298    def apply(self, document: Document) -> Document:
299        """
300        Inline deduplication based on the LSH keys in the document.
301        This filter cannot be used in a distributed environment because it uses a local hash pool.
302        """
303        lsh_keys = document.extras.get("dedup_lsh")
304        if lsh_keys is None:
305            raise ValueError(
306                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
307            )
308
309        for lsh in lsh_keys:
310            if lsh in self.hash_pool:
311                document.is_rejected = True
312            else:
313                self.hash_pool.add(lsh)
314        return document

Simple in‑memory deduplicator.

Stores every LSH key in a local set. If any key of the incoming document is already present, the document is marked as a duplicate via document.is_rejected = True.

Limitations

State is per‑process only. Running multiple workers or machines will not share the key set – use RedisDeduplicator or RedisBloomDeduplicator for distributed setups.

InlineDeduplicator(**kwargs: Any)
294    def __init__(self, **kwargs: Any):
295        super().__init__(**kwargs)
296        self.hash_pool: set[str] = set()

Initialize the filter.

Parameters

p : float
  The probability of applying the filter. If p is 1, the filter will always be applied.
skip_rejected : bool
  If True, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to False.
random_state : Optional[Union[int, np.random.Generator]]
  Seed for the random number generator. If None, a new random number generator will be created. If None and used in the Compose class, the random state is shared with the Compose object.
use_batch : bool
  If True, the filter will process documents in batches in the apply_stream method.
batch_size : int
  The size of the batch to process documents in the apply_stream method.
kwargs : Any
  Additional keyword arguments to pass to the filter.

def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:
298    def apply(self, document: Document) -> Document:
299        """
300        Inline deduplication based on the LSH keys in the document.
301        This filter cannot be used in a distributed environment because it uses a local hash pool.
302        """
303        lsh_keys = document.extras.get("dedup_lsh")
304        if lsh_keys is None:
305            raise ValueError(
306                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
307            )
308
309        for lsh in lsh_keys:
310            if lsh in self.hash_pool:
311                document.is_rejected = True
312            else:
313                self.hash_pool.add(lsh)
314        return document

Inline deduplication based on the LSH keys in the document. This filter cannot be used in a distributed environment because it uses a local hash pool.

class RedisDeduplicator(hojichar.core.filter_interface.Filter):
317class RedisDeduplicator(Filter):
318    """
319    Distributed deduplicator using **plain Redis keys**.
320    You have to run a Redis server and pass its connection parameters.
321    """
322
323    def __init__(
324        self,
325        *,
326        host: str = "localhost",
327        port: int = 6379,
328        db: int = 0,
329        key_prefix: str = "dedup",
330        **kwargs: Any,
331    ) -> None:
332        """
333        Initialize the Redis deduplicator.
334        Args:
335            host (str): Redis server hostname.
336            port (int): Redis server port.
337            db (int): Redis database number.
338            key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
339            **kwargs: Additional keyword arguments for parent Filter.
340        """
341        if not is_loaded_dedup:
342            raise ImportError(IS_LOADED_DEDUP_ERROR_MSG)
343        super().__init__(**kwargs)
344        self.rds = redis.Redis(host=host, port=port, db=db, decode_responses=False)
345        self.key_prefix = key_prefix.encode()
346
347        try:
348            self.rds.ping()
349        except redis.exceptions.RedisError as exc:
350            raise RuntimeError(f"Cannot connect to Redis server {host}:{port}/{db}") from exc
351
352    def apply(self, document: Document) -> Document:
353        lsh_keys = document.extras.get("dedup_lsh")
354        if lsh_keys is None:
355            raise ValueError("Apply GenerateDedupLSH first")
356
357        pipe = self.rds.pipeline(transaction=False)
358        for k in lsh_keys:
359            pipe.set(self.key_prefix + b":" + k.encode(), b"1", nx=True)
360        results: list[bool | None] = pipe.execute()  # If instance already exists, it returns None
361
362        if any(r is None for r in results):
363            document.is_rejected = True
364        return document

Distributed deduplicator using plain Redis keys. You have to run a Redis server and pass its connection parameters.
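
A distributed setup sketch; the host name and key prefix are illustrative, and every worker pointing at the same Redis instance and key_prefix shares the same set of seen LSH keys:

```python
from hojichar import Compose
from hojichar.filters.deduplication import GenerateDedupLSH, RedisDeduplicator

pipeline = Compose([
    GenerateDedupLSH(),
    RedisDeduplicator(host="redis.example.internal", port=6379, db=0, key_prefix="corpus_v1"),
])
```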

RedisDeduplicator( *, host: str = 'localhost', port: int = 6379, db: int = 0, key_prefix: str = 'dedup', **kwargs: Any)
323    def __init__(
324        self,
325        *,
326        host: str = "localhost",
327        port: int = 6379,
328        db: int = 0,
329        key_prefix: str = "dedup",
330        **kwargs: Any,
331    ) -> None:
332        """
333        Initialize the Redis deduplicator.
334        Args:
335            host (str): Redis server hostname.
336            port (int): Redis server port.
337            db (int): Redis database number.
338            key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
339            **kwargs: Additional keyword arguments for parent Filter.
340        """
341        if not is_loaded_dedup:
342            raise ImportError(IS_LOADED_DEDUP_ERROR_MSG)
343        super().__init__(**kwargs)
344        self.rds = redis.Redis(host=host, port=port, db=db, decode_responses=False)
345        self.key_prefix = key_prefix.encode()
346
347        try:
348            self.rds.ping()
349        except redis.exceptions.RedisError as exc:
350            raise RuntimeError(f"Cannot connect to Redis server {host}:{port}/{db}") from exc

Initialize the Redis deduplicator.

Args:
  • host (str): Redis server hostname.
  • port (int): Redis server port.
  • db (int): Redis database number.
  • key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
  • **kwargs: Additional keyword arguments for parent Filter.

def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:
352    def apply(self, document: Document) -> Document:
353        lsh_keys = document.extras.get("dedup_lsh")
354        if lsh_keys is None:
355            raise ValueError("Apply GenerateDedupLSH first")
356
357        pipe = self.rds.pipeline(transaction=False)
358        for k in lsh_keys:
359            pipe.set(self.key_prefix + b":" + k.encode(), b"1", nx=True)
360        results: list[bool | None] = pipe.execute()  # If instance already exists, it returns None
361
362        if any(r is None for r in results):
363            document.is_rejected = True
364        return document

Definition of filter behavior.

The document must implement the TextContent protocol; hojichar.Document is the class most commonly used.

In this method, the filter will modify document.text or document.extras and set document.is_rejected = True to discard the document.

Parameters

document : Document
  Input document

Returns

Document
  Processed Document

class RedisBloomDeduplicator(hojichar.core.filter_interface.Filter):
367class RedisBloomDeduplicator(Filter):
368    """
369    Distributed deduplicator backed by **RedisBloom scalable Bloom filters**.
370    You can use this filter to store LSH keys with less memory than plain Redis keys, at the risk of false positives.
371
372    All LSH keys are stored in one scalable Bloom filter on the Redis side, reserved as:
373
374    ```text
375    BF.RESERVE <key_prefix> <error_rate> <expected_docs * num_bands> EXPANSION <expansion>
376    ```
377
378    """
379
380    def __init__(
381        self,
382        *,
383        expected_docs: int,
384        host: str = "localhost",
385        port: int = 6379,
386        db: int = 0,
387        key_prefix: str = "bloomdedup",
388        error_rate: float = 1e-7,
389        expansion: int = 2,
390        num_bands: int | None = None,
391        **kwargs: Any,
392    ):
393        """
394        Initialize the RedisBloom deduplicator.
395        Args:
396            expected_docs (int): Expected number of documents to deduplicate. This is used to set the initial capacity of the Bloom filter.
397            host (str): Redis server hostname.
398            port (int): Redis server port.
399            db (int): Redis database number.
400            key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
401            error_rate (float): Desired error rate for the Bloom filter.
402            expansion (int): Expansion factor for the Bloom filter. This is used to increase the capacity of the filter dynamically.
403            num_bands (int | None): Number of bands to use for LSH to calculate the capacity of BloomFilter. If None, it will be set to 32.
404            **kwargs: Additional keyword arguments for parent Filter.
405        """
406        if not is_loaded_dedup:
407            raise ImportError(IS_LOADED_DEDUP_ERROR_MSG)
408        super().__init__(**kwargs)
409        self.rds = redis.Redis(host=host, port=port, db=db)
410        self.key_prefix = key_prefix.encode()
411
412        _num_bands = num_bands if num_bands is not None else 32
413
414        try:
415            self.rds.execute_command(
416                "BF.RESERVE",
417                self.key_prefix,
418                error_rate,
419                expected_docs * _num_bands,
420                "EXPANSION",
421                expansion,
422            )
423        except redis.ResponseError as e:
424            if "exists" not in str(e):
425                raise
426
427    def apply(self, document: Document) -> Document:
428        lsh_keys: list[str] | None = document.extras.get("dedup_lsh")
429        if lsh_keys is None:
430            raise ValueError(
431                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
432            )
433
434        key_bytes = [k.encode() for k in lsh_keys]
435
436        # Return value of BF.MADD is [1,0,1,...] (0 = already exists, 1 = insertion successful)
437        flags: Iterable[int] = self.rds.execute_command("BF.MADD", self.key_prefix, *key_bytes)
438        if 0 in flags:
439            document.is_rejected = True
440        return document

Distributed deduplicator backed by RedisBloom scalable Bloom filters. You can use this filter to store LSH keys with less memory than plain Redis keys, at the risk of false positives.

All LSH keys are stored in one scalable Bloom filter on the Redis side, reserved as:

BF.RESERVE <key_prefix> <error_rate> <expected_docs * num_bands> EXPANSION <expansion>
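
A usage sketch; it assumes a Redis server with the RedisBloom module loaded, and the numbers are illustrative:

```python
from hojichar import Compose
from hojichar.filters.deduplication import GenerateDedupLSH, RedisBloomDeduplicator

bloom_dedup = RedisBloomDeduplicator(
    expected_docs=100_000_000,   # sizes the Bloom filter capacity (multiplied by num_bands)
    key_prefix="corpus_v1_bloom",
    error_rate=1e-7,
    num_bands=None,              # falls back to 32 when not given
)
pipeline = Compose([GenerateDedupLSH(), bloom_dedup])
```
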
RedisBloomDeduplicator( *, expected_docs: int, host: str = 'localhost', port: int = 6379, db: int = 0, key_prefix: str = 'bloomdedup', error_rate: float = 1e-07, expansion: int = 2, num_bands: int | None = None, **kwargs: Any)
380    def __init__(
381        self,
382        *,
383        expected_docs: int,
384        host: str = "localhost",
385        port: int = 6379,
386        db: int = 0,
387        key_prefix: str = "bloomdedup",
388        error_rate: float = 1e-7,
389        expansion: int = 2,
390        num_bands: int | None = None,
391        **kwargs: Any,
392    ):
393        """
394        Initialize the RedisBloom deduplicator.
395        Args:
396            expected_docs (int): Expected number of documents to deduplicate. This is used to set the initial capacity of the Bloom filter.
397            host (str): Redis server hostname.
398            port (int): Redis server port.
399            db (int): Redis database number.
400            key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
401            error_rate (float): Desired error rate for the Bloom filter.
402            expansion (int): Expansion factor for the Bloom filter. This is used to increase the capacity of the filter dynamically.
403            num_bands (int | None): Number of bands to use for LSH to calculate the capacity of BloomFilter. If None, it will be set to 32.
404            **kwargs: Additional keyword arguments for parent Filter.
405        """
406        if not is_loaded_dedup:
407            raise ImportError(IS_LOADED_DEDUP_ERROR_MSG)
408        super().__init__(**kwargs)
409        self.rds = redis.Redis(host=host, port=port, db=db)
410        self.key_prefix = key_prefix.encode()
411
412        _num_bands = num_bands if num_bands is not None else 32
413
414        try:
415            self.rds.execute_command(
416                "BF.RESERVE",
417                self.key_prefix,
418                error_rate,
419                expected_docs * _num_bands,
420                "EXPANSION",
421                expansion,
422            )
423        except redis.ResponseError as e:
424            if "exists" not in str(e):
425                raise

Initialize the RedisBloom deduplicator.

Args:
  • expected_docs (int): Expected number of documents to deduplicate. This is used to set the initial capacity of the Bloom filter.
  • host (str): Redis server hostname.
  • port (int): Redis server port.
  • db (int): Redis database number.
  • key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
  • error_rate (float): Desired error rate for the Bloom filter.
  • expansion (int): Expansion factor for the Bloom filter. This is used to grow the capacity of the filter dynamically.
  • num_bands (int | None): Number of LSH bands, used to calculate the capacity of the Bloom filter. If None, it will be set to 32.
  • **kwargs: Additional keyword arguments for parent Filter.

def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:
427    def apply(self, document: Document) -> Document:
428        lsh_keys: list[str] | None = document.extras.get("dedup_lsh")
429        if lsh_keys is None:
430            raise ValueError(
431                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
432            )
433
434        key_bytes = [k.encode() for k in lsh_keys]
435
436        # Return value of BF.MADD is [1,0,1,...] (0 = already exists, 1 = insertion successful)
437        flags: Iterable[int] = self.rds.execute_command("BF.MADD", self.key_prefix, *key_bytes)
438        if 0 in flags:
439            document.is_rejected = True
440        return document

Definition of filter behavior.

The document must implement the TextContent protocol; hojichar.Document is the class most commonly used.

In this method, the filter will modify document.text or document.extras and set document.is_rejected = True to discard the document.

Parameters

document : Document
  Input document

Returns

Document
  Processed Document

class InlineDuplicateAnalyzer(hojichar.core.filter_interface.Filter):
443class InlineDuplicateAnalyzer(Filter):
444    def __init__(self, **kwargs: Any):
445        super().__init__(**kwargs)
446        self.docs: dict[int, Document] = dict()  # doc_id -> Document mapping
447        self.hash_pool: dict[str, int] = dict()  # LSH key -> doc_id mapping
448
449        self._current_doc_id = 0
450
451    def apply(self, document: Document) -> Document:
452        """
453        Analyze duplicates inline based on the LSH keys in the document.
454        This filter cannot be used in a distributed environment because it uses a local hash pool.
455        """
456        lsh_keys = document.extras.get("dedup_lsh")
457        if lsh_keys is None:
458            raise ValueError(
459                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
460            )
461
462        for lsh in lsh_keys:
463            if lsh in self.hash_pool:
464                document.is_rejected = True
465                document.extras["similar_doc"] = self.docs[self.hash_pool[lsh]].text
466            else:
467                self.hash_pool[lsh] = self._current_doc_id
468                self.docs[self._current_doc_id] = document
469
470        self._current_doc_id += 1
471        return document

Base class for all filters. Document-level filters must inherit from this class.

The definition of the text processing is in the apply method. If you define a new filter, override that method.

When an instance of this class is called directly, the filter is applied from string to string.

With context manager, you can use the filter as follows:

with YourFilter(p=0.5) as filt:
    text = filt("This is a sample text.")
InlineDuplicateAnalyzer(**kwargs: Any)
444    def __init__(self, **kwargs: Any):
445        super().__init__(**kwargs)
446        self.docs: dict[int, Document] = dict()  # doc_id -> Document mapping
447        self.hash_pool: dict[str, int] = dict()  # LSH key -> doc_id mapping
448
449        self._current_doc_id = 0

Initialize the filter.

Parameters

p : float
  The probability of applying the filter. If p is 1, the filter will always be applied.
skip_rejected : bool
  If True, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to False.
random_state : Optional[Union[int, np.random.Generator]]
  Seed for the random number generator. If None, a new random number generator will be created. If None and used in the Compose class, the random state is shared with the Compose object.
use_batch : bool
  If True, the filter will process documents in batches in the apply_stream method.
batch_size : int
  The size of the batch to process documents in the apply_stream method.
kwargs : Any
  Additional keyword arguments to pass to the filter.

def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:
451    def apply(self, document: Document) -> Document:
452        """
453        Analyze duplicates inline based on the LSH keys in the document.
454        This filter cannot be used in a distributed environment because it uses a local hash pool.
455        """
456        lsh_keys = document.extras.get("dedup_lsh")
457        if lsh_keys is None:
458            raise ValueError(
459                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
460            )
461
462        for lsh in lsh_keys:
463            if lsh in self.hash_pool:
464                document.is_rejected = True
465                document.extras["similar_doc"] = self.docs[self.hash_pool[lsh]].text
466            else:
467                self.hash_pool[lsh] = self._current_doc_id
468                self.docs[self._current_doc_id] = document
469
470        self._current_doc_id += 1
471        return document

Analyze duplicates inline based on the LSH keys in the document. This filter cannot be used in a distributed environment because it uses a local hash pool.
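
A sketch of how the analyzer can be used to inspect what a rejected document collided with (texts are illustrative):

```python
from hojichar import Compose, Document
from hojichar.filters.deduplication import GenerateDedupLSH, InlineDuplicateAnalyzer

analyzer = Compose([GenerateDedupLSH(), InlineDuplicateAnalyzer()])

first = analyzer.apply(Document("an example sentence that appears twice in the corpus"))
second = analyzer.apply(Document("an example sentence that appears twice in the corpus"))

print(second.is_rejected)                # True
print(second.extras.get("similar_doc"))  # text of the earlier, matching document
```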