hojichar.core.composition
import json
import logging
import pprint
from typing import Any, Dict, Iterable, List, Optional, Sequence, Union

import numpy as np

from hojichar.core import inspection
from hojichar.core.filter_interface import Filter, TokenFilter
from hojichar.core.models import Document, Statistics, get_doc_info
from hojichar.utils.warn_deprecation import deprecated_since


class Compose(Filter):
    def __init__(
        self,
        filters: List[Union[Filter, TokenFilter]],
        random_state: Optional[Union[int, np.random.Generator]] = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        Compose a filter from pre-defined filter-objects.

        A filter which has the `skip_rejected` flag ignores a document which has
        the `is_rejected` flag. By doing so, Compose avoids applying filters that
        do not affect the output.

        Parameters
        ----------
        filters : List[Union[Filter, TokenFilter]]
            Filter instances which apply to the corpus.

        random_state : Union[None, int, np.random.Generator], optional
            Default = None.
            Seed for applying filters randomly.
            `random_state` must be an int or a np.random.Generator instance.
        """
        super().__init__(random_state=random_state, *args, **kwargs)
        self.set_filters(filters)
        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")

        # Aggregate statistics of the whole pipeline are reported under "Total".
        self._statistics.name = "Total"

    def set_filters(self, filters: List[Union[Filter, TokenFilter]]) -> None:
        """
        Set the filters of this Compose object.

        A Compose instance contained in `filters` is expanded: its sub-filters
        are adopted directly. Every adopted filter is renamed to
        "{index}-{class name}" so that statistics entries are unambiguous.

        Args:
            filters (List[Union[Filter, TokenFilter]]): Target filters
        """
        self.filters: List[Union[Filter, TokenFilter]] = []

        filter_idx = 0
        for f in filters:
            # Flatten nested Compose objects into their constituent sub-filters.
            sub_filters = f.filters if isinstance(f, Compose) else [f]
            for sub in sub_filters:
                sub._set_rng_if_not_initialized(self._rng)
                name = f"{filter_idx}-{sub.__class__.__name__}"
                sub.name = name
                sub._statistics.name = name
                self.filters.append(sub)
                filter_idx += 1

    def __call__(self, text: str, **kwargs: Any) -> str:
        """
        Apply the composed filter to a text and return the processed text.
        If the document is rejected, return an empty string.
        """
        document = Document(text, **kwargs)
        document = self.apply(document)
        if document.is_rejected:
            return ""
        return document.text

    def apply(self, document: Document) -> Document:
        """
        Apply the composed filter to a document and return the processed document.
        """
        stat = get_doc_info(document)
        for filt in self.filters:
            document = filt._apply(document)
        new_stat = get_doc_info(document)
        self._statistics.update_by_diff(stat, new_stat)
        return document

    def apply_batch(self, batch: Sequence[Document]) -> List[Document]:
        """
        Apply the composed filter to a batch of documents and return the
        processed documents. The `apply_batch` method implemented in
        sub-filters is called in order.
        """
        stats = [get_doc_info(doc) for doc in batch]
        for filt in self.filters:
            batch = filt._apply_batch(batch)
        batch = self._finalize_batch(batch, stats)
        return list(batch)

    def apply_stream(self, stream: Iterable[Document]) -> Iterable[Document]:
        """
        Apply the composed filter to a stream of documents and return the
        processed documents. The `apply_stream` method implemented in
        sub-filters is called in order.

        In a sub-filter, if `apply_batch` is overridden and implemented, you
        need to set `use_batch` to True at that filter to utilize that
        implementation. Otherwise, the method implemented in `apply` will be
        applied to the stream.
        """
        stream = self._count_input_stats(stream)
        for filt in self.filters:
            stream = filt.apply_stream(stream)

        for doc in stream:
            in_stat = doc.extras["__init_stats"]
            out_stat = get_doc_info(doc)
            self._statistics.update_by_diff(in_stat, out_stat)
            # Drop the bookkeeping key so it does not leak to the caller.
            del doc.extras["__init_stats"]
            yield doc

    def _count_input_stats(self, stream: Iterable[Document]) -> Iterable[Document]:
        # Stash a snapshot of each document's stats so apply_stream can diff later.
        for doc in stream:
            doc.extras["__init_stats"] = get_doc_info(doc)
            yield doc

    def get_total_statistics(self) -> List[Statistics]:
        """
        Get the statistics of the Compose object and sub filters.

        The statistics of the Compose class are stored in an object with the
        name "Total", and those of the sub-filters are stored with names in the
        format {filter_index}-{filter class name}.
        """
        return [self.get_statistics()] + [filt.get_statistics() for filt in self.filters]

    def get_total_statistics_map(self) -> List[Dict[str, Any]]:
        """
        Get the statistics of the Compose object and sub filters as a list of
        dictionaries.
        """
        return [stat.to_dict() for stat in self.get_total_statistics()]

    def shutdown(self) -> None:
        """Shut down every sub-filter, then this Compose object itself."""
        for f in self.filters:
            f.shutdown()

        super().shutdown()

    @property
    def statistics(self) -> dict:
        """
        Deprecated

        Get the statistics of the Compose object and sub filters.

        This property is retained for compatibility with previous versions.
        Please use `get_total_statistics` or `get_total_statistics_map` instead.
        """
        return inspection.statistics_obj_adapter(  # type: ignore
            self.get_total_statistics()
        ).get_human_readable_values()

    @property
    def statistics_obj(self) -> inspection.StatsContainer:
        """
        Deprecated

        Get the statistics of the Compose object and sub filters.
        This method returns a StatsContainer object which contains the
        statistics of the Compose object and sub filters.

        This property is retained for compatibility with previous versions.
        Please use `get_total_statistics` or `get_total_statistics_map` instead.
        """
        return inspection.statistics_obj_adapter(self.get_total_statistics())  # type: ignore

    @deprecated_since("1.0.0", "get_total_statistics")
    def summary(self, format: str = "print") -> None:
        """
        Print information about the filters composed in this pipeline.

        Parameters
        ----------
        format : str
            "print" (default) pretty-prints layer index, name and docstring;
            "json" dumps the same information as JSON.
        """
        info = [
            {
                "layer": i,
                "name": filt.name,
                # A filter class may have no docstring; fall back to "" so the
                # "json" branch does not crash on `None.split("\n")`.
                "doc": filt.__doc__ or "",
            }
            for i, filt in enumerate(self.filters)
        ]

        def to_json(filter_info: dict) -> dict:
            filter_info["doc"] = "".join(d.strip() for d in filter_info["doc"].split("\n"))
            return filter_info

        if format == "json":
            print(json.dumps(list(map(to_json, info)), ensure_ascii=False, indent="\t"))
        if format == "print":
            for layer in info:
                print(f"[{layer['layer']}] {layer['name']}")
                pprint.pprint(layer["doc"])
class Compose(Filter):
    """Chain of Filter/TokenFilter objects applied as a single filter."""

    def __init__(
        self,
        filters: List[Union[Filter, TokenFilter]],
        random_state: Optional[Union[int, np.random.Generator]] = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        Build a pipeline filter out of pre-defined filter objects.

        A filter whose `skip_rejected` flag is set ignores documents marked
        `is_rejected`; this lets Compose skip work that cannot change the output.

        Parameters
        ----------
        filters : List[Union[Filter, TokenFilter]]
            Filter instances which apply to the corpus.

        random_state : Union[None, int, np.random.Generator], optional
            Default = None. Seed for applying filters randomly; must be an int
            or a np.random.Generator instance.
        """
        super().__init__(random_state=random_state, *args, **kwargs)
        self.set_filters(filters)
        self.logger = logging.getLogger(
            f"{self.__module__}.{self.__class__.__name__}"
        )
        # Pipeline-wide statistics are reported under the name "Total".
        self._statistics.name = "Total"

    def set_filters(self, filters: List[Union[Filter, TokenFilter]]) -> None:
        """
        Install `filters` on this Compose object.

        Any Compose instance found in the list is expanded in place, so the
        resulting pipeline is always flat. Each installed filter is renamed to
        "{index}-{class name}".

        Args:
            filters (List[Union[Filter, TokenFilter]]): Target filters
        """
        self.filters: List[Union[Filter, TokenFilter]] = []

        position = 0
        for entry in filters:
            # Nested Compose objects contribute their sub-filters directly.
            members = entry.filters if isinstance(entry, Compose) else [entry]
            for member in members:
                member._set_rng_if_not_initialized(self._rng)
                label = f"{position}-{member.__class__.__name__}"
                member.name = label
                member._statistics.name = label
                self.filters.append(member)
                position += 1

    def __call__(self, text: str, **kwargs: Any) -> str:
        """
        Run the pipeline on a plain string and return the processed string.
        A rejected document yields the empty string.
        """
        result = self.apply(Document(text, **kwargs))
        return "" if result.is_rejected else result.text

    def apply(self, document: Document) -> Document:
        """
        Run the pipeline on one document and return the processed document.
        """
        before = get_doc_info(document)
        for filt in self.filters:
            document = filt._apply(document)
        self._statistics.update_by_diff(before, get_doc_info(document))
        return document

    def apply_batch(self, batch: Sequence[Document]) -> List[Document]:
        """
        Run each sub-filter's `apply_batch`, in order, over a batch of
        documents and return the processed documents.
        """
        initial_stats = [get_doc_info(doc) for doc in batch]
        for filt in self.filters:
            batch = filt._apply_batch(batch)
        return list(self._finalize_batch(batch, initial_stats))

    def apply_stream(self, stream: Iterable[Document]) -> Iterable[Document]:
        """
        Lazily run each sub-filter's `apply_stream`, in order, over a stream
        of documents and yield the processed documents.

        If a sub-filter overrides `apply_batch`, set `use_batch` to True on
        that filter to utilize that implementation; otherwise its `apply`
        method is applied to the stream.
        """
        stream = self._count_input_stats(stream)
        for filt in self.filters:
            stream = filt.apply_stream(stream)

        for doc in stream:
            initial = doc.extras["__init_stats"]
            final = get_doc_info(doc)
            self._statistics.update_by_diff(initial, final)
            # Remove the bookkeeping entry before handing the doc back.
            del doc.extras["__init_stats"]
            yield doc

    def _count_input_stats(self, stream: Iterable[Document]) -> Iterable[Document]:
        # Record a per-document stats snapshot for apply_stream to diff against.
        for doc in stream:
            doc.extras["__init_stats"] = get_doc_info(doc)
            yield doc

    def get_total_statistics(self) -> List[Statistics]:
        """
        Return the statistics of this Compose object followed by those of each
        sub-filter.

        The Compose-level entry is named "Total"; sub-filter entries use the
        format {filter_index}-{filter class name}.
        """
        collected = [self.get_statistics()]
        collected.extend(filt.get_statistics() for filt in self.filters)
        return collected

    def get_total_statistics_map(self) -> List[Dict[str, Any]]:
        """
        Return the statistics of the Compose object and sub-filters as a list
        of dictionaries.
        """
        return [entry.to_dict() for entry in self.get_total_statistics()]

    def shutdown(self) -> None:
        """Shut down all sub-filters, then this Compose object itself."""
        for filt in self.filters:
            filt.shutdown()

        super().shutdown()

    @property
    def statistics(self) -> dict:
        """
        Deprecated

        Get the statistics of the Compose object and sub filters.

        This property is retained for compatibility with previous versions.
        Please use `get_total_statistics` or `get_total_statistics_map` instead.
        """
        container = inspection.statistics_obj_adapter(  # type: ignore
            self.get_total_statistics()
        )
        return container.get_human_readable_values()

    @property
    def statistics_obj(self) -> inspection.StatsContainer:
        """
        Deprecated

        Get the statistics of the Compose object and sub filters as a
        StatsContainer object.

        This property is retained for compatibility with previous versions.
        Please use `get_total_statistics` or `get_total_statistics_map` instead.
        """
        return inspection.statistics_obj_adapter(self.get_total_statistics())  # type: ignore

    @deprecated_since("1.0.0", "get_total_statistics")
    def summary(self, format: str = "print") -> None:
        """
        Print layer index, name and docstring of each composed filter, either
        pretty-printed (format="print") or as JSON (format="json").
        """
        info = [
            {"layer": i, "name": filt.name, "doc": filt.__doc__}
            for i, filt in enumerate(self.filters)
        ]

        def to_json(filter_info: dict) -> dict:
            filter_info["doc"] = "".join(d.strip() for d in filter_info["doc"].split("\n"))
            return filter_info

        if format == "json":
            print(json.dumps(list(map(to_json, info)), ensure_ascii=False, indent="\t"))
        if format == "print":
            for layer in info:
                print(f"[{layer['layer']}] {layer['name']}")
                pprint.pprint(layer["doc"])
Base class for all filters. Document-level filters must inherit from this class.
The definition of the text processing is in the `apply` method.
If you define a new filter, override that method.
When this class is called, apply the filter from string to string.
With context manager, you can use the filter as follows:
with YourFilter(p=0.5) as filt:
text = filt("This is a sample text.")
def __init__(
    self,
    filters: List[Union[Filter, TokenFilter]],
    random_state: Optional[Union[int, np.random.Generator]] = None,
    *args: Any,
    **kwargs: Any,
) -> None:
    """
    Build a pipeline filter out of pre-defined filter objects.

    A filter whose `skip_rejected` flag is set ignores documents that carry
    the `is_rejected` flag, so Compose avoids running filters that cannot
    change the output.

    Parameters
    ----------
    filters : List[Union[Filter, TokenFilter]]
        Filter instances which apply to the corpus.

    random_state : Union[None, int, np.random.Generator], optional
        Default = None. Seed for applying filters randomly; must be an int
        or a np.random.Generator instance.
    """
    super().__init__(random_state=random_state, *args, **kwargs)
    self.set_filters(filters)
    self.logger = logging.getLogger(
        f"{self.__module__}.{self.__class__.__name__}"
    )
    # Aggregated pipeline statistics are reported under the name "Total".
    self._statistics.name = "Total"
Compose a filter from pre-defined filter-objects.
A filter which has the `skip_rejected` flag ignores a document which has the `is_rejected` flag.
By doing so, Compose avoids applying filters that do not affect the output.
Parameters
filters : List[Union[Filter, TokenFilter]] Filter instances which apply to the corpus.
random_state : Union[None, int, np.random.Generator], optional
Default = None
Seed for applying filters randomly.
`random_state` must be an int or a np.random.Generator instance.
def set_filters(self, filters: List[Union[Filter, TokenFilter]]) -> None:
    """
    Install `filters` on this Compose object.

    Any Compose instance in the list is expanded in place, so the resulting
    pipeline is always flat. Each installed filter (and its statistics entry)
    is renamed to "{index}-{class name}".

    Args:
        filters (List[Union[Filter, TokenFilter]]): Target filters
    """
    self.filters: List[Union[Filter, TokenFilter]] = []

    position = 0
    for entry in filters:
        # A nested Compose contributes its sub-filters directly.
        members = entry.filters if isinstance(entry, Compose) else [entry]
        for member in members:
            member._set_rng_if_not_initialized(self._rng)
            label = f"{position}-{member.__class__.__name__}"
            member.name = label
            member._statistics.name = label
            self.filters.append(member)
            position += 1
Set the filter to a Compose object. The filter is expanded if the list of filters in the argument contains a filter bound by Compose.
Args: filters (List[Union[Filter, TokenFilter]]): Target filters
def apply(self, document: Document) -> Document:
    """
    Run every sub-filter over one document and return the processed document.

    Updates this Compose object's statistics with the before/after diff.
    """
    before = get_doc_info(document)
    for filt in self.filters:
        document = filt._apply(document)
    after = get_doc_info(document)
    self._statistics.update_by_diff(before, after)
    return document
Apply the composed filter to a document and return the processed document.
def apply_batch(self, batch: Sequence[Document]) -> List[Document]:
    """
    Run each sub-filter's `apply_batch`, in order, over a batch of documents
    and return the processed documents.
    """
    initial_stats = [get_doc_info(doc) for doc in batch]
    for filt in self.filters:
        batch = filt._apply_batch(batch)
    return list(self._finalize_batch(batch, initial_stats))
Apply the composed filter to a batch of documents and return the processed documents.
The `apply_batch` method implemented in sub-filters is called in order.
def apply_stream(self, stream: Iterable[Document]) -> Iterable[Document]:
    """
    Lazily run each sub-filter's `apply_stream`, in order, over a stream of
    documents and yield the processed documents.

    If a sub-filter overrides `apply_batch`, set `use_batch` to True on that
    filter to utilize that implementation; otherwise its `apply` method is
    applied to the stream.
    """
    stream = self._count_input_stats(stream)
    for filt in self.filters:
        stream = filt.apply_stream(stream)

    for doc in stream:
        initial = doc.extras["__init_stats"]
        final = get_doc_info(doc)
        self._statistics.update_by_diff(initial, final)
        # Drop the bookkeeping key before handing the document back.
        del doc.extras["__init_stats"]
        yield doc
Apply the composed filter to a stream of documents and return the processed documents.
The `apply_stream` method implemented in sub-filters is called in order.
In a sub-filter, if `apply_batch` is overridden and implemented, you need to set `use_batch` to True at that filter to utilize that implementation; otherwise, the method implemented in `apply` will be applied to the stream.
def get_total_statistics(self) -> List[Statistics]:
    """
    Return the statistics of this Compose object followed by those of each
    sub-filter.

    The Compose-level entry is named "Total"; sub-filter entries use the
    format {filter_index}-{filter class name}.
    """
    collected = [self.get_statistics()]
    collected.extend(filt.get_statistics() for filt in self.filters)
    return collected
Get the statistics of the Compose object and sub filters.
The statistics of the Compose class are stored in an object with the name "Total", and those of the sub-filters are stored with names in the format {filter_index}-{filter class name}.
def get_total_statistics_map(self) -> List[Dict[str, Any]]:
    """
    Return the statistics of the Compose object and sub-filters as a list of
    dictionaries.
    """
    return [entry.to_dict() for entry in self.get_total_statistics()]
Get the statistics of the Compose object and sub filters as a list of dictionaries.
def shutdown(self) -> None:
    """Shut down every sub-filter first, then this Compose object itself."""
    for filt in self.filters:
        filt.shutdown()

    super().shutdown()
This method is called when the filter is no longer needed. You can override this method to release resources or perform cleanup tasks.
Deprecated
Get the statistics of the Compose object and sub filters.
This property is retained for compatibility with previous versions.
Please use `get_total_statistics` or `get_total_statistics_map` instead.
Deprecated
Get the statistics of the Compose object and sub filters. This method returns a StatsContainer object which contains the statistics of the Compose object and sub filters.
This property is retained for compatibility with previous versions.
Please use `get_total_statistics` or `get_total_statistics_map` instead.
@deprecated_since("1.0.0", "get_total_statistics")
def summary(self, format: str = "print") -> None:
    """
    Print information about the filters composed in this pipeline.

    Parameters
    ----------
    format : str
        "print" (default) pretty-prints layer index, name and docstring;
        "json" dumps the same information as JSON.
    """
    info = [
        {
            "layer": i,
            "name": filt.name,
            # A filter class may have no docstring; fall back to "" so the
            # "json" branch does not crash on `None.split("\n")`.
            "doc": filt.__doc__ or "",
        }
        for i, filt in enumerate(self.filters)
    ]

    def to_json(filter_info: dict) -> dict:
        filter_info["doc"] = "".join(d.strip() for d in filter_info["doc"].split("\n"))
        return filter_info

    if format == "json":
        print(json.dumps(list(map(to_json, info)), ensure_ascii=False, indent="\t"))
    if format == "print":
        for layer in info:
            print(f"[{layer['layer']}] {layer['name']}")
            pprint.pprint(layer["doc"])