hojichar.core.filter_interface
import logging
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Set, TypeVar, Union

import numpy as np

from hojichar.core.models import Document, Statistics, Token, get_doc_info
from hojichar.utils.warn_deprecation import deprecated_since

T = TypeVar("T")


def _is_jsonable(data: Any) -> bool:
    """Return `True` if `data` is `None` or a `bool`, `int`, `float` or `str`."""
    return data is None or isinstance(data, (bool, int, float, str))


class Filter(ABC):
    """
    Base class for all filters.
    Document-level filters must inherit from this class.

    The text processing itself is defined in the `apply` method.
    To define a new filter, override that method.

    Calling an instance applies the filter from string to string.

    The filter can also be used as a context manager; `shutdown` is called on exit:
    ```python
    with YourFilter(p=0.5) as filt:
        text = filt("This is a sample text.")
    ```

    """

    def __init__(
        self,
        p: float = 1.0,
        skip_rejected: bool = True,
        *args: Any,
        random_state: Optional[Union[int, np.random.Generator]] = None,
        use_batch: bool = False,
        batch_size: int = 128,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the filter.

        Parameters
        ----------
        p : float
            The probability of applying the filter. Must satisfy `0 <= p <= 1`.
            If `p` is 1, the filter will always be applied.
        skip_rejected : bool
            If `True`, the filter will skip documents that are already rejected.
            If you want to apply the filter to all documents (e.g., postprocess), set this to `False`.
        random_state : Optional[Union[int, np.random.Generator]]
            Seed (or ready-made generator) for the random number generator.
            If `None`, a new random number generator will be created; it may later be
            replaced through `_set_rng_if_not_initialized`.
        use_batch : bool
            If `True`, the filter will process documents in batches in the `apply_stream` method.
        batch_size : int
            The size of the batch to process documents in the `apply_stream` method.
        args, kwargs : Any
            Accepted and ignored by this base class.
        """
        # NOTE: the assignment order of the public attributes below determines the key
        # order of `get_jsonable_vars()`, so keep it stable.
        self.name = self.__class__.__name__
        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
        assert 0 <= p <= 1
        self.p = p
        self.__init_rng(random_state)
        self.skip_rejected = skip_rejected
        self.use_batch = use_batch
        self.batch_size = batch_size

        self._statistics: Statistics = Statistics()

    @abstractmethod
    def apply(self, document: Document) -> Document:
        """
        Definition of filter behavior.

        Implementations modify `document.text` or `document.extras`, and set
        `document.is_rejected = True` to discard the document.

        Parameters
        ----------
        document : Document
            Input document

        Returns
        -------
        Document
            Processed Document
        """

    @deprecated_since(version="1.0.0", alternative="apply")
    def apply_filter(self, document: Document) -> Document:
        """Deprecated alias that simply forwards to `apply`."""
        document = self.apply(document)
        return document

    def _check_skip(self, document: Document) -> bool:
        """
        Check if the document should be skipped by this filter.

        Returns `True` if `skip_rejected` is set and the document is already rejected.
        Otherwise, if `p` is less than 1, returns `True` with a probability of `1 - p`.
        The random number generator is only consulted in that second case.
        """
        if self.skip_rejected and document.is_rejected:
            return True
        if self.p < 1 and self._rng.random() > self.p:
            return True
        return False

    def _apply(self, document: Document) -> Document:
        """
        Apply the filter to a single document.

        This method
        - checks if the document should be skipped
        - passes before/after snapshots (`get_doc_info`) to `Statistics.update_by_diff`
        - stores this filter's `get_jsonable_vars()` as `reject_reason` when the
          document became rejected during this call
        """
        stats = get_doc_info(document)

        if not self._check_skip(document):
            document = self.apply(document)

        new_stats = get_doc_info(document)
        self._statistics.update_by_diff(stats, new_stats)

        if not stats["is_rejected"] and new_stats["is_rejected"]:
            document.reject_reason = self.get_jsonable_vars()

        return document

    def apply_batch(self, batch: Sequence[Document]) -> List[Document]:
        """
        Apply the filter to a batch of documents.
        Override this method if the filter can process a batch of documents at once;
        the default implementation calls `apply` on each document in order.

        Parameters
        ----------
        batch : Sequence[Document]
            List-like object of input documents

        Returns
        -------
        list[Document]
            List of processed documents
        """
        return [self.apply(document) for document in batch]

    def _apply_batch(self, batch: Sequence[Document]) -> List[Document]:
        """
        Apply the filter to a batch of documents.

        This method
        - decides once per batch (with probability `1 - p`) whether to skip `apply_batch`
        - records statistics and reject reasons through `_finalize_batch`
        """
        skip = False
        if self.p < 1:
            skip = self._rng.random() > self.p

        stats = [get_doc_info(document=doc) for doc in batch]
        if not skip:
            batch = self.apply_batch(batch)
        return self._finalize_batch(batch, stats)

    def apply_stream(self, stream: Iterable[Document]) -> Iterable[Document]:
        """
        Apply the filter to a stream of documents.

        Without `use_batch`, documents are processed one by one through `_apply`.
        With `use_batch`, documents are buffered and handed to `apply_batch` in chunks of
        `batch_size`. In that mode, documents for which `_check_skip` is true are yielded
        immediately, i.e. possibly before earlier documents that are still buffered.

        Even if an exception occurs during processing, the process will continue, and the
        following actions will be taken:
        - Set the `is_rejected` flag of the document to `True`
        - Set the error details in `reject_reason`
        - Increment the `errors` count in the statistics retrievable via `get_statistics`

        Parameters
        ----------
        stream : Iterable[Document]
            Stream of input documents

        Returns
        -------
        Iterable[Document]
            Stream of processed documents
        """
        if not self.use_batch:
            for document in stream:
                yield self._try_process(document, self._apply)
            return

        batch: List[Document] = []
        for document in stream:
            if self._check_skip(document):
                yield document
                continue

            batch.append(document)
            if len(batch) >= self.batch_size:
                yield from self._process_stream_batch(batch)
                batch = []
        if batch:
            # Flush the final, partially filled batch.
            yield from self._process_stream_batch(batch)

    def _process_stream_batch(self, batch: List[Document]) -> List[Document]:
        """Run `apply_batch` on one buffered batch with error isolation and bookkeeping."""
        stats = [get_doc_info(doc) for doc in batch]
        errors_before = self._statistics.errors
        processed = self._try_process(batch, self.apply_batch)
        # `_try_process` increases the error counter only when `apply_batch` raised.
        failed = self._statistics.errors != errors_before
        error_reasons = [doc.reject_reason for doc in processed] if failed else []
        processed = self._finalize_batch(processed, stats)
        # `_finalize_batch` replaces the reject reason of newly rejected documents with the
        # filter parameters; put the error details back so they are not lost.
        for doc, reason in zip(processed, error_reasons):
            doc.reject_reason = reason
        return processed

    def _try_process(self, target: T, func: Callable[[T], T]) -> T:
        """
        Call `func(target)` and turn exceptions into rejections.

        If `target` is a `Document` or a `list` of documents and `func` raises, the
        document(s) are marked as rejected with an `{"error": ...}` reject reason, the
        error counter is increased, the error is logged, and `target` is returned.
        For any other kind of target the exception is re-raised.
        """
        try:
            return func(target)
        except Exception as e:
            if isinstance(target, Document):
                msg = f"{e!r} occurs while processing {self.name} with {target!r}"
                target.is_rejected = True
                target.reject_reason = {"error": msg}
                self._statistics.errors += 1
                self.logger.error(msg, exc_info=True)
                return target  # type: ignore[return-value]
            if isinstance(target, list):
                msg = f"{e!r} occurs while batch processing {self.name}"
                self.logger.error(msg, exc_info=True)
                for doc in target:
                    doc.is_rejected = True
                    doc.reject_reason = {"error": msg}
                self._statistics.errors += len(target)
                return target  # type: ignore[return-value]
            raise

    def __call__(self, text: str, **kwargs: Any) -> str:
        """Wrap `text` in a `Document`, run `_apply` on it and return the resulting text."""
        document = Document(text, **kwargs)
        document = self._apply(document)
        return document.text

    def get_statistics(self) -> Statistics:
        """
        Get the statistics of this filter.

        Returns the `Statistics` object this filter updates while processing documents.
        """
        return self._statistics

    def get_statistics_map(self) -> Dict[str, Any]:
        """
        Get the statistics of this filter as a dictionary.
        """
        return self._statistics.to_dict()

    def shutdown(self) -> None:
        """
        This method is called when the filter is no longer needed.
        You can override this method to release resources or perform cleanup tasks.
        """
        pass

    def __enter__(self) -> "Filter":
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        """
        Called when leaving a `with` block.
        It calls the `shutdown` method to release resources or perform cleanup tasks.
        """
        self.shutdown()

    def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> Dict[str, Any]:
        """
        Get the member variables of this filter.

        Eligible variables are those whose value is of a primitive type
        (`bool`, `int`, `float`, `str`, `None`), whose name does not start with an
        underscore (`_`) and whose name is not listed in `exclude_keys`.
        """
        if exclude_keys is None:
            exclude_keys = set()
        return {
            k: v
            for k, v in vars(self).items()
            if (_is_jsonable(v) and (k not in exclude_keys) and (not k.startswith("_")))
        }

    def _finalize_batch(
        self: "Filter",
        batch: Sequence[Document],
        old_stats: Optional[List[Dict[str, Any]]] = None,
    ) -> List[Document]:
        """
        Record statistics and reject reasons for a processed batch.

        `old_stats` holds the `get_doc_info` snapshots taken before processing, in the
        same order as `batch`. When it is omitted nothing is recorded. Returns the
        batch as a new list.
        """
        if old_stats is None:
            # Avoid a shared mutable default; an empty list makes the loop a no-op.
            old_stats = []
        new_stats = [get_doc_info(doc) for doc in batch]
        for old, new, doc in zip(old_stats, new_stats, batch):
            self._statistics.update_by_diff(old, new)
            if not old["is_rejected"] and new["is_rejected"]:
                doc.reject_reason = self.get_jsonable_vars()
        return list(batch)

    def __init_rng(self, random_state: Optional[Union[int, np.random.Generator]]) -> None:
        """
        Set up `self._rng` and remember whether it was explicitly configured.

        `_owns_rng` is `False` only when no `random_state` was given, which allows
        `_set_rng_if_not_initialized` to replace the generator later.
        """
        self._owns_rng = random_state is not None
        if isinstance(random_state, np.random.Generator):
            self._rng = random_state
        else:
            # Covers `None` and `int` as before, and additionally every other seed type
            # `default_rng` understands (e.g. NumPy integers), so that `_rng` is never
            # left undefined. Invalid seeds raise whatever `default_rng` raises.
            self._rng = np.random.default_rng(random_state)

    def _set_rng_if_not_initialized(self, rng: np.random.Generator) -> None:
        """
        Set the random number generator for this filter if it is not already initialized.
        This method is called by Compose class.
        """
        if not self._owns_rng:
            self._rng = rng


@deprecated_since(version="1.0.0", alternative="Filter")
class TokenFilter(Filter, ABC):
    """
    Base class for token-level filters.

    Token filters, which should be implemented in hojichar/filters/token_filters.py,
    must inherit from this class.
    """

    def __init__(
        self, p: float = 1, skip_rejected: bool = True, *args: Any, **kwargs: Any
    ) -> None:
        """
        Initialize the token filter.

        Parameters
        ----------
        p : float
            Stored on the instance; must satisfy `0 <= p <= 1`.
        skip_rejected : bool
            If `True`, `_apply` returns already rejected documents unchanged.
        args, kwargs : Any
            Accepted and ignored.
        """
        # NOTE(review): `Filter.__init__` is not called here, so `_rng`, `_statistics`,
        # `use_batch` and `batch_size` are never set on token filters; inherited methods
        # that read them (e.g. `get_statistics`, `apply_stream`) would fail. Confirm
        # whether that is intended before relying on those methods.
        self.name = self.__class__.__name__
        self.logger = logging.getLogger("hojichar.token_filters." + self.name)
        assert 0 <= p <= 1
        self.p = p
        self.skip_rejected = skip_rejected

    def apply(self, token: Token) -> Token:  # type: ignore
        """Process a single token. Subclasses must override this; the default raises."""
        raise NotImplementedError(f"{self.__class__.__name__}.apply method is not defined")

    def apply_filter(self, document: Document) -> Document:
        """Apply `apply` to every non-rejected token; rejected tokens are dropped."""
        document.tokens = [self.apply(token) for token in document.tokens if not token.is_rejected]
        return document

    def __call__(self, text: str) -> str:  # type: ignore
        """Wrap `text` in a `Token`, run `apply` on it and return the resulting text."""
        token = Token(text)
        token = self.apply(token)
        return token.text

    def _apply(self, document: Document) -> Document:
        """
        Apply the token filter to a single document.
        Already rejected documents are returned unchanged when `skip_rejected` is set.
        """
        if self.skip_rejected and document.is_rejected:
            return document
        return self.apply_filter(document)

    def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> Dict[str, Any]:
        """
        Get the member variables of this filter.

        Same selection rule as `Filter.get_jsonable_vars`, to which this delegates.
        """
        return super().get_jsonable_vars(exclude_keys)
class Filter(ABC):
    """
    Base class for all filters.
    Document-level filters must inherit from this class.

    The text processing itself is defined in the `apply` method.
    To define a new filter, override that method.

    Calling an instance applies the filter from string to string.

    The filter can also be used as a context manager; `shutdown` is called on exit:
    ```python
    with YourFilter(p=0.5) as filt:
        text = filt("This is a sample text.")
    ```

    """

    def __init__(
        self,
        p: float = 1.0,
        skip_rejected: bool = True,
        *args: Any,
        random_state: Optional[Union[int, np.random.Generator]] = None,
        use_batch: bool = False,
        batch_size: int = 128,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the filter.

        Parameters
        ----------
        p : float
            The probability of applying the filter. Must satisfy `0 <= p <= 1`.
            If `p` is 1, the filter will always be applied.
        skip_rejected : bool
            If `True`, the filter will skip documents that are already rejected.
            If you want to apply the filter to all documents (e.g., postprocess), set this to `False`.
        random_state : Optional[Union[int, np.random.Generator]]
            Seed (or ready-made generator) for the random number generator.
            If `None`, a new random number generator will be created; it may later be
            replaced through `_set_rng_if_not_initialized`.
        use_batch : bool
            If `True`, the filter will process documents in batches in the `apply_stream` method.
        batch_size : int
            The size of the batch to process documents in the `apply_stream` method.
        args, kwargs : Any
            Accepted and ignored by this base class.
        """
        # NOTE: the assignment order of the public attributes below determines the key
        # order of `get_jsonable_vars()`, so keep it stable.
        self.name = self.__class__.__name__
        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
        assert 0 <= p <= 1
        self.p = p
        self.__init_rng(random_state)
        self.skip_rejected = skip_rejected
        self.use_batch = use_batch
        self.batch_size = batch_size

        self._statistics: Statistics = Statistics()

    @abstractmethod
    def apply(self, document: Document) -> Document:
        """
        Definition of filter behavior.

        Implementations modify `document.text` or `document.extras`, and set
        `document.is_rejected = True` to discard the document.

        Parameters
        ----------
        document : Document
            Input document

        Returns
        -------
        Document
            Processed Document
        """

    @deprecated_since(version="1.0.0", alternative="apply")
    def apply_filter(self, document: Document) -> Document:
        """Deprecated alias that simply forwards to `apply`."""
        document = self.apply(document)
        return document

    def _check_skip(self, document: Document) -> bool:
        """
        Check if the document should be skipped by this filter.

        Returns `True` if `skip_rejected` is set and the document is already rejected.
        Otherwise, if `p` is less than 1, returns `True` with a probability of `1 - p`.
        The random number generator is only consulted in that second case.
        """
        if self.skip_rejected and document.is_rejected:
            return True
        if self.p < 1 and self._rng.random() > self.p:
            return True
        return False

    def _apply(self, document: Document) -> Document:
        """
        Apply the filter to a single document.

        This method
        - checks if the document should be skipped
        - passes before/after snapshots (`get_doc_info`) to `Statistics.update_by_diff`
        - stores this filter's `get_jsonable_vars()` as `reject_reason` when the
          document became rejected during this call
        """
        stats = get_doc_info(document)

        if not self._check_skip(document):
            document = self.apply(document)

        new_stats = get_doc_info(document)
        self._statistics.update_by_diff(stats, new_stats)

        if not stats["is_rejected"] and new_stats["is_rejected"]:
            document.reject_reason = self.get_jsonable_vars()

        return document

    def apply_batch(self, batch: Sequence[Document]) -> List[Document]:
        """
        Apply the filter to a batch of documents.
        Override this method if the filter can process a batch of documents at once;
        the default implementation calls `apply` on each document in order.

        Parameters
        ----------
        batch : Sequence[Document]
            List-like object of input documents

        Returns
        -------
        list[Document]
            List of processed documents
        """
        return [self.apply(document) for document in batch]

    def _apply_batch(self, batch: Sequence[Document]) -> List[Document]:
        """
        Apply the filter to a batch of documents.

        This method
        - decides once per batch (with probability `1 - p`) whether to skip `apply_batch`
        - records statistics and reject reasons through `_finalize_batch`
        """
        skip = False
        if self.p < 1:
            skip = self._rng.random() > self.p

        stats = [get_doc_info(document=doc) for doc in batch]
        if not skip:
            batch = self.apply_batch(batch)
        return self._finalize_batch(batch, stats)

    def apply_stream(self, stream: Iterable[Document]) -> Iterable[Document]:
        """
        Apply the filter to a stream of documents.

        Without `use_batch`, documents are processed one by one through `_apply`.
        With `use_batch`, documents are buffered and handed to `apply_batch` in chunks of
        `batch_size`. In that mode, documents for which `_check_skip` is true are yielded
        immediately, i.e. possibly before earlier documents that are still buffered.

        Even if an exception occurs during processing, the process will continue, and the
        following actions will be taken:
        - Set the `is_rejected` flag of the document to `True`
        - Set the error details in `reject_reason`
        - Increment the `errors` count in the statistics retrievable via `get_statistics`

        Parameters
        ----------
        stream : Iterable[Document]
            Stream of input documents

        Returns
        -------
        Iterable[Document]
            Stream of processed documents
        """
        if not self.use_batch:
            for document in stream:
                yield self._try_process(document, self._apply)
            return

        batch: List[Document] = []
        for document in stream:
            if self._check_skip(document):
                yield document
                continue

            batch.append(document)
            if len(batch) >= self.batch_size:
                yield from self._process_stream_batch(batch)
                batch = []
        if batch:
            # Flush the final, partially filled batch.
            yield from self._process_stream_batch(batch)

    def _process_stream_batch(self, batch: List[Document]) -> List[Document]:
        """Run `apply_batch` on one buffered batch with error isolation and bookkeeping."""
        stats = [get_doc_info(doc) for doc in batch]
        errors_before = self._statistics.errors
        processed = self._try_process(batch, self.apply_batch)
        # `_try_process` increases the error counter only when `apply_batch` raised.
        failed = self._statistics.errors != errors_before
        error_reasons = [doc.reject_reason for doc in processed] if failed else []
        processed = self._finalize_batch(processed, stats)
        # `_finalize_batch` replaces the reject reason of newly rejected documents with the
        # filter parameters; put the error details back so they are not lost.
        for doc, reason in zip(processed, error_reasons):
            doc.reject_reason = reason
        return processed

    def _try_process(self, target: T, func: Callable[[T], T]) -> T:
        """
        Call `func(target)` and turn exceptions into rejections.

        If `target` is a `Document` or a `list` of documents and `func` raises, the
        document(s) are marked as rejected with an `{"error": ...}` reject reason, the
        error counter is increased, the error is logged, and `target` is returned.
        For any other kind of target the exception is re-raised.
        """
        try:
            return func(target)
        except Exception as e:
            if isinstance(target, Document):
                msg = f"{e!r} occurs while processing {self.name} with {target!r}"
                target.is_rejected = True
                target.reject_reason = {"error": msg}
                self._statistics.errors += 1
                self.logger.error(msg, exc_info=True)
                return target  # type: ignore[return-value]
            if isinstance(target, list):
                msg = f"{e!r} occurs while batch processing {self.name}"
                self.logger.error(msg, exc_info=True)
                for doc in target:
                    doc.is_rejected = True
                    doc.reject_reason = {"error": msg}
                self._statistics.errors += len(target)
                return target  # type: ignore[return-value]
            raise

    def __call__(self, text: str, **kwargs: Any) -> str:
        """Wrap `text` in a `Document`, run `_apply` on it and return the resulting text."""
        document = Document(text, **kwargs)
        document = self._apply(document)
        return document.text

    def get_statistics(self) -> Statistics:
        """
        Get the statistics of this filter.

        Returns the `Statistics` object this filter updates while processing documents.
        """
        return self._statistics

    def get_statistics_map(self) -> Dict[str, Any]:
        """
        Get the statistics of this filter as a dictionary.
        """
        return self._statistics.to_dict()

    def shutdown(self) -> None:
        """
        This method is called when the filter is no longer needed.
        You can override this method to release resources or perform cleanup tasks.
        """
        pass

    def __enter__(self) -> "Filter":
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        """
        Called when leaving a `with` block.
        It calls the `shutdown` method to release resources or perform cleanup tasks.
        """
        self.shutdown()

    def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> Dict[str, Any]:
        """
        Get the member variables of this filter.

        Eligible variables are those whose value is of a primitive type
        (`bool`, `int`, `float`, `str`, `None`), whose name does not start with an
        underscore (`_`) and whose name is not listed in `exclude_keys`.
        """
        if exclude_keys is None:
            exclude_keys = set()
        return {
            k: v
            for k, v in vars(self).items()
            if (_is_jsonable(v) and (k not in exclude_keys) and (not k.startswith("_")))
        }

    def _finalize_batch(
        self: "Filter",
        batch: Sequence[Document],
        old_stats: Optional[List[Dict[str, Any]]] = None,
    ) -> List[Document]:
        """
        Record statistics and reject reasons for a processed batch.

        `old_stats` holds the `get_doc_info` snapshots taken before processing, in the
        same order as `batch`. When it is omitted nothing is recorded. Returns the
        batch as a new list.
        """
        if old_stats is None:
            # Avoid a shared mutable default; an empty list makes the loop a no-op.
            old_stats = []
        new_stats = [get_doc_info(doc) for doc in batch]
        for old, new, doc in zip(old_stats, new_stats, batch):
            self._statistics.update_by_diff(old, new)
            if not old["is_rejected"] and new["is_rejected"]:
                doc.reject_reason = self.get_jsonable_vars()
        return list(batch)

    def __init_rng(self, random_state: Optional[Union[int, np.random.Generator]]) -> None:
        """
        Set up `self._rng` and remember whether it was explicitly configured.

        `_owns_rng` is `False` only when no `random_state` was given, which allows
        `_set_rng_if_not_initialized` to replace the generator later.
        """
        self._owns_rng = random_state is not None
        if isinstance(random_state, np.random.Generator):
            self._rng = random_state
        else:
            # Covers `None` and `int` as before, and additionally every other seed type
            # `default_rng` understands (e.g. NumPy integers), so that `_rng` is never
            # left undefined. Invalid seeds raise whatever `default_rng` raises.
            self._rng = np.random.default_rng(random_state)

    def _set_rng_if_not_initialized(self, rng: np.random.Generator) -> None:
        """
        Set the random number generator for this filter if it is not already initialized.
        This method is called by Compose class.
        """
        if not self._owns_rng:
            self._rng = rng
Base class for all filters. Document-level filters must inherit from this class.
The definition of text processing is in apply
method.
If you define a new filter, override the method.
When this class is called, apply the filter from string to string.
With context manager, you can use the filter as follows:
with YourFilter(p=0.5) as filt:
text = filt("This is a sample text.")
def __init__(
    self,
    p: float = 1.0,
    skip_rejected: bool = True,
    *args: Any,
    random_state: Optional[Union[int, np.random.Generator]] = None,
    use_batch: bool = False,
    batch_size: int = 128,
    **kwargs: Any,
) -> None:
    """
    Set up the filter's configuration, logger, random generator and statistics.

    Parameters
    ----------
    p : float
        Probability of applying the filter; must satisfy `0 <= p <= 1`.
        With `p == 1` the filter is always applied.
    skip_rejected : bool
        When `True`, documents that are already rejected are skipped.
        Set it to `False` to run the filter on every document (e.g., postprocess).
    random_state : Optional[Union[int, np.random.Generator]]
        Seed or generator used for the random number generator.
        With `None`, a fresh random number generator is created.
    use_batch : bool
        When `True`, `apply_stream` processes documents in batches.
    batch_size : int
        Number of documents per batch in `apply_stream`.
    args, kwargs : Any
        Accepted and ignored here.
    """
    # The order of the attribute assignments is kept as-is on purpose: it defines the
    # order in which `vars(self)` lists the attributes.
    cls_name = self.__class__.__name__
    self.name = cls_name
    self.logger = logging.getLogger(f"{self.__module__}.{cls_name}")
    assert 0 <= p <= 1
    self.p = p
    self.__init_rng(random_state)
    self.skip_rejected = skip_rejected
    self.use_batch = use_batch
    self.batch_size = batch_size

    statistics: Statistics = Statistics()
    self._statistics = statistics
Initialize the filter.
Parameters
p : float
The probability of applying the filter.
If p
is 1, the filter will always be applied.
skip_rejected : bool
If True
, the filter will skip documents that are already rejected.
If you want to apply the filter to all documents (e.g., postprocess), set this to False
.
random_state : Optional[Union[int, np.random.Generator]]
Seed for the random number generator.
If None
, a new random number generator will be created.
If None and the filter is used inside a Compose object, the random state is shared with that Compose object.
use_batch : bool
If True
, the filter will process documents in batches in the apply_stream
method.
batch_size : int
The size of the batch to process documents in the apply_stream
method.
kwargs : Any
Additional keyword arguments to pass to the filter.
@abstractmethod
def apply(self, document: Document) -> Document:
    """
    Definition of filter behavior.

    The document must implement the `TextContent` protocol;
    in most cases it is a `hojichar.Document` instance.

    In this method, the filter modifies `document.text` or
    `document.extras`, and sets `document.is_rejected = True` to discard the document.

    Parameters
    ----------
    document : Document
        Input document

    Returns
    -------
    Document
        Processed Document
    """
Definition of filter behavior.
The document must implement the TextContent protocol; in most cases it is a hojichar.Document instance.
In this method, the filter will modify document.text
or
document.extras
and set document.is_rejected = True
to discard the document.
Parameters
document : Document Input document
Returns
Document Processed Document
def apply_batch(self, batch: Sequence[Document]) -> List[Document]:
    """
    Run the filter over a batch of documents.

    The default implementation calls `apply` on each document in order.
    Override it when the filter can handle a whole batch at once.

    Parameters
    ----------
    batch : Sequence[Document]
        List-like object of input documents

    Returns
    -------
    list[Document]
        List of processed documents
    """
    processed: List[Document] = []
    for document in batch:
        processed.append(self.apply(document))
    return processed
Apply the filter to a batch of documents. You can override this method if you want to apply the filter to a batch of documents at once.
This method may be used in apply_batch
method of Compose
class.
Parameters
batch : Sequence[Document] List-like object of input documents
Returns
list[Document] List of processed documents
def apply_stream(self, stream: Iterable[Document]) -> Iterable[Document]:
    """
    Apply the filter to a stream of documents.

    Without `use_batch`, documents are processed one by one through `_apply`.
    With `use_batch`, documents are buffered and handed to `apply_batch` in chunks of
    `batch_size`. In that mode, documents for which `_check_skip` is true are yielded
    immediately, i.e. possibly before earlier documents that are still buffered.

    Even if an exception occurs during processing, the process will continue, and the
    following actions will be taken:
    - Set the `is_rejected` flag of the document to `True`
    - Set the error details in `reject_reason`
    - Increment the `errors` count in the statistics retrievable via `get_statistics`

    Parameters
    ----------
    stream : Iterable[Document]
        Stream of input documents

    Returns
    -------
    Iterable[Document]
        Stream of processed documents
    """
    if not self.use_batch:
        for document in stream:
            yield self._try_process(document, self._apply)
        return

    batch: List[Document] = []
    for document in stream:
        if self._check_skip(document):
            yield document
            continue

        batch.append(document)
        if len(batch) >= self.batch_size:
            yield from self._process_stream_batch(batch)
            batch = []
    if batch:
        # Flush the final, partially filled batch.
        yield from self._process_stream_batch(batch)

def _process_stream_batch(self, batch: List[Document]) -> List[Document]:
    """Run `apply_batch` on one buffered batch with error isolation and bookkeeping."""
    stats = [get_doc_info(doc) for doc in batch]
    errors_before = self._statistics.errors
    processed = self._try_process(batch, self.apply_batch)
    # `_try_process` increases the error counter only when `apply_batch` raised.
    failed = self._statistics.errors != errors_before
    error_reasons = [doc.reject_reason for doc in processed] if failed else []
    processed = self._finalize_batch(processed, stats)
    # `_finalize_batch` replaces the reject reason of newly rejected documents with the
    # filter parameters; put the error details back so they are not lost.
    for doc, reason in zip(processed, error_reasons):
        doc.reject_reason = reason
    return processed
Apply the filter to a stream of documents.
This method is used when you want to process documents one by one.
If use_batch
is set to True
in the constructor,
this method will process documents in batches using the apply_batch
method.
Even if an exception occurs during processing, the process will continue, and the following actions will be taken:
- Set the is_rejected flag of the document to True
- Set the error details in reject_reason
- Increment the errors count in the statistics retrievable via get_statistics
Parameters
stream : Iterable[Document] Stream of input documents
Returns
Iterable[Document] Stream of processed documents
def get_statistics(self) -> Statistics:
    """
    Return the `Statistics` object held by this filter.

    It is the same object the filter updates while processing documents.
    """
    statistics = self._statistics
    return statistics
Get the statistics of this filter. This method returns the statistics of the filter, which includes the number of processed documents, discarded documents, and other statistics.
def get_statistics_map(self) -> Dict[str, Any]:
    """
    Return this filter's statistics converted with `Statistics.to_dict`.
    """
    statistics = self._statistics
    return statistics.to_dict()
Get the statistics of this filter as a dictionary.
def shutdown(self) -> None:
    """
    Hook invoked when the filter is no longer needed.

    The base implementation does nothing; override it to release resources or
    perform cleanup tasks.
    """
    return None
This method is called when the filter is no longer needed. You can override this method to release resources or perform cleanup tasks.
def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> Dict[str, Any]:
    """
    Collect the member variables of this filter that can be written as JSON.

    A variable is included when its value is of a primitive type
    (`bool`, `int`, `float`, `str`, `None`), its name is not in `exclude_keys`
    and its name does not start with an underscore (`_`).
    """
    excluded = set() if exclude_keys is None else exclude_keys
    selected: Dict[str, Any] = {}
    for attr_name, attr_value in vars(self).items():
        if not _is_jsonable(attr_value):
            continue
        if attr_name in excluded or attr_name.startswith("_"):
            continue
        selected[attr_name] = attr_value
    return selected
Get the member variables of this filter.
Eligible variables are those of a primitive type (bool, int, float, str, None) whose names do not start with an underscore (_).
@deprecated_since(version="1.0.0", alternative="Filter")
class TokenFilter(Filter, ABC):
    """
    Base class for token-level filters.

    Token filters, which should be implemented in hojichar/filters/token_filters.py,
    must inherit from this class.
    """

    def __init__(
        self, p: float = 1, skip_rejected: bool = True, *args: Any, **kwargs: Any
    ) -> None:
        """
        Initialize the token filter.

        Parameters
        ----------
        p : float
            Stored on the instance; must satisfy `0 <= p <= 1`.
        skip_rejected : bool
            If `True`, `_apply` returns already rejected documents unchanged.
        args, kwargs : Any
            Accepted and ignored.
        """
        # NOTE(review): `Filter.__init__` is not called here, so `_rng`, `_statistics`,
        # `use_batch` and `batch_size` are never set on token filters; inherited methods
        # that read them (e.g. `get_statistics`, `apply_stream`) would fail. Confirm
        # whether that is intended before relying on those methods.
        self.name = self.__class__.__name__
        self.logger = logging.getLogger("hojichar.token_filters." + self.name)
        assert 0 <= p <= 1
        self.p = p
        self.skip_rejected = skip_rejected

    def apply(self, token: Token) -> Token:  # type: ignore
        """Process a single token. Subclasses must override this; the default raises."""
        raise NotImplementedError(f"{self.__class__.__name__}.apply method is not defined")

    def apply_filter(self, document: Document) -> Document:
        """Apply `apply` to every non-rejected token; rejected tokens are dropped."""
        document.tokens = [self.apply(token) for token in document.tokens if not token.is_rejected]
        return document

    def __call__(self, text: str) -> str:  # type: ignore
        """Wrap `text` in a `Token`, run `apply` on it and return the resulting text."""
        token = Token(text)
        token = self.apply(token)
        return token.text

    def _apply(self, document: Document) -> Document:
        """
        Apply the token filter to a single document.
        Already rejected documents are returned unchanged when `skip_rejected` is set.
        """
        if self.skip_rejected and document.is_rejected:
            return document
        return self.apply_filter(document)

    def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> Dict[str, Any]:
        """
        Get the member variables of this filter.

        Same selection rule as `Filter.get_jsonable_vars`, to which this delegates.
        """
        return super().get_jsonable_vars(exclude_keys)
Base class for token-level filters.
Token filters, which should be implemented in hojichar/filters/token_filters.py, must inherit from this class.
def __init__(
    self, p: float = 1, skip_rejected: bool = True, *args: Any, **kwargs: Any
) -> None:
    """
    Store the token filter's name, logger, `p` and `skip_rejected`.

    `p` must satisfy `0 <= p <= 1`. Extra positional and keyword arguments are
    accepted and ignored.
    """
    cls_name = self.__class__.__name__
    self.name = cls_name
    self.logger = logging.getLogger("hojichar.token_filters." + cls_name)
    assert 0 <= p <= 1
    self.p = p
    self.skip_rejected = skip_rejected
Initialize the filter.
Parameters
p : float
The probability of applying the filter.
If p
is 1, the filter will always be applied.
skip_rejected : bool
If True
, the filter will skip documents that are already rejected.
If you want to apply the filter to all documents (e.g., postprocess), set this to False
.
random_state : Optional[Union[int, np.random.Generator]]
Seed for the random number generator.
If None
, a new random number generator will be created.
If None and the filter is used inside a Compose object, the random state is shared with that Compose object.
use_batch : bool
If True
, the filter will process documents in batches in the apply_stream
method.
batch_size : int
The size of the batch to process documents in the apply_stream
method.
kwargs : Any
Additional keyword arguments to pass to the filter.
def apply(self, token: Token) -> Token:  # type: ignore
    """
    Process a single token.

    Subclasses must override this method; the default implementation always raises.

    Raises
    ------
    NotImplementedError
        Always, with a message naming the concrete class.
    """
    # The former `return token` after this statement could never run and was removed.
    raise NotImplementedError(f"{self.__class__.__name__}.apply method is not defined")
Definition of filter behavior.
The document must implement the TextContent protocol; in most cases it is a hojichar.Document instance.
In this method, the filter will modify document.text
or
document.extras
and set document.is_rejected = True
to discard the document.
Parameters
document : Document Input document
Returns
Document Processed Document
def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> dict:
    """
    Collect the member variables of this filter that can be written as JSON.

    A variable is included when its value is of a primitive type
    (`bool`, `int`, `float`, `str`, `None`), its name is not in `exclude_keys`
    and its name does not start with an underscore (`_`).
    """
    skipped = exclude_keys if exclude_keys is not None else set()
    collected = {}
    for member, value in vars(self).items():
        if not _is_jsonable(value):
            continue
        if member in skipped or member.startswith("_"):
            continue
        collected[member] = value
    return collected
Get the member variables of this filter.
Eligible variables are those of a primitive type (bool, int, float, str, None) whose names do not start with an underscore (_).