hojichar.core.filter_interface

  1import logging
  2from abc import ABC, abstractmethod
  3from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Set, TypeVar, Union
  4
  5import numpy as np
  6
  7from hojichar.core.models import Document, Statistics, Token, get_doc_info
  8from hojichar.utils.warn_deprecation import deprecated_since
  9
 10T = TypeVar("T")
 11
 12
 13def _is_jsonable(data: Any) -> bool:
 14    if data is None:
 15        return True
 16    elif isinstance(data, (bool, int, float, str)):
 17        return True
 18    return False
 19
 20
 21class Filter(ABC):
 22    """
 23    Base class for all filters.
 24    Document-level filters must inherit from this class.
 25
 26    The definition of text processing is in `apply` method.
 27    If you define a new filter, override the method.
 28
 29    When this class is called, apply the filter from string to string.
 30
 31    With context manager, you can use the filter as follows:
 32    ```python
 33    with YourFilter(p=0.5) as filt:
 34        text = filt("This is a sample text.")
 35    ```
 36
 37    """
 38
 39    def __init__(
 40        self,
 41        p: float = 1.0,
 42        skip_rejected: bool = True,
 43        *args: Any,
 44        random_state: Optional[Union[int, np.random.Generator]] = None,
 45        use_batch: bool = False,
 46        batch_size: int = 128,
 47        **kwargs: Any,
 48    ) -> None:
 49        """
 50        Initialize the filter.
 51        Parameters
 52        ----------
 53        p : float
 54            The probability of applying the filter.
 55            If `p` is 1, the filter will always be applied.
 56        skip_rejected : bool
 57            If `True`, the filter will skip documents that are already rejected.
 58            If you want to apply the filter to all documents (e.g., postprocess), set this to `False`.
 59        random_state : Optional[Union[int, np.random.Generator]]
 60            Seed for the random number generator.
 61            If `None`, a new random number generator will be created.
 62            If `None`, and use in the `Compose` class, the random state is shared with the `Compose` object.
 63        use_batch : bool
 64            If `True`, the filter will process documents in batches in the `apply_stream` method.
 65        batch_size : int
 66            The size of the batch to process documents in the `apply_stream` method.
 67        kwargs : Any
 68            Additional keyword arguments to pass to the filter.
 69        """
 70        self.name = self.__class__.__name__
 71        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
 72        assert 0 <= p <= 1
 73        self.p = p
 74        self.__init_rng(random_state)
 75        self.skip_rejected = skip_rejected
 76        self.use_batch = use_batch
 77        self.batch_size = batch_size
 78
 79        self._statistics: Statistics = Statistics()
 80
 81    @abstractmethod
 82    def apply(self, document: Document) -> Document:
 83        """
 84        Definition of filter behavior.
 85
 86        The document must have a protocol `TextContent`,
 87        and mostly used hojichar.Document class.
 88
 89        In this method, the filter will modify `document.text` or
 90        `document.extras` and set `document.is_rejected = True` to discard the document.
 91
 92        Parameters
 93        ----------
 94        document : Document
 95            Input document
 96
 97        Returns
 98        -------
 99        Document
100            Processed Document
101        """
102
103    @deprecated_since(version="1.0.0", alternative="apply")
104    def apply_filter(self, document: Document) -> Document:
105        document = self.apply(document)
106        return document
107
108    def _check_skip(self, document: Document) -> bool:
109        """
110        Check if the document should be skipped by this filter.
111        If `skip_rejected` is set to `True`, this method will return `True`
112        if the document is already rejected.
113        If `p` is less than 1, this method will return `True` with a probability of `1 - p`.
114        """
115        skip = self.skip_rejected and document.is_rejected
116        if skip:
117            return True
118        if self.p < 1:
119            if self._rng.random() > self.p:
120                return True
121        return False
122
123    def _apply(self, document: Document) -> Document:
124        """
125        Apply the filter to a single document.
126        This method
127          - checks if the document should be skipped
128          - counts and logging the statistics
129          - logging the reason for rejection if the document is rejected
130
131        This method may be used in `apply` method of `Compose` class.
132        """
133
134        stats = get_doc_info(document)
135
136        if not self._check_skip(document):
137            document = self.apply(document)
138
139        new_stats = get_doc_info(document)
140        self._statistics.update_by_diff(stats, new_stats)
141
142        if not stats["is_rejected"] and new_stats["is_rejected"]:
143            document.reject_reason = self.get_jsonable_vars()
144
145        return document
146
147    def apply_batch(self, batch: Sequence[Document]) -> List[Document]:
148        """
149        Apply the filter to a batch of documents.
150        You can override this method if you want to
151        apply the filter to a batch of documents at once.
152
153        This method may be used in `apply_batch` method of `Compose` class.
154
155        Parameters
156        ----------
157        documents : Sequence[Document]
158            List-like object of input documents
159
160        Returns
161        -------
162        list[Document]
163            List of processed documents
164        """
165        return [self.apply(document) for document in batch]
166
167    def _apply_batch(self, batch: Sequence[Document]) -> List[Document]:
168        """
169        Apply the filter to a batch of documents.
170        This method
171        - checks if the documents should be skipped
172        - counts and logs the statistics
173        - logs the reason for rejection if any document is rejected
174        """
175        skip = False
176        if self.p < 1:
177            skip = self._rng.random() > self.p
178
179        stats = [get_doc_info(document=doc) for doc in batch]
180        if not skip:
181            batch = self.apply_batch(batch)
182        batch = self._finalize_batch(batch, stats)
183        return batch
184
185    def apply_stream(self, stream: Iterable[Document]) -> Iterable[Document]:
186        """
187        Apply the filter to a stream of documents.
188        This method is used when you want to process documents one by one.
189        If `use_batch` is set to `True` in the constructor,
190        this method will process documents in batches using the `apply_batch` method.
191
192        Even if an exception occurs during processing, the process will continue, and the following actions will be taken:
193        - Set the `is_rejected` flag of the document to `True`
194        - Set the error details in `reject_reason`
195        - Increment the `errors` count in the statistics retrievable via `get_statistics`
196
197        Parameters
198        ----------
199        stream : Iterable[Document]
200            Stream of input documents
201
202        Returns
203        -------
204        Iterable[Document]
205            Stream of processed documents
206        """
207
208        if not self.use_batch:
209            for document in stream:
210                yield self._try_process(document, self._apply)
211        else:
212            batch: list[Document] = []
213            for document in stream:
214                if self._check_skip(document):
215                    yield document
216                    continue
217
218                batch.append(document)
219                if len(batch) >= self.batch_size:
220                    stats = [get_doc_info(doc) for doc in batch]
221                    batch = self._try_process(batch, self.apply_batch)
222                    batch = self._finalize_batch(batch, stats)
223                    yield from batch
224                    batch.clear()
225            if batch:
226                stats = [get_doc_info(doc) for doc in batch]
227                batch = self._try_process(batch, self.apply_batch)
228                batch = self._finalize_batch(batch, stats)
229                yield from batch
230
231    def _try_process(self, target: T, func: Callable[[T], T]) -> T:
232        try:
233            return func(target)
234        except Exception as e:
235            if isinstance(target, Document):
236                msg = f"{e!r} occurs while processing {self.name} with {target!r}"
237                target.is_rejected = True
238                target.reject_reason = {"error": msg}
239                self._statistics.errors += 1
240                self.logger.error(msg, exc_info=True)
241                return target  # type: ignore[return-value]
242            if isinstance(target, list):
243                msg = f"{e!r} occurs while batch processing {self.name}"
244                self.logger.error(msg, exc_info=True)
245                for doc in target:
246                    doc.is_rejected = True
247                    doc.reject_reason = {"error": msg}
248                self._statistics.errors += len(target)
249                return target  # type: ignore[return-value]
250            else:
251                raise e
252
253    def __call__(self, text: str, **kwargs: Any) -> str:
254        document = Document(text, **kwargs)
255        document = self._apply(document)
256        return document.text
257
258    def get_statistics(self) -> Statistics:
259        """
260        Get the statistics of this filter.
261        This method returns the statistics of the filter,
262        which includes the number of processed documents, discarded documents, and other statistics.
263        """
264        return self._statistics
265
266    def get_statistics_map(self) -> Dict[str, Any]:
267        """
268        Get the statistics of this filter as a dictionary.
269        """
270        return self._statistics.to_dict()
271
272    def shutdown(self) -> None:
273        """
274        This method is called when the filter is no longer needed.
275        You can override this method to release resources or perform cleanup tasks.
276        """
277        pass
278
279    def __enter__(self) -> "Filter":
280        return self
281
282    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
283        """
284        This method is called when the filter is used in a context manager.
285        It calls the `shutdown` method to release resources or perform cleanup tasks.
286        """
287        self.shutdown()
288
289    def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> Dict[str, Any]:
290        """
291        Get the member variable of this filter.
292        Eligible variables are primitive types; [bool, int, float, str, None],
293        and the name of the variable not starts with the underscore; `_`.
294        """
295        if exclude_keys is None:
296            exclude_keys = set()
297        return {
298            k: v
299            for k, v in vars(self).items()
300            if (_is_jsonable(v) and (k not in exclude_keys) and (not k.startswith("_")))
301        }
302
303    def _finalize_batch(
304        self: "Filter",
305        batch: Sequence[Document],
306        old_stats: List[Dict[str, Any]] = [],
307    ) -> List[Document]:
308        new_stats = [get_doc_info(doc) for doc in batch]
309        for old, new, doc in zip(old_stats, new_stats, batch):
310            self._statistics.update_by_diff(old, new)
311            if not old["is_rejected"] and new["is_rejected"]:
312                doc.reject_reason = self.get_jsonable_vars()
313        return list(batch)
314
315    def __init_rng(self, random_state: Optional[Union[int, np.random.Generator]]) -> None:
316        self._owns_rng = True
317        if random_state is None:
318            self._rng = np.random.default_rng()
319            self._owns_rng = False
320        elif isinstance(random_state, int):
321            self._rng = np.random.default_rng(random_state)
322        elif isinstance(random_state, np.random.Generator):
323            self._rng = random_state
324
325    def _set_rng_if_not_initialized(self, rng: np.random.Generator) -> None:
326        """
327        Set the random number generator for this filter if it is not already initialized.
328        This method is called by Compose class.
329        """
330        if not self._owns_rng:
331            self._rng = rng
332
333
@deprecated_since(version="1.0.0", alternative="Filter")
class TokenFilter(Filter, ABC):
    """
    Base class for token-level filters.

    Token filters, which should be implemented in hojichar/filters/token_filters.py,
    must inherit from this class.
    """

    def __init__(
        self, p: float = 1, skip_rejected: bool = True, *args: Any, **kwargs: Any
    ) -> None:
        """
        Initialize the token filter.

        Parameters
        ----------
        p : float
            The probability of applying the filter. Must satisfy `0 <= p <= 1`.
        skip_rejected : bool
            If `True`, `_apply` returns already rejected documents unchanged.
        args, kwargs : Any
            Forwarded to `Filter.__init__`.
        """
        # Let the base class set up the state that inherited methods rely on
        # (random generator, batch settings, statistics); without it, e.g.
        # `apply_stream` or `get_statistics` fail with AttributeError.
        super().__init__(p, skip_rejected, *args, **kwargs)
        # Token filters log under their own namespace.
        self.logger = logging.getLogger("hojichar.token_filters." + self.name)

    def apply(self, token: Token) -> Token:  # type: ignore
        """
        Definition of the token-level behavior; must be overridden by subclasses.

        Raises
        ------
        NotImplementedError
            Always, in this base implementation.
        """
        raise NotImplementedError(f"{self.__class__.__name__}.apply method is not defined")

    def apply_filter(self, document: Document) -> Document:
        """
        Apply `apply` to every token of the document that is not rejected.
        Tokens that are already rejected are dropped from `document.tokens`.
        """
        document.tokens = [self.apply(token) for token in document.tokens if not token.is_rejected]
        return document

    def __call__(self, text: str) -> str:  # type: ignore
        """Wrap `text` in a `Token`, apply the filter and return the token text."""
        token = Token(text)
        token = self.apply(token)
        return token.text

    def _apply(self, document: Document) -> Document:
        """
        Apply the token filter to a single document.
        This method checks if the document should be skipped.
        Neither the probability `p` nor the statistics are involved here.
        """
        if self.skip_rejected and document.is_rejected:
            return document
        return self.apply_filter(document)

    def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> dict:
        """
        Get the member variables of this filter.
        Eligible variables are primitive types; [bool, int, float, str, None],
        whose name does not start with an underscore `_` and is not in `exclude_keys`.
        """
        # Same selection rule as the base class, so reuse its implementation.
        return super().get_jsonable_vars(exclude_keys)
class Filter(abc.ABC):
 22class Filter(ABC):
 23    """
 24    Base class for all filters.
 25    Document-level filters must inherit from this class.
 26
 27    The definition of text processing is in `apply` method.
 28    If you define a new filter, override the method.
 29
 30    When this class is called, apply the filter from string to string.
 31
 32    With context manager, you can use the filter as follows:
 33    ```python
 34    with YourFilter(p=0.5) as filt:
 35        text = filt("This is a sample text.")
 36    ```
 37
 38    """
 39
 40    def __init__(
 41        self,
 42        p: float = 1.0,
 43        skip_rejected: bool = True,
 44        *args: Any,
 45        random_state: Optional[Union[int, np.random.Generator]] = None,
 46        use_batch: bool = False,
 47        batch_size: int = 128,
 48        **kwargs: Any,
 49    ) -> None:
 50        """
 51        Initialize the filter.
 52        Parameters
 53        ----------
 54        p : float
 55            The probability of applying the filter.
 56            If `p` is 1, the filter will always be applied.
 57        skip_rejected : bool
 58            If `True`, the filter will skip documents that are already rejected.
 59            If you want to apply the filter to all documents (e.g., postprocess), set this to `False`.
 60        random_state : Optional[Union[int, np.random.Generator]]
 61            Seed for the random number generator.
 62            If `None`, a new random number generator will be created.
 63            If `None`, and use in the `Compose` class, the random state is shared with the `Compose` object.
 64        use_batch : bool
 65            If `True`, the filter will process documents in batches in the `apply_stream` method.
 66        batch_size : int
 67            The size of the batch to process documents in the `apply_stream` method.
 68        kwargs : Any
 69            Additional keyword arguments to pass to the filter.
 70        """
 71        self.name = self.__class__.__name__
 72        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
 73        assert 0 <= p <= 1
 74        self.p = p
 75        self.__init_rng(random_state)
 76        self.skip_rejected = skip_rejected
 77        self.use_batch = use_batch
 78        self.batch_size = batch_size
 79
 80        self._statistics: Statistics = Statistics()
 81
 82    @abstractmethod
 83    def apply(self, document: Document) -> Document:
 84        """
 85        Definition of filter behavior.
 86
 87        The document must have a protocol `TextContent`,
 88        and mostly used hojichar.Document class.
 89
 90        In this method, the filter will modify `document.text` or
 91        `document.extras` and set `document.is_rejected = True` to discard the document.
 92
 93        Parameters
 94        ----------
 95        document : Document
 96            Input document
 97
 98        Returns
 99        -------
100        Document
101            Processed Document
102        """
103
104    @deprecated_since(version="1.0.0", alternative="apply")
105    def apply_filter(self, document: Document) -> Document:
106        document = self.apply(document)
107        return document
108
109    def _check_skip(self, document: Document) -> bool:
110        """
111        Check if the document should be skipped by this filter.
112        If `skip_rejected` is set to `True`, this method will return `True`
113        if the document is already rejected.
114        If `p` is less than 1, this method will return `True` with a probability of `1 - p`.
115        """
116        skip = self.skip_rejected and document.is_rejected
117        if skip:
118            return True
119        if self.p < 1:
120            if self._rng.random() > self.p:
121                return True
122        return False
123
124    def _apply(self, document: Document) -> Document:
125        """
126        Apply the filter to a single document.
127        This method
128          - checks if the document should be skipped
129          - counts and logging the statistics
130          - logging the reason for rejection if the document is rejected
131
132        This method may be used in `apply` method of `Compose` class.
133        """
134
135        stats = get_doc_info(document)
136
137        if not self._check_skip(document):
138            document = self.apply(document)
139
140        new_stats = get_doc_info(document)
141        self._statistics.update_by_diff(stats, new_stats)
142
143        if not stats["is_rejected"] and new_stats["is_rejected"]:
144            document.reject_reason = self.get_jsonable_vars()
145
146        return document
147
148    def apply_batch(self, batch: Sequence[Document]) -> List[Document]:
149        """
150        Apply the filter to a batch of documents.
151        You can override this method if you want to
152        apply the filter to a batch of documents at once.
153
154        This method may be used in `apply_batch` method of `Compose` class.
155
156        Parameters
157        ----------
158        documents : Sequence[Document]
159            List-like object of input documents
160
161        Returns
162        -------
163        list[Document]
164            List of processed documents
165        """
166        return [self.apply(document) for document in batch]
167
168    def _apply_batch(self, batch: Sequence[Document]) -> List[Document]:
169        """
170        Apply the filter to a batch of documents.
171        This method
172        - checks if the documents should be skipped
173        - counts and logs the statistics
174        - logs the reason for rejection if any document is rejected
175        """
176        skip = False
177        if self.p < 1:
178            skip = self._rng.random() > self.p
179
180        stats = [get_doc_info(document=doc) for doc in batch]
181        if not skip:
182            batch = self.apply_batch(batch)
183        batch = self._finalize_batch(batch, stats)
184        return batch
185
186    def apply_stream(self, stream: Iterable[Document]) -> Iterable[Document]:
187        """
188        Apply the filter to a stream of documents.
189        This method is used when you want to process documents one by one.
190        If `use_batch` is set to `True` in the constructor,
191        this method will process documents in batches using the `apply_batch` method.
192
193        Even if an exception occurs during processing, the process will continue, and the following actions will be taken:
194        - Set the `is_rejected` flag of the document to `True`
195        - Set the error details in `reject_reason`
196        - Increment the `errors` count in the statistics retrievable via `get_statistics`
197
198        Parameters
199        ----------
200        stream : Iterable[Document]
201            Stream of input documents
202
203        Returns
204        -------
205        Iterable[Document]
206            Stream of processed documents
207        """
208
209        if not self.use_batch:
210            for document in stream:
211                yield self._try_process(document, self._apply)
212        else:
213            batch: list[Document] = []
214            for document in stream:
215                if self._check_skip(document):
216                    yield document
217                    continue
218
219                batch.append(document)
220                if len(batch) >= self.batch_size:
221                    stats = [get_doc_info(doc) for doc in batch]
222                    batch = self._try_process(batch, self.apply_batch)
223                    batch = self._finalize_batch(batch, stats)
224                    yield from batch
225                    batch.clear()
226            if batch:
227                stats = [get_doc_info(doc) for doc in batch]
228                batch = self._try_process(batch, self.apply_batch)
229                batch = self._finalize_batch(batch, stats)
230                yield from batch
231
232    def _try_process(self, target: T, func: Callable[[T], T]) -> T:
233        try:
234            return func(target)
235        except Exception as e:
236            if isinstance(target, Document):
237                msg = f"{e!r} occurs while processing {self.name} with {target!r}"
238                target.is_rejected = True
239                target.reject_reason = {"error": msg}
240                self._statistics.errors += 1
241                self.logger.error(msg, exc_info=True)
242                return target  # type: ignore[return-value]
243            if isinstance(target, list):
244                msg = f"{e!r} occurs while batch processing {self.name}"
245                self.logger.error(msg, exc_info=True)
246                for doc in target:
247                    doc.is_rejected = True
248                    doc.reject_reason = {"error": msg}
249                self._statistics.errors += len(target)
250                return target  # type: ignore[return-value]
251            else:
252                raise e
253
254    def __call__(self, text: str, **kwargs: Any) -> str:
255        document = Document(text, **kwargs)
256        document = self._apply(document)
257        return document.text
258
259    def get_statistics(self) -> Statistics:
260        """
261        Get the statistics of this filter.
262        This method returns the statistics of the filter,
263        which includes the number of processed documents, discarded documents, and other statistics.
264        """
265        return self._statistics
266
267    def get_statistics_map(self) -> Dict[str, Any]:
268        """
269        Get the statistics of this filter as a dictionary.
270        """
271        return self._statistics.to_dict()
272
273    def shutdown(self) -> None:
274        """
275        This method is called when the filter is no longer needed.
276        You can override this method to release resources or perform cleanup tasks.
277        """
278        pass
279
280    def __enter__(self) -> "Filter":
281        return self
282
283    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
284        """
285        This method is called when the filter is used in a context manager.
286        It calls the `shutdown` method to release resources or perform cleanup tasks.
287        """
288        self.shutdown()
289
290    def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> Dict[str, Any]:
291        """
292        Get the member variable of this filter.
293        Eligible variables are primitive types; [bool, int, float, str, None],
294        and the name of the variable not starts with the underscore; `_`.
295        """
296        if exclude_keys is None:
297            exclude_keys = set()
298        return {
299            k: v
300            for k, v in vars(self).items()
301            if (_is_jsonable(v) and (k not in exclude_keys) and (not k.startswith("_")))
302        }
303
304    def _finalize_batch(
305        self: "Filter",
306        batch: Sequence[Document],
307        old_stats: List[Dict[str, Any]] = [],
308    ) -> List[Document]:
309        new_stats = [get_doc_info(doc) for doc in batch]
310        for old, new, doc in zip(old_stats, new_stats, batch):
311            self._statistics.update_by_diff(old, new)
312            if not old["is_rejected"] and new["is_rejected"]:
313                doc.reject_reason = self.get_jsonable_vars()
314        return list(batch)
315
316    def __init_rng(self, random_state: Optional[Union[int, np.random.Generator]]) -> None:
317        self._owns_rng = True
318        if random_state is None:
319            self._rng = np.random.default_rng()
320            self._owns_rng = False
321        elif isinstance(random_state, int):
322            self._rng = np.random.default_rng(random_state)
323        elif isinstance(random_state, np.random.Generator):
324            self._rng = random_state
325
326    def _set_rng_if_not_initialized(self, rng: np.random.Generator) -> None:
327        """
328        Set the random number generator for this filter if it is not already initialized.
329        This method is called by Compose class.
330        """
331        if not self._owns_rng:
332            self._rng = rng

Base class for all filters. Document-level filters must inherit from this class.

The definition of text processing is in apply method. If you define a new filter, override the method.

When this class is called, apply the filter from string to string.

With context manager, you can use the filter as follows:

with YourFilter(p=0.5) as filt:
    text = filt("This is a sample text.")
Filter( p: float = 1.0, skip_rejected: bool = True, *args: Any, random_state: Union[int, numpy.random._generator.Generator, NoneType] = None, use_batch: bool = False, batch_size: int = 128, **kwargs: Any)
40    def __init__(
41        self,
42        p: float = 1.0,
43        skip_rejected: bool = True,
44        *args: Any,
45        random_state: Optional[Union[int, np.random.Generator]] = None,
46        use_batch: bool = False,
47        batch_size: int = 128,
48        **kwargs: Any,
49    ) -> None:
50        """
51        Initialize the filter.
52        Parameters
53        ----------
54        p : float
55            The probability of applying the filter.
56            If `p` is 1, the filter will always be applied.
57        skip_rejected : bool
58            If `True`, the filter will skip documents that are already rejected.
59            If you want to apply the filter to all documents (e.g., postprocess), set this to `False`.
60        random_state : Optional[Union[int, np.random.Generator]]
61            Seed for the random number generator.
62            If `None`, a new random number generator will be created.
63            If `None`, and use in the `Compose` class, the random state is shared with the `Compose` object.
64        use_batch : bool
65            If `True`, the filter will process documents in batches in the `apply_stream` method.
66        batch_size : int
67            The size of the batch to process documents in the `apply_stream` method.
68        kwargs : Any
69            Additional keyword arguments to pass to the filter.
70        """
71        self.name = self.__class__.__name__
72        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
73        assert 0 <= p <= 1
74        self.p = p
75        self.__init_rng(random_state)
76        self.skip_rejected = skip_rejected
77        self.use_batch = use_batch
78        self.batch_size = batch_size
79
80        self._statistics: Statistics = Statistics()

Initialize the filter.

Parameters

p : float — The probability of applying the filter. If p is 1, the filter will always be applied. skip_rejected : bool — If True, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to False. random_state : Optional[Union[int, np.random.Generator]] — Seed for the random number generator. If None, a new random number generator will be created. If None and the filter is used inside a Compose object, the random state is shared with the Compose object. use_batch : bool — If True, the filter will process documents in batches in the apply_stream method. batch_size : int — The size of the batch to process documents in the apply_stream method. kwargs : Any — Additional keyword arguments to pass to the filter.

@abstractmethod
def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:
    @abstractmethod
    def apply(self, document: Document) -> Document:
        """
        Definition of filter behavior.

        The document must satisfy the `TextContent` protocol;
        in most cases it is a hojichar.Document instance.

        In this method, the filter will modify `document.text` or
        `document.extras` and set `document.is_rejected = True` to discard the document.

        This method is abstract: concrete filters must override it.

        Parameters
        ----------
        document : Document
            Input document

        Returns
        -------
        Document
            Processed Document
        """

Definition of filter behavior.

The document must satisfy the TextContent protocol; in most cases it is an instance of the hojichar.Document class.

In this method, the filter will modify document.text or document.extras and set document.is_rejected = True to discard the document.

Parameters

document : Document Input document

Returns

Document Processed Document

@deprecated_since(version='1.0.0', alternative='apply')
def apply_filter( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:
104    @deprecated_since(version="1.0.0", alternative="apply")
105    def apply_filter(self, document: Document) -> Document:
106        document = self.apply(document)
107        return document
def apply_batch( self, batch: Sequence[hojichar.core.models.Document]) -> List[hojichar.core.models.Document]:
148    def apply_batch(self, batch: Sequence[Document]) -> List[Document]:
149        """
150        Apply the filter to a batch of documents.
151        You can override this method if you want to
152        apply the filter to a batch of documents at once.
153
154        This method may be used in `apply_batch` method of `Compose` class.
155
156        Parameters
157        ----------
158        batch : Sequence[Document]
159            List-like object of input documents
160
161        Returns
162        -------
163        list[Document]
164            List of processed documents
165        """
166        return [self.apply(document) for document in batch]

Apply the filter to a batch of documents. You can override this method if you want to apply the filter to a batch of documents at once.

This method may be used in apply_batch method of Compose class.

Parameters

batch : Sequence[Document] List-like object of input documents

Returns

list[Document] List of processed documents

def apply_stream( self, stream: Iterable[hojichar.core.models.Document]) -> Iterable[hojichar.core.models.Document]:
186    def apply_stream(self, stream: Iterable[Document]) -> Iterable[Document]:
187        """
188        Apply the filter to a stream of documents.
189        This method is used when you want to process documents one by one.
190        If `use_batch` is set to `True` in the constructor,
191        this method will process documents in batches using the `apply_batch` method.
192
193        Even if an exception occurs during processing, the process will continue, and the following actions will be taken:
194        - Set the `is_rejected` flag of the document to `True`
195        - Set the error details in `reject_reason`
196        - Increment the `errors` count in the statistics retrievable via `get_statistics`
197
198        Parameters
199        ----------
200        stream : Iterable[Document]
201            Stream of input documents
202
203        Returns
204        -------
205        Iterable[Document]
206            Stream of processed documents
207        """
208
209        if not self.use_batch:
210            for document in stream:
211                yield self._try_process(document, self._apply)
212        else:
213            batch: list[Document] = []
214            for document in stream:
215                if self._check_skip(document):
216                    yield document
217                    continue
218
219                batch.append(document)
220                if len(batch) >= self.batch_size:
221                    stats = [get_doc_info(doc) for doc in batch]
222                    batch = self._try_process(batch, self.apply_batch)
223                    batch = self._finalize_batch(batch, stats)
224                    yield from batch
225                    batch.clear()
226            if batch:
227                stats = [get_doc_info(doc) for doc in batch]
228                batch = self._try_process(batch, self.apply_batch)
229                batch = self._finalize_batch(batch, stats)
230                yield from batch

Apply the filter to a stream of documents. This method is used when you want to process documents one by one. If use_batch is set to True in the constructor, this method will process documents in batches using the apply_batch method.

Even if an exception occurs during processing, the process will continue, and the following actions will be taken:

  • Set the is_rejected flag of the document to True
  • Set the error details in reject_reason
  • Increment the errors count in the statistics retrievable via get_statistics

Parameters

stream : Iterable[Document] Stream of input documents

Returns

Iterable[Document] Stream of processed documents

def get_statistics(self) -> hojichar.core.models.Statistics:
259    def get_statistics(self) -> Statistics:
260        """
261        Get the statistics of this filter.
262        This method returns the statistics of the filter,
263        which includes the number of processed documents, discarded documents, and other statistics.
264        """
265        return self._statistics

Get the statistics of this filter. This method returns the statistics of the filter, which includes the number of processed documents, discarded documents, and other statistics.

def get_statistics_map(self) -> Dict[str, Any]:
267    def get_statistics_map(self) -> Dict[str, Any]:
268        """
269        Get the statistics of this filter as a dictionary.
270        """
271        return self._statistics.to_dict()

Get the statistics of this filter as a dictionary.

def shutdown(self) -> None:
273    def shutdown(self) -> None:
274        """
275        This method is called when the filter is no longer needed.
276        You can override this method to release resources or perform cleanup tasks.
277        """
278        pass

This method is called when the filter is no longer needed. You can override this method to release resources or perform cleanup tasks.

def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> Dict[str, Any]:
290    def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> Dict[str, Any]:
291        """
292        Get the member variables of this filter.
293        Eligible variables hold primitive values (bool, int, float, str, or None)
294        and have names that do not start with an underscore (`_`).
295        """
296        if exclude_keys is None:
297            exclude_keys = set()
298        return {
299            k: v
300            for k, v in vars(self).items()
301            if (_is_jsonable(v) and (k not in exclude_keys) and (not k.startswith("_")))
302        }

Get the member variables of this filter. Eligible variables hold primitive values (bool, int, float, str, or None) and have names that do not start with an underscore (_).

@deprecated_since(version='1.0.0', alternative='Filter')
class TokenFilter(Filter, abc.ABC):
335@deprecated_since(version="1.0.0", alternative="Filter")
336class TokenFilter(Filter, ABC):
337    """
338    Base class for token-level filters.
339
340    Token filters, which should be implemented in hojichar/filters/token_filters.py,
341    must inherit from this class.
342    """
343
344    def __init__(
345        self, p: float = 1, skip_rejected: bool = True, *args: Any, **kwargs: Any
346    ) -> None:
347        self.name = self.__class__.__name__
348        self.logger = logging.getLogger("hojichar.token_filters." + self.name)
349        assert 0 <= p <= 1
350        self.p = p
351        self.skip_rejected = skip_rejected
352
353    def apply(self, token: Token) -> Token:  # type: ignore
354        raise NotImplementedError(f"{self.__class__.__name__}.apply method is not defined")
355        return token
356
357    def apply_filter(self, document: Document) -> Document:
358        document.tokens = [self.apply(token) for token in document.tokens if not token.is_rejected]
359        return document
360
361    def __call__(self, text: str) -> str:  # type: ignore
362        token = Token(text)
363        token = self.apply(token)
364        return token.text
365
366    def _apply(self, document: Document) -> Document:
367        """
368        Apply the token filter to a single document.
369        This method checks if the document should be skipped.
370        """
371        if self.skip_rejected and document.is_rejected:
372            return document
373        return self.apply_filter(document)
374
375    def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> dict:
376        """
377        Get the member variables of this filter.
378        Eligible variables hold primitive values (bool, int, float, str, or None)
379        and have names that do not start with an underscore (`_`).
380        """
381        if exclude_keys is None:
382            exclude_keys = set()
383        return {
384            k: v
385            for k, v in vars(self).items()
386            if (_is_jsonable(v) and (k not in exclude_keys) and (not k.startswith("_")))
387        }

Base class for token-level filters.

Token filters, which should be implemented in hojichar/filters/token_filters.py, must inherit from this class.

TokenFilter(p: float = 1, skip_rejected: bool = True, *args: Any, **kwargs: Any)
344    def __init__(
345        self, p: float = 1, skip_rejected: bool = True, *args: Any, **kwargs: Any
346    ) -> None:
347        self.name = self.__class__.__name__
348        self.logger = logging.getLogger("hojichar.token_filters." + self.name)
349        assert 0 <= p <= 1
350        self.p = p
351        self.skip_rejected = skip_rejected

Initialize the filter.

Parameters

p : float The probability of applying the filter. If p is 1, the filter will always be applied. skip_rejected : bool If True, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to False. random_state : Optional[Union[int, np.random.Generator]] Seed for the random number generator. If None, a new random number generator will be created. If None and the filter is used inside a Compose object, the random state is shared with that Compose object. use_batch : bool If True, the filter will process documents in batches in the apply_stream method. batch_size : int The size of the batch to process documents in the apply_stream method. kwargs : Any Additional keyword arguments to pass to the filter.

def apply(self, token: hojichar.core.models.Token) -> hojichar.core.models.Token:
353    def apply(self, token: Token) -> Token:  # type: ignore
354        raise NotImplementedError(f"{self.__class__.__name__}.apply method is not defined")
355        return token

Definition of filter behavior.

The document must implement the TextContent protocol; in most cases it is a hojichar.Document instance.

In this method, the filter will modify document.text or document.extras and set document.is_rejected = True to discard the document.

Parameters

document : Document Input document

Returns

Document Processed Document

def apply_filter( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:
357    def apply_filter(self, document: Document) -> Document:
358        document.tokens = [self.apply(token) for token in document.tokens if not token.is_rejected]
359        return document
def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> dict:
375    def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> dict:
376        """
377        Get the member variables of this filter.
378        Eligible variables hold primitive values (bool, int, float, str, or None)
379        and have names that do not start with an underscore (`_`).
380        """
381        if exclude_keys is None:
382            exclude_keys = set()
383        return {
384            k: v
385            for k, v in vars(self).items()
386            if (_is_jsonable(v) and (k not in exclude_keys) and (not k.startswith("_")))
387        }

Get the member variables of this filter. Eligible variables hold primitive values (bool, int, float, str, or None) and have names that do not start with an underscore (_).