hojichar.core.inspection
from __future__ import annotations

import dataclasses
import logging
import time
from typing import Any, Dict, List, Union

from hojichar.core.filter_interface import Filter, TokenFilter
from hojichar.core.models import Document, Statistics
from hojichar.utils.warn_deprecation import deprecated_since

logger = logging.getLogger(__name__)


class Inspector(Filter):
    def __init__(
        self, target_filter: Union[Filter, TokenFilter], filter_idx: int, *args: Any, **kwargs: Any
    ) -> None:
        super().__init__(*args, **kwargs)
        self.logger = logging.getLogger("hojichar.Inspector")
        self.target_filter = target_filter
        self.filter_idx = filter_idx
        self.target = f"{filter_idx}-{target_filter.name}"

        self.is_rejected = False
        self.text_hash = 0
        self.tokens_hash = 0

    def apply(self, document: Document) -> Document:
        self.inspect(document)
        return document

    def inspect(self, document: Document) -> None:
        self.is_rejected = False
        self.is_rejected = document.is_rejected
        self.bytes = len(document.text.encode("utf-8"))
        self.time_ns = time.perf_counter_ns()


@dataclasses.dataclass
class FilterStatistics:
    name: str
    discard_num: int = 0
    diff_bytes: int = 0
    cumulative_time_ns: int = 0
    params: Dict[str, Any] = dataclasses.field(default_factory=dict)

    def get_human_readable_values(self) -> dict:
        ret = {
            "name": self.name,
            "discard_num": self.discard_num,
            "diff_MB": (self.diff_bytes / 1048576),  # 1024**2
            "cumulative_time": (self.cumulative_time_ns / 10**9),
            "params": self.params,
        }
        return ret

    def __add__(self, other: FilterStatistics) -> FilterStatistics:
        assert self.name == other.name, "Layer names must match"
        return FilterStatistics(
            self.name,
            self.discard_num + other.discard_num,
            self.diff_bytes + other.diff_bytes,
            self.cumulative_time_ns + other.cumulative_time_ns,
            self.params,
        )

    def reset(self) -> FilterStatistics:
        self.discard_num = 0
        self.diff_bytes = 0
        self.cumulative_time_ns = 0
        return self


@dataclasses.dataclass
class DocStatistics:
    processed_num: int = 0
    discard_num: int = 0
    input_bytes: int = 0
    output_bytes: int = 0
    cumulative_time_ns: int = 0
    total_token_num: int = 0

    def get_human_readable_values(self) -> dict:
        ret = {
            "processed_num": self.processed_num,
            "discard_num": self.discard_num,
            "input_MB": (self.input_bytes / 1000**2),
            "output_MB": (self.output_bytes / 1000**2),
            "cumulative_time": (self.cumulative_time_ns / 10**9),
            "total_token_num": self.total_token_num,
        }
        return ret

    def __add__(self, other: DocStatistics) -> DocStatistics:
        return DocStatistics(
            self.processed_num + other.processed_num,
            self.discard_num + other.discard_num,
            self.input_bytes + other.input_bytes,
            self.output_bytes + other.output_bytes,
            self.cumulative_time_ns + other.cumulative_time_ns,
            self.total_token_num + other.total_token_num,
        )

    def reset(self) -> DocStatistics:
        self.processed_num = 0
        self.discard_num = 0
        self.input_bytes = 0
        self.output_bytes = 0
        self.cumulative_time_ns = 0
        self.total_token_num = 0
        return self


@dataclasses.dataclass
class StatsContainer:
    total_info: DocStatistics
    layers_info: Dict[str, FilterStatistics]  # Key of the dict is the filter name.

    def __add__(self, other: StatsContainer) -> StatsContainer:
        assert self.layers_info.keys() == other.layers_info.keys(), "Layer names must match"
        return StatsContainer(
            self.total_info + other.total_info,
            {k: v + other.layers_info[k] for k, v in self.layers_info.items()},
        )

    def get_human_readable_values(self) -> dict:
        return {
            "total_info": self.total_info.get_human_readable_values(),
            "layers_info": [
                layer.get_human_readable_values() for layer in self.layers_info.values()
            ],
        }

    def reset(self) -> StatsContainer:
        self.total_info.reset()
        for layer in self.layers_info.values():
            layer.reset()
        return self


class StatisticsCounter:
    def __init__(self, inspectors: List[Inspector]) -> None:
        counts = dict()
        for inspector in inspectors:
            counts[inspector.target] = FilterStatistics(
                name=inspector.target,
                params=inspector.target_filter.get_jsonable_vars(),
            )
        self.stats = StatsContainer(
            DocStatistics(),
            counts,
        )

    def update_changes(
        self,
        document: Document,
        before_process_inspector: Inspector,
        inspectors: List[Inspector],
    ) -> None:
        # Counting statistics for each filter
        previous_inspector = before_process_inspector
        for inspector in inspectors:
            # Logging how many docs are discarded in each filter
            if (not previous_inspector.is_rejected) and inspector.is_rejected:
                self.stats.layers_info[inspector.target].discard_num += 1

            # Logging how much the volume of docs changes in each filter
            if (not previous_inspector.is_rejected) and inspector.is_rejected:
                diff_bytes = -inspector.bytes
            elif previous_inspector.is_rejected and inspector.is_rejected:
                diff_bytes = 0
            else:
                diff_bytes = inspector.bytes - previous_inspector.bytes

            self.stats.layers_info[inspector.target].diff_bytes += diff_bytes

            process_time_ns = inspector.time_ns - previous_inspector.time_ns
            self.stats.layers_info[inspector.target].cumulative_time_ns += process_time_ns

            previous_inspector = inspector

        # Counting total statistics
        self.stats.total_info.processed_num += 1
        self.stats.total_info.discard_num += (
            1 if any(inspector.is_rejected for inspector in inspectors) else 0
        )
        self.stats.total_info.input_bytes += len(document.original.encode("utf-8"))
        self.stats.total_info.output_bytes += (
            0 if document.is_rejected else len(document.text.encode("utf-8"))
        )
        self.stats.total_info.cumulative_time_ns += inspectors[-1].time_ns - inspectors[0].time_ns
        self.stats.total_info.total_token_num += len(document.tokens)

    def get_statistics(self) -> dict:
        return self.stats.get_human_readable_values()


@deprecated_since("1.0.0", "Compose.get_total_statistics")
def statistics_obj_adapter(stats: List["Statistics"]) -> StatsContainer:
    total = Statistics.get_filter("Total", stats)
    total_info = DocStatistics(
        processed_num=total.input_num,
        discard_num=total.discard_num,
        input_bytes=total.input_bytes,
        output_bytes=total.output_bytes,
        cumulative_time_ns=total.cumulative_time_ns,
        total_token_num=total.input_chars,  # Assuming total_token_num is equivalent to input_chars
    )

    layers_info = {
        stat.name: FilterStatistics(
            name=stat.name,  # type: ignore
            discard_num=stat.discard_num,
            diff_bytes=stat.diff_bytes,
            cumulative_time_ns=stat.cumulative_time_ns,
            params={},
        )
        for stat in stats
        if stat.name != "Total"
    }

    return StatsContainer(
        total_info=total_info,
        layers_info=layers_info,  # type: ignore
    )
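The classes above are normally wired together by hojichar's Compose pipeline, but the data flow can be traced by hand. The following is a minimal sketch, not taken from the library's documentation: it assumes hojichar.Document can be constructed directly from a raw string, and DropShortText is a hypothetical filter defined only for illustration.

# Hedged sketch: one filter, one Inspector snapshot before and after it,
# and a StatisticsCounter that aggregates the difference.
from hojichar.core.filter_interface import Filter
from hojichar.core.inspection import Inspector, StatisticsCounter
from hojichar.core.models import Document


class DropShortText(Filter):  # hypothetical example filter
    def apply(self, document: Document) -> Document:
        if len(document.text) < 10:
            document.is_rejected = True
        return document


filt = DropShortText()
before = Inspector(target_filter=filt, filter_idx=-1)  # snapshot before the filter runs
after = Inspector(target_filter=filt, filter_idx=0)    # snapshot after the filter
counter = StatisticsCounter([after])

doc = Document("This is a sample text.")
before.apply(doc)   # records is_rejected, UTF-8 byte count, and a timestamp
doc = filt.apply(doc)
after.apply(doc)    # records the state after the filter

counter.update_changes(doc, before_process_inspector=before, inspectors=[after])
print(counter.get_statistics())
# e.g. {'total_info': {'processed_num': 1, ...}, 'layers_info': [{'name': '0-DropShortText', ...}]}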
class Inspector(Filter):
Base class for all filters. Document-level filters must inherit from this class.

The definition of text processing is in the apply method. If you define a new filter, override that method. When this class is called, the filter is applied from string to string.

With a context manager, you can use the filter as follows:

    with YourFilter(p=0.5) as filt:
        text = filt("This is a sample text.")
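A hedged, runnable version of the docstring example above. YourFilter is a hypothetical Filter subclass introduced only for illustration; per the docstring, calling a filter applies it string-to-string, and p=0.5 means it is applied with probability 0.5.

from hojichar.core.filter_interface import Filter
from hojichar.core.models import Document


class YourFilter(Filter):  # hypothetical example filter
    def apply(self, document: Document) -> Document:
        document.text = document.text.lower()
        return document


with YourFilter(p=0.5) as filt:
    text = filt("This is a sample text.")  # applied with probability p=0.5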
Inspector(target_filter: Union[Filter, TokenFilter], filter_idx: int, *args: Any, **kwargs: Any)
Initialize the filter.

Parameters

p : float
    The probability of applying the filter. If p is 1, the filter will always be applied.
skip_rejected : bool
    If True, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., in postprocessing), set this to False.
random_state : Optional[Union[int, np.random.Generator]]
    Seed for the random number generator. If None, a new random number generator will be created. If None and the filter is used inside a Compose class, the random state is shared with the Compose object.
use_batch : bool
    If True, the filter will process documents in batches in the apply_stream method.
batch_size : int
    The size of the batch used to process documents in the apply_stream method.
kwargs : Any
    Additional keyword arguments to pass to the filter.
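Inspector forwards *args and **kwargs to Filter.__init__, so the options documented above can be set on it as well. A short hedged sketch, reusing the hypothetical YourFilter from the example above:

# Hedged sketch: constructor options documented above, forwarded via **kwargs.
inspector = Inspector(target_filter=YourFilter(), filter_idx=0, skip_rejected=False)
always_on = YourFilter(p=1.0, use_batch=True, batch_size=128)  # batched in apply_stream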
apply(self, document: Document) -> Document

Definition of filter behavior.

The document must implement the TextContent protocol; in most cases the hojichar.Document class is used. In this method, the filter modifies document.text or document.extras, and sets document.is_rejected = True to discard the document.

Parameters

document : Document
    Input document

Returns

Document
    Processed document
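A hedged sketch of the apply contract described above: mutate document.text or document.extras and flag rejection via document.is_rejected. NormalizeWhitespace is a hypothetical example, not a filter shipped with hojichar, and it assumes document.extras behaves as a plain dict.

from hojichar.core.filter_interface import Filter
from hojichar.core.models import Document


class NormalizeWhitespace(Filter):  # hypothetical example filter
    def apply(self, document: Document) -> Document:
        original_len = len(document.text)
        document.text = " ".join(document.text.split())          # collapse whitespace
        document.extras["removed_chars"] = original_len - len(document.text)
        if not document.text:
            document.is_rejected = True                          # discard empty documents
        return document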
class FilterStatistics:
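As a small illustration (values invented for the example), the per-layer record converts diff_bytes to MiB (divided by 1024**2) and cumulative_time_ns to seconds, and records for the same layer can be merged with +:

from hojichar.core.inspection import FilterStatistics

stats = FilterStatistics(name="0-DropShortText", discard_num=3,
                         diff_bytes=-2_097_152, cumulative_time_ns=1_500_000_000)
print(stats.get_human_readable_values())
# {'name': '0-DropShortText', 'discard_num': 3, 'diff_MB': -2.0, 'cumulative_time': 1.5, 'params': {}}

merged = stats + FilterStatistics(name="0-DropShortText", discard_num=1)  # names must match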
class DocStatistics:

DocStatistics.get_human_readable_values(self) -> dict
class StatsContainer:
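A short sketch with invented numbers: containers that share the same layer keys (for example, ones built by separate worker processes) can be summed with +, and reset() zeroes the counters in place:

from hojichar.core.inspection import DocStatistics, FilterStatistics, StatsContainer

worker_a = StatsContainer(DocStatistics(processed_num=100, discard_num=4),
                          {"0-DropShortText": FilterStatistics("0-DropShortText", discard_num=4)})
worker_b = StatsContainer(DocStatistics(processed_num=80, discard_num=1),
                          {"0-DropShortText": FilterStatistics("0-DropShortText", discard_num=1)})

combined = worker_a + worker_b            # layer keys must match, or the assert fails
print(combined.total_info.processed_num)  # 180
print(combined.get_human_readable_values()["layers_info"][0]["discard_num"])  # 5

worker_a.reset()                          # back to all-zero counters for the next chunk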
class StatisticsCounter:

StatisticsCounter(inspectors: List[Inspector])

StatisticsCounter.update_changes(self, document: Document, before_process_inspector: Inspector, inspectors: List[Inspector]) -> None

statistics_obj_adapter(stats: List[Statistics]) -> StatsContainer

Deprecated since 1.0.0; use Compose.get_total_statistics instead.