hojichar.core.composition
Module source:

```python
import json
import logging
import pprint
from typing import Any, Dict, Iterable, List, Optional, Sequence, Union

import numpy as np

from hojichar.core import inspection
from hojichar.core.filter_interface import Filter, TokenFilter
from hojichar.core.models import Document, Statistics, get_doc_info
from hojichar.utils.warn_deprecation import deprecated_since


class Compose(Filter):
    def __init__(
        self,
        filters: List[Union[Filter, TokenFilter]],
        random_state: Optional[Union[int, np.random.Generator]] = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        Compose a filter from pre-defined filter-objects.
        Filter which has `skip_rejected` flag ignores a document which has `is_rejected` flag.
        By doing so, Compose avoid applying filters that do not affect the output.

        Parameters
        ----------
        filters : List[Union[Filter, TokenFilter]]
            Filter instances which apply to the corpus.

        random_state : Union[None, int, np.random.Generator], optional
            Default = None
            Seed for applying filters randomly.
            `random_state` must be int or np.random.Generator instance.
        """
        super().__init__(random_state=random_state, *args, **kwargs)
        self.set_filters(filters)
        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")

        self._statistics.name = "Total"

    def set_filters(self, filters: List[Union[Filter, TokenFilter]]) -> None:
        """
        Set the filter to a Compose object. The filter is expanded if the
        list of filters in the argument contains a filter bound by Compose.

        Args:
            filters (List[Union[Filter, TokenFilter]]): Target filters
        """
        self.filters: List[Union[Filter, TokenFilter]] = []

        filter_idx = 0
        for f in filters:
            if isinstance(f, Compose):
                for sub in f.filters:
                    sub._set_rng_if_not_initialized(self._rng)
                    name = f"{filter_idx}-{sub.__class__.__name__}"
                    sub.name = name
                    sub._statistics.name = name
                    self.filters.append(sub)
                    filter_idx += 1
            else:
                f._set_rng_if_not_initialized(self._rng)
                name = f"{filter_idx}-{f.__class__.__name__}"
                f.name = name
                f._statistics.name = name
                self.filters.append(f)
                filter_idx += 1

    def __call__(self, text: str, **kwargs: Any) -> str:
        """
        Apply the composed filter to a text and return the processed text.
        If the document is rejected, return an empty string.
        """
        document = Document(text, **kwargs)
        document = self.apply(document)
        if document.is_rejected:
            return ""
        else:
            return document.text

    def apply(self, document: Document) -> Document:
        """
        Apply the composed filter to a document and return the processed document.
        """
        stat = get_doc_info(document)
        for i, filt in enumerate(self.filters):
            document = filt._apply(document)
        new_stat = get_doc_info(document)
        self._statistics.update_by_diff(stat, new_stat)
        return document

    def apply_batch(self, batch: Sequence[Document]) -> List[Document]:
        """
        Apply the composed filter to a batch of documents and return the processed documents.
        The `apply_batch` method implemented in sub-filters is called in order.
        """

        stats = [get_doc_info(doc) for doc in batch]
        for i, filt in enumerate(self.filters):
            batch = filt._apply_batch(batch)
        batch = self._finalize_batch(batch, stats)
        return list(batch)

    def apply_stream(self, stream: Iterable[Document]) -> Iterable[Document]:
        """
        Apply the composed filter to a stream of documents and return the processed documents.
        The `apply_stream` method implemented in sub-filters is called in order.

        In a sub-filter, if `apply_batch` is overridden and implemented, you need to set `use_batch`
        to True at that filter to utilize that implementation. Otherwise, the
        method implemented in `apply` will be applied to the stream.
        """
        stream = self._count_input_stats(stream)
        for i, filt in enumerate(self.filters):
            stream = filt.apply_stream(stream)

        for doc in stream:
            in_stat = doc._get_initial_stats()
            if in_stat is None:
                in_stat = get_doc_info(doc)
                self.logger.debug(
                    "Initial stats missing for document during stream aggregation; "
                    "using current stats as fallback"
                )
            out_stat = get_doc_info(doc)

            self._statistics.update_by_diff(in_stat, out_stat)
            doc._clear_initial_stats()
            yield doc

    def _count_input_stats(self, stream: Iterable[Document]) -> Iterable[Document]:
        for doc in stream:
            doc._set_initial_stats(get_doc_info(doc))
            yield doc

    def get_total_statistics(self) -> List[Statistics]:
        """
        Get the statistics of the Compose object and sub filters.

        The statistics of the Compose class are stored in an object with the name "Total",
        and sub-filters's are stored with names in the format {filter_index}-{filter class name}.
        """
        stats = []
        stats.append(self.get_statistics())
        for i, filt in enumerate(self.filters):
            stats.append(filt.get_statistics())
        return stats

    def get_total_statistics_map(self) -> List[Dict[str, Any]]:
        """
        Get the statistics of the Compose object and sub filters as a list of dictionaries.
        """
        stats = self.get_total_statistics()
        return [stat.to_dict() for stat in stats]

    def shutdown(self) -> None:
        for f in self.filters:
            f.shutdown()

        super().shutdown()

    @property
    def statistics(self) -> dict:
        """
        Deprecated

        Get the statistics of the Compose object and sub filters.

        This property is retained for compatibility with previous versions.
        Please use `get_total_statistics` or `get_total_statistics_map` instead.
        """
        return inspection.statistics_obj_adapter(  # type: ignore
            self.get_total_statistics()
        ).get_human_readable_values()

    @property
    def statistics_obj(self) -> inspection.StatsContainer:
        """
        Deprecated

        Get the statistics of the Compose object and sub filters.
        This method returns a StatsContainer object which contains the statistics
        of the Compose object and sub filters.

        This property is retained for compatibility with previous versions.
        Please use `get_total_statistics` or `get_total_statistics_map` instead.
        """
        return inspection.statistics_obj_adapter(self.get_total_statistics())  # type: ignore

    @deprecated_since("1.0.0", "get_total_statistics")
    def summary(self, format: str = "print") -> None:
        info = [
            {
                "layer": i,
                "name": filt.name,
                "doc": filt.__doc__,
            }
            for i, filt in enumerate(self.filters)
        ]

        def to_json(filter_info: dict) -> dict:
            filter_info["doc"] = "".join(d.strip() for d in filter_info["doc"].split("\n"))
            return filter_info

        if format == "json":
            print(json.dumps(list(map(to_json, info)), ensure_ascii=False, indent="\t"))
        if format == "print":
            for layer in info:
                print(f"[{layer['layer']}] {layer['name']}")
                pprint.pprint(layer["doc"])
```
class Compose(Filter):
Base class for all filters. Document-level filters must inherit from this class.

The text-processing logic is defined in the `apply` method; override it when you define a new filter. Calling an instance applies the filter from string to string.

With a context manager, you can use the filter as follows:

    with YourFilter(p=0.5) as filt:
        text = filt("This is a sample text.")
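For instance, a minimal document-level filter might look like the sketch below; `DiscardShortDocuments` and its 10-character threshold are illustrative names, not part of hojichar.

```python
from hojichar.core.filter_interface import Filter
from hojichar.core.models import Document


class DiscardShortDocuments(Filter):  # hypothetical example filter
    """Reject documents shorter than 10 characters."""

    def apply(self, document: Document) -> Document:
        if len(document.text) < 10:
            document.is_rejected = True
        return document


# Used as a context manager, the instance is called string-to-string.
with DiscardShortDocuments() as filt:
    print(filt("This is a sample text."))
```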
Compose(filters: List[Union[Filter, TokenFilter]], random_state: Optional[Union[int, np.random.Generator]] = None, *args: Any, **kwargs: Any)
Compose a filter from pre-defined filter objects. A filter whose `skip_rejected` flag is set ignores documents whose `is_rejected` flag is set; this way, Compose avoids applying filters that would not affect the output.

Parameters
----------
filters : List[Union[Filter, TokenFilter]]
    Filter instances which are applied to the corpus.
random_state : Union[None, int, np.random.Generator], optional, default = None
    Seed for applying filters randomly. `random_state` must be an int or a np.random.Generator instance.
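A minimal usage sketch, reusing the illustrative `DiscardShortDocuments` filter defined above:

```python
from hojichar.core.composition import Compose

pipeline = Compose(
    [DiscardShortDocuments()],  # illustrative filter from the sketch above
    random_state=42,
)

print(pipeline("This is a sample text."))  # processed text
print(pipeline("short"))                   # rejected documents come back as ""
```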
Compose.set_filters(filters: List[Union[Filter, TokenFilter]]) -> None
Set the filters on a Compose object. If the given list contains a filter that is itself a Compose, it is expanded into its sub-filters.

Args:
    filters (List[Union[Filter, TokenFilter]]): Target filters
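For example, nesting a Compose inside the filter list flattens it, and each sub-filter is renamed to `{filter_index}-{filter class name}` (a sketch, using the illustrative filter from above):

```python
inner = Compose([DiscardShortDocuments()])
outer = Compose([inner, DiscardShortDocuments()])  # the inner Compose is expanded

print([f.name for f in outer.filters])
# ['0-DiscardShortDocuments', '1-DiscardShortDocuments']
```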
Compose.apply(document: Document) -> Document
Apply the composed filter to a document and return the processed document.
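For example (a sketch; `pipeline` is the illustrative Compose constructed above):

```python
from hojichar.core.models import Document

doc = pipeline.apply(Document("This is a sample text."))
print(doc.text, doc.is_rejected)
```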
Compose.apply_batch(batch: Sequence[Document]) -> List[Document]
Apply the composed filter to a batch of documents and return the processed documents.
The `apply_batch` method implemented in sub-filters is called in order.
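A batch-processing sketch, again assuming the illustrative `pipeline` from above:

```python
batch = [Document("first example text"), Document("tiny")]
processed = pipeline.apply_batch(batch)
print([doc.text for doc in processed if not doc.is_rejected])
```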
Compose.apply_stream(stream: Iterable[Document]) -> Iterable[Document]
Apply the composed filter to a stream of documents and return the processed documents. The `apply_stream` method implemented in sub-filters is called in order.

If a sub-filter overrides `apply_batch`, set `use_batch` to True on that filter to use that implementation; otherwise the filter's `apply` method is applied to the stream.
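A sketch of lazy, stream-based processing with the illustrative `pipeline` from above; `read_corpus` is a stand-in for any iterator that yields `Document` objects:

```python
def read_corpus():
    for line in ["first example text", "tiny", "another example text"]:
        yield Document(line)


for doc in pipeline.apply_stream(read_corpus()):
    if not doc.is_rejected:
        print(doc.text)
```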
Compose.get_total_statistics() -> List[Statistics]
Get the statistics of the Compose object and its sub-filters.

The statistics of the Compose object itself are stored under the name "Total", and each sub-filter's statistics are stored under a name in the format {filter_index}-{filter class name}.
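For example (continuing the illustrative `pipeline` sketch):

```python
stats = pipeline.get_total_statistics()
print([s.name for s in stats])
# e.g. ['Total', '0-DiscardShortDocuments']
```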
Compose.get_total_statistics_map() -> List[Dict[str, Any]]
Get the statistics of the Compose object and its sub-filters as a list of dictionaries.
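A sketch of exporting the statistics as JSON, assuming the dictionary values are JSON-serializable:

```python
import json

print(json.dumps(pipeline.get_total_statistics_map(), indent=2, ensure_ascii=False))
```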
Compose.shutdown() -> None
This method is called when the filter is no longer needed. You can override this method to release resources or perform cleanup tasks.
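As the source above shows, `Compose.shutdown` also calls `shutdown` on every sub-filter. A typical cleanup sketch:

```python
try:
    results = pipeline.apply_batch([Document("example text")])
finally:
    pipeline.shutdown()  # also shuts down every sub-filter
```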
Compose.statistics: dict

Deprecated. Get the statistics of the Compose object and its sub-filters.

This property is retained for compatibility with previous versions. Please use `get_total_statistics` or `get_total_statistics_map` instead.
Compose.statistics_obj: inspection.StatsContainer

Deprecated. Get the statistics of the Compose object and its sub-filters as a StatsContainer object.

This property is retained for compatibility with previous versions. Please use `get_total_statistics` or `get_total_statistics_map` instead.
Compose.summary(format: str = 'print') -> None

Deprecated since 1.0.0; use `get_total_statistics` instead. Prints each sub-filter's layer index, name, and docstring (format="print"), or dumps the same information as JSON (format="json").