hojichar.filters.deduplication

This module provides filters for deduplication using MinHash and Locality-Sensitive Hashing (LSH). If you want to use this module, install hojichar via pip install 'hojichar[dedup]'

What is MinHash LSH?

A gentle introduction to MinHash LSH for first-time learners


1 What problem does this solve?

When you have millions of documents it is too expensive to compare every pair directly. MinHash + Locality‑Sensitive Hashing (LSH) lets you

  • estimate Jaccard similarity extremely fast, and memory‑efficiently
  • retrieve near‑duplicates in sub‑linear time.

In practice you can keep a single set or Redis index of the generated LSH keys and ask: "Does any existing document share at least one LSH key with mine?"

If the answer is yes, the two documents are almost certainly similar; if not, they are very likely different.

How the pipeline works

The GenerateDedupLSH filter generates LSH keys for each document (a runnable sketch follows the steps below).

  1. Tokenize
    • Split text into tokens. (Default: character‑level, but you can plug in any callable.)
  2. n-grams
    • Group tokens into n‑grams (n_grams=5) to capture context.
  3. MinHash
    • Hash each n‑gram with num_perm independent permutations and keep only the minimum value. The resulting signature is a vector of num_perm 32‑bit integers.
    • This module uses rensa.RMinHash, a fast MinHash implementation written in Rust.
  4. Banding and compression
    • Split the signature into b bands, each containing r integers (num_perm ≈ b×r).
    • Treat the r integers as raw bytes, hash them with xxhash‑128, and format the key as <band_idx>+<digest32hex>.
  5. Output
    • Store all band keys in document.extras['dedup_lsh'] as a list of strings.
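
Putting the steps together, here is a minimal sketch of key generation. It assumes hojichar is installed with the dedup extra; the example text and printed values are only illustrative.

```python
from hojichar import Document
from hojichar.filters.deduplication import GenerateDedupLSH

gen = GenerateDedupLSH()  # defaults: num_perm=500, threshold=0.8, char-level tokens, 5-grams
doc = gen.apply(Document("MinHash LSH assigns band keys to every document."))

# One key per band, each formatted as "<band_idx>+<32-hex-char xxhash-128 digest>"
print(len(doc.extras["dedup_lsh"]), doc.extras["dedup_lsh"][0])
```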

The InlineDeduplicator, RedisDeduplicator, and RedisBloomDeduplicator filters use the generated LSH keys to mark documents as duplicates (see the pipeline sketch after this list).

  • InlineDeduplicator stores LSH keys in a local set, so it works only in a single process.
  • RedisDeduplicator stores LSH keys in Redis, so it works in a distributed environment.
  • RedisBloomDeduplicator stores LSH keys in RedisBloom, which is a scalable Bloom filter. It uses less memory than Redis keys but may return false positives.
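
A hedged end-to-end sketch of the intended usage, combining key generation with in-process deduplication. Compose and Document are part of hojichar's top-level API; the exact call pattern may differ slightly between hojichar versions.

```python
from hojichar import Compose, Document
from hojichar.filters.deduplication import GenerateDedupLSH, InlineDeduplicator

pipeline = Compose([GenerateDedupLSH(), InlineDeduplicator()])

texts = [
    "The quick brown fox jumps over the lazy dog.",
    "The quick brown fox jumps over the lazy dog.",  # exact duplicate
    "A completely different sentence about deduplication.",
]
for text in texts:
    doc = pipeline.apply(Document(text))
    print(doc.is_rejected)  # expected: False, True, False
```
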
  1"""
  2This module provides filters for deduplication using MinHash and Locality-Sensitive Hashing (LSH).
  3If you want to use this module, install hojichar via `pip install 'hojichar[dedup]'`
  4
  5## What is MinHash LSH?
 6*A gentle introduction to MinHash LSH for first-time learners*
  7
  8---
  9
 10### 1 What problem does this solve?
 11
 12When you have **millions of documents** it is too expensive to compare every pair directly.
 13**MinHash + Locality‑Sensitive Hashing (LSH)** lets you
 14
 15- estimate Jaccard similarity extremely fast, and memory‑efficiently
 16- retrieve **near‑duplicates** in sub‑linear time.
 17
 18In practice you can keep a single `set` or Redis index of the generated **LSH keys** and ask:
 19"Does any existing document share *at least one* LSH key with mine?"
 20
 21If the answer is yes the two documents are almost certainly similar; if no they are very likely different.
 22
 23### How the pipeline works
 24
 25`GenerateDedupLSH` filter generates LSH keys for each document.
 26
 271. Tokenize
 28  - Split text into tokens. (Default: character‑level, but you can plug in any callable.)
 292. n-grams
 30  - Group tokens into n‑grams (n_grams=5) to capture context.
 313. MinHash
 32  - Hash each n‑gram with `num_perm` independent permutations and keep only the minimum value. The resulting **signature** is a vector of num_perm 32‑bit integers.
 33  - This module uses `rensa.RMinHash`, a fast MinHash implementation written in Rust.
 344. Banding and compression
 35  - Split the signature into b bands, each containing `r` integers (num_perm ≈ b×r).
 36  - Treat the `r` integers as raw bytes, hash them with xxhash‑128, and format as <band_idx>+<digest32hex>.
 375. Output
 38  - Store all band keys in `document.extras['dedup_lsh']` as a list of strings.
 39
 40`InlineDeduplicator`, `RedisDeduplicator`, and `RedisBloomDeduplicator` filters use the generated LSH keys to mark documents as duplicates.
 41
 42- `InlineDeduplicator` stores LSH keys in a local set, so it works only in a single process.
 43- `RedisDeduplicator` stores LSH keys in Redis, so it works in a distributed environment.
 44- `RedisBloomDeduplicator` stores LSH keys in RedisBloom, which is a scalable Bloom filter. It uses less memory than Redis keys but may return false positives.
 45
 46"""
 47
 48from __future__ import annotations
 49
 50import importlib
 51import re
 52from typing import Any, Callable, Final, Iterable, Optional
 53
 54import numpy as np
 55import redis
 56import xxhash
 57from datasketch.lsh import _optimal_param  # type: ignore
 58from nltk.util import ngrams  # type: ignore
 59from numpy.typing import NDArray
 60from rensa import RMinHash  # type: ignore
 61
 62from hojichar import Document, Filter
 63
 64_japanese_tagger: Optional["fugashi.Tagger"] = None  # type: ignore[name-defined] # noqa: F821
 65NON_ALPHA = re.compile("[^A-Za-z_0-9]")
 66
 67
 68def char_level_splitter(text: str) -> list[str]:
 69    """
 70    Split the text into characters.
 71    This is a simple implementation that splits the text into individual characters.
 72    """
 73    return list(text)
 74
 75
 76def non_alpha_num_splitter(text: str) -> list[str]:
 77    """
 78    Split the text into alphanumeric tokens.
 79    This is a simple implementation that splits on non-alphanumeric characters.
 80    """
 81    return [token for token in NON_ALPHA.split(text) if token]
 82
 83
 84def japanese_word_splitter(text: str) -> list[str]:
 85    """
 86    Split the text into Japanese words using fugashi.
 87    This will import fugashi and instantiate Tagger on first use.
 88    """
 89    global _japanese_tagger
 90    if _japanese_tagger is None:
 91        fugashi = importlib.import_module("fugashi")
 92        _japanese_tagger = fugashi.Tagger()
 93    return [token.surface for token in _japanese_tagger(text)]
 94
 95
 96class GenerateDedupLSH(Filter):
 97    """
 98    Filter that uses MinHash + Locality-Sensitive Hashing (LSH) to assign
 99    deduplication keys to documents, allowing fast near-duplicate detection.
100
101    Attributes:
102        num_perm (int): Number of permutations (hash functions) for MinHash.
103        threshold (float): Similarity threshold for tuning LSH parameters.
104        tokenizer (Callable[[str], Iterable[str]]): Function to tokenize text.
105        n_grams (int): n-gram size for token grouping.
106        seed (int): Random seed for MinHash.
107        num_bands (int): Number of LSH bands computed from threshold and num_perm.
108        band_size (int): Number of hashes per band.
109
110    Notes
111    -----
112    `_optimal_param` searches for the optimal number of **bands** (`b`) and
113    **rows per band** (`r`) that minimise a weighted sum of false positives /
114    false negatives at the specified *threshold*.
115    """
116
117    _BYTES_PER_U32: Final[int] = 4
118
119    def __init__(
120        self,
121        num_perm: int = 500,
122        threshold: float = 0.8,
123        tokenizer: Callable[[str], Iterable[str]] = char_level_splitter,
124        n_grams: int = 5,
125        seed: int = 42,
126        **kwargs: Any,
127    ) -> None:
128        """
129        Initialize the deduplication filter with MinHash and LSH settings.
130
131        Args:
132            num_perm: Number of hash permutations for MinHash signature length.
133            threshold: Similarity threshold to decide optimal LSH parameters.
134            tokenizer: Function to split text into tokens.
135            n_grams: Number of tokens per n-gram for MinHash update.
136            seed: Seed for hash permutation consistency.
137            **kwargs: Additional keyword arguments for parent Filter.
138        """
139        super().__init__(**kwargs)
140        self.num_perm = num_perm
141        self.threshold = threshold
142        self.tokenizer = tokenizer
143        self.n_grams = n_grams
144        self.seed = seed
145
146        # Compute optimal number of bands and band size based on threshold
147        self.num_bands, self.band_size = _optimal_param(
148            threshold=self.threshold,
149            num_perm=self.num_perm,
150            false_negative_weight=0.5,
151            false_positive_weight=0.5,
152        )
153
154    def calculate_minhash_signature(self, text: str) -> NDArray[np.uint32]:
155        """
156        Compute MinHash signature of input text as an array of uint32.
157
158        Steps:
159            1. Tokenize text using the provided tokenizer.
160            2. Generate n-gram tokens.
161            3. Update MinHash with n-gram tokens.
162
163        Args:
164            text: Input document text to be hashed.
165
166        Returns:
167            A 1D numpy array of shape (num_perm,) with dtype uint32.
168        """
169        tokens = self.tokenizer(text)
170        n_gram_tokens = ngrams(tokens, self.n_grams)
171        # Join tokens into string n-grams for hashing
172        tokens = [" ".join(grams) for grams in n_gram_tokens]
173        # Initialize and update RMinHash
174        minhash = RMinHash(num_perm=self.num_perm, seed=self.seed)
175        minhash.update(tokens)
176        # Convert digest (list of ints) to numpy uint32 array
177        return np.asarray(minhash.digest(), dtype=np.uint32)
178
179    def signature_to_lsh_digest(
180        self, signature: NDArray[np.uint32], band_size: int, band_idx: int
181    ) -> int:
182        """
183        Convert a slice of the MinHash signature into an LSH digest with less memory overhead.
184
185        This method is optimized for speed by avoiding copies:
186        - We view the uint32 array as raw bytes (uint8 view).
187        - We create a memoryview of the byte slice for the specified band.
188        - We compute a 128-bit hash directly on the slice.
189
190        Args:
191            signature: 1D numpy array of uint32 representing MinHash signature.
192            band_size: Number of hashes per LSH band.
193            band_idx: Index of the band to hash (0-based).
194
195        Returns:
196            An integer representing the 128-bit hash digest of the band.
197
198        Raises:
199            AssertionError: If signature shape/dtype or band index is invalid.
200        """
201        assert signature.dtype == np.uint32 and signature.ndim == 1, (
202            "signature must be a 1D numpy array of uint32"
203        )
204        assert 0 <= band_idx < self.num_bands, (
205            f"band_idx {band_idx} out of range [0, {self.num_bands})"
206        )
207        assert len(signature) >= band_size * self.num_bands, (
208            "signature length is too short for given band_size and num_bands"
209        )
210
211        # Compute byte offsets for the selected band
212        start = band_idx * band_size * self._BYTES_PER_U32
213        stop = start + band_size * self._BYTES_PER_U32
214
215        # View signature as raw bytes without copy. memoryview avoids creating new bytes.
216        u8 = signature.view(np.uint8)
217        mv = memoryview(u8)[start:stop]  # type: ignore[arg-type]
218
219        return xxhash.xxh128_intdigest(mv)
220
221    def _format_lsh_key(self, band_idx: int, digest: int) -> str:
222        """
223        Format the LSH key for a given band index and digest.
224        """
225        return f"{band_idx}+{digest:032x}"
226
227    def apply(self, document: Document) -> Document:
228        """
229        Decorate the document with LSH deduplication keys.
230
231        For each band, compute the digest and format as a hex string:
232            '<band_idx>+<128-bit-digest-hex>'.
233        Keys are stored in document.extras['dedup_lsh'].
234
235        Args:
236            document: Document object with 'text' attribute.
237
238        Returns:
239            The same Document object with 'dedup_lsh' added in extras.
240        """
241        signature = self.calculate_minhash_signature(document.text)
242        lsh_keys = [
243            self._format_lsh_key(
244                band_idx, self.signature_to_lsh_digest(signature, self.band_size, band_idx)
245            )
246            for band_idx in range(self.num_bands)
247        ]
248
249        document.extras["dedup_lsh"] = lsh_keys
250        return document
251
252
253class InlineDeduplicator(Filter):
254    """
255    Simple in‑memory deduplicator.
256
257    Stores every LSH key in a local :class:`set`. If any key of the incoming
258    document is already present, the document is marked as duplicate via
259    `document.is_rejected = True`.
260
261    **Limitations**
262    -------------
263    *State is per‑process only.* Running multiple workers or machines will *not*
264    share the key set – use :class:`RedisDeduplicator` or
265    :class:`RedisBloomDeduplicator` for distributed setups.
266    """
267
268    def __init__(self, **kwargs: Any):
269        super().__init__(**kwargs)
270        self.hash_pool: set[str] = set()
271
272    def apply(self, document: Document) -> Document:
273        """
274        Inline deduplication based on the LSH keys in the document.
275        This filter cannot be used in a distributed environment because it uses a local hash pool.
276        """
277        lsh_keys = document.extras.get("dedup_lsh")
278        if lsh_keys is None:
279            raise ValueError(
280                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
281            )
282
283        for lsh in lsh_keys:
284            if lsh in self.hash_pool:
285                document.is_rejected = True
286            else:
287                self.hash_pool.add(lsh)
288        return document
289
290
291class RedisDeduplicator(Filter):
292    """
293    Distributed deduplicator using **plain Redis keys**.
294    You have to run a Redis server and pass its connection parameters.
295    """
296
297    def __init__(
298        self,
299        *,
300        host: str = "localhost",
301        port: int = 6379,
302        db: int = 0,
303        key_prefix: str = "dedup",
304        **kwargs: Any,
305    ) -> None:
306        """
307        Initialize the Redis deduplicator.
308        Args:
309            host (str): Redis server hostname.
310            port (int): Redis server port.
311            db (int): Redis database number.
312            key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
313            **kwargs: Additional keyword arguments for parent Filter.
314        """
315        super().__init__(**kwargs)
316        self.rds = redis.Redis(host=host, port=port, db=db, decode_responses=False)
317        self.key_prefix = key_prefix.encode()
318
319        try:
320            self.rds.ping()
321        except redis.exceptions.RedisError as exc:
322            raise RuntimeError(f"Cannot connect to Redis server {host}:{port}/{db}") from exc
323
324    def apply(self, document: Document) -> Document:
325        lsh_keys = document.extras.get("dedup_lsh")
326        if lsh_keys is None:
327            raise ValueError("Apply GenerateDedupLSH first")
328
329        pipe = self.rds.pipeline(transaction=False)
330        for k in lsh_keys:
331            pipe.set(self.key_prefix + b":" + k.encode(), b"1", nx=True)
332        results: list[bool | None] = pipe.execute()  # SET NX returns None when the key already exists
333
334        if any(r is None for r in results):
335            document.is_rejected = True
336        return document
337
338
339class RedisBloomDeduplicator(Filter):
340    """
341    Distributed deduplicator backed by **RedisBloom scalable Bloom filters**.
342    You can use this filter to store LSH keys with less memory than plain Redis keys, at the risk of false positives.
343
344    All LSH keys are stored in a single scalable Bloom filter on the Redis side, reserved at construction time:
345
346    ```text
347    BF.RESERVE <prefix> <error> <capacity> EXPANSION <n>
348    ```
349
350    """
351
352    def __init__(
353        self,
354        *,
355        host: str = "localhost",
356        port: int = 6379,
357        db: int = 0,
358        key_prefix: str = "bloomdedup",
359        error_rate: float = 1e-4,
360        capacity: int = 1_000_000_000,
361        expansion: int = 2,
362        **kwargs: Any,
363    ):
364        """
365        Initialize the RedisBloom deduplicator.
366        Args:
367            host (str): Redis server hostname.
368            port (int): Redis server port.
369            db (int): Redis database number.
370            key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
371            error_rate (float): Desired error rate for the Bloom filter.
372            capacity (int): Initial capacity of the Bloom filter. Guide: set it to the expected number of unique LSH keys ~ num_doc * num_bands.
373            expansion (int): Expansion factor for the Bloom filter. This is used to increase the capacity of the filter dynamically.
374            **kwargs: Additional keyword arguments for parent Filter.
375        """
376        super().__init__(**kwargs)
377        self.rds = redis.Redis(host=host, port=port, db=db)
378        self.key_prefix = key_prefix.encode()
379
380        try:
381            self.rds.execute_command(
382                "BF.RESERVE",
383                self.key_prefix,
384                error_rate,
385                capacity,
386                "EXPANSION",
387                expansion,
388            )
389        except redis.ResponseError as e:
390            if "exists" not in str(e):
391                raise
392
393    def apply(self, document: Document) -> Document:
394        lsh_keys: list[str] | None = document.extras.get("dedup_lsh")
395        if lsh_keys is None:
396            raise ValueError(
397                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
398            )
399
400        key_bytes = [k.encode() for k in lsh_keys]
401
402        # Return value of BF.MADD is [1,0,1,...] (0 = already exists, 1 = insertion successful)
403        flags: Iterable[int] = self.rds.execute_command("BF.MADD", self.key_prefix, *key_bytes)
404        if 0 in flags:
405            document.is_rejected = True
406        return document
407
408
409class InlineDuplicateAnalyzer(Filter):
410    def __init__(self, **kwargs: Any):
411        super().__init__(**kwargs)
412        self.docs: dict[int, Document] = dict()  # doc_id -> Document mapping
413        self.hash_pool: dict[str, int] = dict()  # LSH key -> doc_id mapping
414
415        self._current_doc_id = 0
416
417    def apply(self, document: Document) -> Document:
418        """
419        Analyze duplicates inline based on the LSH keys in the document.
420        This filter cannot be used in a distributed environment because it uses a local hash pool.
421        """
422        lsh_keys = document.extras.get("dedup_lsh")
423        if lsh_keys is None:
424            raise ValueError(
425                "Document does not contain LSH keys for deduplication. Please apply GenerateDedupLSH first."
426            )
427
428        for lsh in lsh_keys:
429            if lsh in self.hash_pool:
430                document.is_rejected = True
431                document.extras["similar_doc"] = self.docs[self.hash_pool[lsh]].text
432            else:
433                self.hash_pool[lsh] = self._current_doc_id
434                self.docs[self._current_doc_id] = document
435
436        self._current_doc_id += 1
437        return document
def char_level_splitter(text: str) -> list[str]:

Split the text into characters. This is a simple implementation that splits the text into individual characters.

def non_alpha_num_splitter(text: str) -> list[str]:

Split the text into alphanumeric tokens. This is a simple implementation that splits on non-alphanumeric characters.

def japanese_word_splitter(text: str) -> list[str]:

Split the text into Japanese words using fugashi. This will import fugashi and instantiate Tagger on first use.
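
The three splitters are interchangeable: any callable that maps a string to an iterable of tokens can be passed as the tokenizer argument. A small illustrative sketch (japanese_word_splitter additionally requires fugashi to be installed, so it is only mentioned in a comment):

```python
from hojichar.filters.deduplication import (
    GenerateDedupLSH,
    char_level_splitter,
    non_alpha_num_splitter,
)

print(char_level_splitter("abc"))            # ['a', 'b', 'c']
print(non_alpha_num_splitter("foo, bar_1"))  # ['foo', 'bar_1']

# Word-level n-grams instead of the default character-level n-grams:
gen = GenerateDedupLSH(tokenizer=non_alpha_num_splitter, n_grams=3)
# gen = GenerateDedupLSH(tokenizer=japanese_word_splitter)  # needs fugashi
```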

class GenerateDedupLSH(hojichar.core.filter_interface.Filter):

Filter that uses MinHash + Locality-Sensitive Hashing (LSH) to assign deduplication keys to documents, allowing fast near-duplicate detection.

Attributes:

  • num_perm (int): Number of permutations (hash functions) for MinHash.
  • threshold (float): Similarity threshold for tuning LSH parameters.
  • tokenizer (Callable[[str], Iterable[str]]): Function to tokenize text.
  • n_grams (int): n-gram size for token grouping.
  • seed (int): Random seed for MinHash.
  • num_bands (int): Number of LSH bands computed from threshold and num_perm.
  • band_size (int): Number of hashes per band.

Notes

_optimal_param searches for the optimal number of bands (b) and rows per band (r) that minimise a weighted sum of false positives / false negatives at the specified threshold.
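
For illustration, the band layout chosen by _optimal_param can be inspected after construction. The exact numbers depend on datasketch's parameter search, so the comment below is only a rough expectation.

```python
from hojichar.filters.deduplication import GenerateDedupLSH

gen = GenerateDedupLSH(num_perm=500, threshold=0.8)
# num_bands * band_size will not exceed num_perm; higher thresholds generally
# produce fewer, longer bands (larger band_size).
print(gen.num_bands, gen.band_size)
```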

GenerateDedupLSH( num_perm: int = 500, threshold: float = 0.8, tokenizer: Callable[[str], Iterable[str]] = <function char_level_splitter>, n_grams: int = 5, seed: int = 42, **kwargs: Any)

Initialize the deduplication filter with MinHash and LSH settings.

Args:

  • num_perm: Number of hash permutations for MinHash signature length.
  • threshold: Similarity threshold to decide optimal LSH parameters.
  • tokenizer: Function to split text into tokens.
  • n_grams: Number of tokens per n-gram for MinHash update.
  • seed: Seed for hash permutation consistency.
  • **kwargs: Additional keyword arguments for parent Filter.

def calculate_minhash_signature(self, text: str) -> numpy.ndarray[typing.Any, numpy.dtype[numpy.uint32]]:

Compute MinHash signature of input text as an array of uint32.

Steps:

  1. Tokenize text using the provided tokenizer.
  2. Generate n-gram tokens.
  3. Update MinHash with n-gram tokens.

Args: text: Input document text to be hashed.

Returns: A 1D numpy array of shape (num_perm,) with dtype uint32.
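
A quick sketch of what the method returns with the default settings (the input string is arbitrary):

```python
import numpy as np
from hojichar.filters.deduplication import GenerateDedupLSH

gen = GenerateDedupLSH()  # num_perm=500 by default
sig = gen.calculate_minhash_signature("an example document for signature inspection")
print(sig.shape, sig.dtype)  # (500,) uint32
assert isinstance(sig, np.ndarray)
```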

def signature_to_lsh_digest( self, signature: numpy.ndarray[typing.Any, numpy.dtype[numpy.uint32]], band_size: int, band_idx: int) -> int:

Convert a slice of the MinHash signature into an LSH digest with less memory overhead.

This method is optimized for speed by avoiding copies:

  • We view the uint32 array as raw bytes (uint8 view).
  • We create a memoryview of the byte slice for the specified band.
  • We compute a 128-bit hash directly on the slice.

Args:

  • signature: 1D numpy array of uint32 representing MinHash signature.
  • band_size: Number of hashes per LSH band.
  • band_idx: Index of the band to hash (0-based).

Returns: An integer representing the 128-bit hash digest of the band.

Raises: AssertionError: If signature shape/dtype or band index is invalid.
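
A sketch of hashing a single band and formatting it the same way the filter does internally (the f-string mirrors the private _format_lsh_key helper shown in the source above):

```python
from hojichar.filters.deduplication import GenerateDedupLSH

gen = GenerateDedupLSH()
sig = gen.calculate_minhash_signature("another example document for band hashing")

digest = gen.signature_to_lsh_digest(sig, gen.band_size, band_idx=0)
key = f"0+{digest:032x}"  # "<band_idx>+<128-bit digest as 32 hex chars>"
print(key)
```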

def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:

Decorate the document with LSH deduplication keys.

For each band, compute the digest and format as a hex string: '<band_idx>+<128-bit-digest-hex>'. Keys are stored in document.extras['dedup_lsh'].

Args: document: Document object with 'text' attribute.

Returns: The same Document object with 'dedup_lsh' added in extras.

class InlineDeduplicator(hojichar.core.filter_interface.Filter):

Simple in‑memory deduplicator.

Stores every LSH key in a local set. If any key of the incoming document is already present, the document is marked as a duplicate via document.is_rejected = True.

Limitations

State is per‑process only. Running multiple workers or machines will not share the key set – use RedisDeduplicator or RedisBloomDeduplicator for distributed setups.
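
A minimal in-process sketch. The same GenerateDedupLSH instance must run before the deduplicator so that extras['dedup_lsh'] is present; the sample strings are illustrative.

```python
from hojichar import Document
from hojichar.filters.deduplication import GenerateDedupLSH, InlineDeduplicator

gen, dedup = GenerateDedupLSH(), InlineDeduplicator()

for text in ["repeated text body", "repeated text body", "something else entirely"]:
    doc = dedup.apply(gen.apply(Document(text)))
    print(doc.is_rejected)  # expected: False, True, False
```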

InlineDeduplicator(**kwargs: Any)

Initialize the filter.

Parameters

  • p (float): The probability of applying the filter. If p is 1, the filter will always be applied.
  • skip_rejected (bool): If True, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to False.
  • random_state (Optional[Union[int, np.random.Generator]]): Seed for the random number generator. If None, a new random number generator will be created; when used inside a Compose object, the random state is shared with the Compose object.
  • use_batch (bool): If True, the filter will process documents in batches in the apply_stream method.
  • batch_size (int): The size of the batch used to process documents in the apply_stream method.
  • kwargs (Any): Additional keyword arguments to pass to the filter.

def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:

Inline deduplication based on the LSH keys in the document. This filter cannot be used in a distributed environment because it uses a local hash pool.

class RedisDeduplicator(hojichar.core.filter_interface.Filter):

Distributed deduplicator using plain Redis keys. You have to run a Redis server and pass its connection parameters.
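
A usage sketch, assuming a Redis server is reachable at localhost:6379; the key_prefix value is illustrative and should be unique per deduplication run.

```python
from hojichar import Compose
from hojichar.filters.deduplication import GenerateDedupLSH, RedisDeduplicator

pipeline = Compose([
    GenerateDedupLSH(),
    RedisDeduplicator(host="localhost", port=6379, db=0, key_prefix="my_corpus_v1"),
])
# Worker processes or machines pointing at the same Redis instance share the
# deduplication state through the "my_corpus_v1:<lsh key>" entries.
```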

RedisDeduplicator( *, host: str = 'localhost', port: int = 6379, db: int = 0, key_prefix: str = 'dedup', **kwargs: Any)

Initialize the Redis deduplicator.

Args:

  • host (str): Redis server hostname.
  • port (int): Redis server port.
  • db (int): Redis database number.
  • key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
  • **kwargs: Additional keyword arguments for parent Filter.

def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:

Definition of filter behavior.

The document must implement the TextContent protocol; the hojichar.Document class is most commonly used.

In this method, the filter will modify document.text or document.extras and set document.is_rejected = True to discard the document.

Parameters

document : Document Input document

Returns

Document Processed Document

class RedisBloomDeduplicator(hojichar.core.filter_interface.Filter):

Distributed deduplicator backed by RedisBloom scalable Bloom filters. You can use this filter to store LSH keys with less memory than plain Redis keys, at the risk of false positives.

All LSH keys are stored in a single scalable Bloom filter on the Redis side, reserved at construction time:

BF.RESERVE <prefix> <error> <capacity> EXPANSION <n>
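
A usage sketch, assuming a Redis server with the RedisBloom module loaded; the key_prefix and the expected document count are illustrative. The capacity follows the guidance below (roughly the number of documents times the number of bands).

```python
from hojichar import Compose
from hojichar.filters.deduplication import GenerateDedupLSH, RedisBloomDeduplicator

gen = GenerateDedupLSH()
pipeline = Compose([
    gen,
    RedisBloomDeduplicator(
        key_prefix="my_corpus_bloom_v1",
        error_rate=1e-4,
        capacity=10_000_000 * gen.num_bands,  # assuming ~10M documents
    ),
])
```
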
RedisBloomDeduplicator( *, host: str = 'localhost', port: int = 6379, db: int = 0, key_prefix: str = 'bloomdedup', error_rate: float = 0.0001, capacity: int = 1000000000, expansion: int = 2, **kwargs: Any)

Initialize the RedisBloom deduplicator.

Args:

  • host (str): Redis server hostname.
  • port (int): Redis server port.
  • db (int): Redis database number.
  • key_prefix (str): Prefix for Redis keys to avoid collisions. You should use a unique prefix for each deduplication task.
  • error_rate (float): Desired error rate for the Bloom filter.
  • capacity (int): Initial capacity of the Bloom filter. Guide: set it to the expected number of unique LSH keys ~ num_doc * num_bands.
  • expansion (int): Expansion factor for the Bloom filter. This is used to increase the capacity of the filter dynamically.
  • **kwargs: Additional keyword arguments for parent Filter.

def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:

Definition of filter behavior.

The document must implement the TextContent protocol; the hojichar.Document class is most commonly used.

In this method, the filter will modify document.text or document.extras and set document.is_rejected = True to discard the document.

Parameters

document : Document Input document

Returns

Document Processed Document

class InlineDuplicateAnalyzer(hojichar.core.filter_interface.Filter):

Base class for all filters. Document-level filters must inherit from this class.

The definition of text processing is in the apply method. If you define a new filter, override that method.

When an instance is called directly, the filter is applied from string to string.

With context manager, you can use the filter as follows:

with YourFilter(p=0.5) as filt:
    text = filt("This is a sample text.")
InlineDuplicateAnalyzer(**kwargs: Any)

Initialize the filter.

Parameters

  • p (float): The probability of applying the filter. If p is 1, the filter will always be applied.
  • skip_rejected (bool): If True, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to False.
  • random_state (Optional[Union[int, np.random.Generator]]): Seed for the random number generator. If None, a new random number generator will be created; when used inside a Compose object, the random state is shared with the Compose object.
  • use_batch (bool): If True, the filter will process documents in batches in the apply_stream method.
  • batch_size (int): The size of the batch used to process documents in the apply_stream method.
  • kwargs (Any): Additional keyword arguments to pass to the filter.

def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:

Analyze duplicates inline based on the LSH keys in the document. This filter cannot be used in a distributed environment because it uses a local hash pool.
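
A sketch of inspecting which earlier document a duplicate collides with; similar_doc is the extras key set by this filter.

```python
from hojichar import Document
from hojichar.filters.deduplication import GenerateDedupLSH, InlineDuplicateAnalyzer

gen, analyzer = GenerateDedupLSH(), InlineDuplicateAnalyzer()

for text in ["a sentence that appears twice", "a sentence that appears twice"]:
    doc = analyzer.apply(gen.apply(Document(text)))
    if doc.is_rejected:
        print("duplicate of:", doc.extras["similar_doc"])
```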