hojichar.core.inspection
from __future__ import annotations

import dataclasses
import logging
import time
from typing import Any, Dict, List, Union

from hojichar.core.filter_interface import Filter, TokenFilter
from hojichar.core.models import Document

logger = logging.getLogger(__name__)


class Inspector(Filter):
    def __init__(
        self, target_filter: Union[Filter, TokenFilter], filter_idx: int, *args: Any, **kwargs: Any
    ) -> None:
        super().__init__(*args, **kwargs)
        self.logger = logging.getLogger("hojichar.Inspector")
        self.target_filter = target_filter
        self.filter_idx = filter_idx
        self.target = f"{filter_idx}-{target_filter.name}"

        self.is_rejected = False
        self.text_hash = 0
        self.tokens_hash = 0

    def apply(self, document: Document) -> Document:
        self.inspect(document)
        return document

    def inspect(self, document: Document) -> None:
        self.is_rejected = False
        self.is_rejected = document.is_rejected
        self.bytes = len(document.text.encode("utf-8"))
        self.time_ns = time.perf_counter_ns()


@dataclasses.dataclass
class FilterStatistics:
    name: str
    discard_num: int = 0
    diff_bytes: int = 0
    cumulative_time_ns: int = 0
    params: Dict[str, Any] = dataclasses.field(default_factory=dict)

    def get_human_readable_values(self) -> dict:
        ret = {
            "name": self.name,
            "discard_num": self.discard_num,
            "diff_MB": (self.diff_bytes / 1048576),  # 1024**2
            "cumulative_time": (self.cumulative_time_ns / 10**9),
            "params": self.params,
        }
        return ret

    def __add__(self, other: FilterStatistics) -> FilterStatistics:
        assert self.name == other.name, "Layer names must match"
        return FilterStatistics(
            self.name,
            self.discard_num + other.discard_num,
            self.diff_bytes + other.diff_bytes,
            self.cumulative_time_ns + other.cumulative_time_ns,
            self.params,
        )

    def reset(self) -> FilterStatistics:
        self.discard_num = 0
        self.diff_bytes = 0
        self.cumulative_time_ns = 0
        return self


@dataclasses.dataclass
class DocStatistics:
    processed_num: int = 0
    discard_num: int = 0
    input_bytes: int = 0
    output_bytes: int = 0
    cumulative_time_ns: int = 0
    total_token_num: int = 0

    def get_human_readable_values(self) -> dict:
        ret = {
            "processed_num": self.processed_num,
            "discard_num": self.discard_num,
            "input_MB": (self.input_bytes / 1000**2),
            "output_MB": (self.output_bytes / 1000**2),
            "cumulative_time": (self.cumulative_time_ns / 10**9),
            "total_token_num": self.total_token_num,
        }
        return ret

    def __add__(self, other: DocStatistics) -> DocStatistics:
        return DocStatistics(
            self.processed_num + other.processed_num,
            self.discard_num + other.discard_num,
            self.input_bytes + other.input_bytes,
            self.output_bytes + other.output_bytes,
            self.cumulative_time_ns + other.cumulative_time_ns,
            self.total_token_num + other.total_token_num,
        )

    def reset(self) -> DocStatistics:
        self.processed_num = 0
        self.discard_num = 0
        self.input_bytes = 0
        self.output_bytes = 0
        self.cumulative_time_ns = 0
        self.total_token_num = 0
        return self


@dataclasses.dataclass
class StatsContainer:
    total_info: DocStatistics
    layers_info: Dict[str, FilterStatistics]  # Key of the dict is filter name.

    def __add__(self, other: StatsContainer) -> StatsContainer:
        assert self.layers_info.keys() == other.layers_info.keys(), "Layer names must match"
        return StatsContainer(
            self.total_info + other.total_info,
            {k: v + other.layers_info[k] for k, v in self.layers_info.items()},
        )

    def get_human_readable_values(self) -> dict:
        return {
            "total_info": self.total_info.get_human_readable_values(),
            "layers_info": [
                layer.get_human_readable_values() for layer in self.layers_info.values()
            ],
        }

    def reset(self) -> StatsContainer:
        self.total_info.reset()
        for layer in self.layers_info.values():
            layer.reset()
        return self


class StatisticsCounter:
    def __init__(self, inspectors: List[Inspector]) -> None:
        counts = dict()
        for inspector in inspectors:
            counts[inspector.target] = FilterStatistics(
                name=inspector.target,
                params=inspector.target_filter.get_jsonalbe_vars(),
            )
        self.stats = StatsContainer(
            DocStatistics(),
            counts,
        )

    def update_changes(
        self,
        document: Document,
        before_process_inspector: Inspector,
        inspectors: List[Inspector],
    ) -> None:

        # Counting statistics for each filter
        previous_inspector = before_process_inspector
        for idx, inspector in enumerate(inspectors):
            # Logging how many docs are discarded in each filter
            if (not previous_inspector.is_rejected) and inspector.is_rejected:
                self.stats.layers_info[inspector.target].discard_num += 1

            # Logging how much the volume of docs is changed in each filter.
            if (not previous_inspector.is_rejected) and inspector.is_rejected:
                diff_bytes = -inspector.bytes
            elif previous_inspector.is_rejected and inspector.is_rejected:
                diff_bytes = 0
            else:
                diff_bytes = inspector.bytes - previous_inspector.bytes

            self.stats.layers_info[inspector.target].diff_bytes += diff_bytes

            process_time_ns = inspector.time_ns - previous_inspector.time_ns
            self.stats.layers_info[inspector.target].cumulative_time_ns += process_time_ns

            previous_inspector = inspector

        # Counting total statistics
        self.stats.total_info.processed_num += 1
        self.stats.total_info.discard_num += (
            1 if any(inspector.is_rejected for inspector in inspectors) else 0
        )
        self.stats.total_info.input_bytes += len(document.original.encode("utf-8"))
        self.stats.total_info.output_bytes += (
            0 if document.is_rejected else len(document.text.encode("utf-8"))
        )
        self.stats.total_info.cumulative_time_ns += inspectors[-1].time_ns - inspectors[0].time_ns
        self.stats.total_info.total_token_num += len(document.tokens)

    def get_statistics(self) -> dict:
        return self.stats.get_human_readable_values()
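For orientation, the sketch below wires an Inspector pair and a StatisticsCounter around a single filter by hand. It is illustrative only: in practice hojichar.Compose performs this wiring automatically, and the sketch assumes that Document can be constructed from a plain string and that Filter subclasses expose name and get_jsonalbe_vars() as used above. DiscardShort is a hypothetical filter, not part of hojichar.

from hojichar.core.filter_interface import Filter
from hojichar.core.inspection import Inspector, StatisticsCounter
from hojichar.core.models import Document


class DiscardShort(Filter):
    """Hypothetical filter: reject documents shorter than 10 characters."""

    def apply(self, document: Document) -> Document:
        if len(document.text) < 10:
            document.is_rejected = True
        return document


target = DiscardShort()
before = Inspector(target_filter=target, filter_idx=0)  # snapshot before the filter
after = Inspector(target_filter=target, filter_idx=0)   # snapshot after the filter
counter = StatisticsCounter([after])

doc = Document("short")
before.inspect(doc)            # record byte size / timestamp before filtering
doc = target.apply(doc)        # the filter rejects this 5-character document
after.inspect(doc)             # record byte size / timestamp after filtering
counter.update_changes(doc, before, [after])
print(counter.get_statistics())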
class Inspector(Filter):
    def __init__(
        self, target_filter: Union[Filter, TokenFilter], filter_idx: int, *args: Any, **kwargs: Any
    ) -> None:
        super().__init__(*args, **kwargs)
        self.logger = logging.getLogger("hojichar.Inspector")
        self.target_filter = target_filter
        self.filter_idx = filter_idx
        self.target = f"{filter_idx}-{target_filter.name}"

        self.is_rejected = False
        self.text_hash = 0
        self.tokens_hash = 0

    def apply(self, document: Document) -> Document:
        self.inspect(document)
        return document

    def inspect(self, document: Document) -> None:
        self.is_rejected = False
        self.is_rejected = document.is_rejected
        self.bytes = len(document.text.encode("utf-8"))
        self.time_ns = time.perf_counter_ns()
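As a rough illustration of what a single inspection records (again assuming Document(text), and that a filter's name defaults to its class name):

from hojichar.core.filter_interface import Filter
from hojichar.core.inspection import Inspector
from hojichar.core.models import Document


class NoOp(Filter):
    """Hypothetical pass-through filter, used only as the Inspector's target."""

    def apply(self, document: Document) -> Document:
        return document


insp = Inspector(target_filter=NoOp(), filter_idx=2)
insp.inspect(Document("こんにちは"))              # 5 characters, 15 UTF-8 bytes
print(insp.target, insp.bytes, insp.is_rejected)  # e.g. "2-NoOp" 15 False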
Base class for all filters. Document-level filters must inherit from this class.

The filtering behavior is defined in the apply method. If you define a new filter, you must implement that method. When an instance of this class is called, the filter is applied to the document text, string to string.

If the filter creates Document.tokens from Document.text, you must implement the tokenize method. If the filter updates Document.text by merging Document.tokens, you must implement the merge method. Do not define a filter that changes both Document.text and Document.tokens, to prevent unexpected behavior. If you want to apply a filter to tokens, use the TokenFilter class.
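For example, a minimal document-level filter (a hypothetical subclass, shown only for illustration) only needs to override apply:

from hojichar.core.filter_interface import Filter
from hojichar.core.models import Document


class NormalizeWhitespace(Filter):
    """Hypothetical filter: collapse runs of whitespace in Document.text."""

    def apply(self, document: Document) -> Document:
        document.text = " ".join(document.text.split())
        return document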
Parameters

p : float
    The probability of applying this filter, as managed by hojichar.Compose.
skip_reject : bool
    If set to True, hojichar.Compose makes this filter skip documents whose is_rejected flag is set. This flag is True by default, since processing already-discarded documents in subsequent filters is usually meaningless. However, in some cases rejected documents still need to go through a filter; for example, to analyze false positives, discarded documents must be passed to a JSON dump filter. In such cases, set the skip_reject flag to False so that the filter receives all documents.
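For instance, a hypothetical dump filter for analyzing false positives could be constructed with skip_reject=False (assuming the keyword is accepted by the Filter constructor, as documented above) so that it also receives rejected documents:

import json

from hojichar.core.filter_interface import Filter
from hojichar.core.models import Document


class DumpRejected(Filter):
    """Hypothetical filter: print rejected documents as JSON for later inspection."""

    def apply(self, document: Document) -> Document:
        if document.is_rejected:
            print(json.dumps({"text": document.text, "rejected": True}, ensure_ascii=False))
        return document


dumper = DumpRejected(skip_reject=False)  # also sees documents rejected by earlier filters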
    def __init__(
        self, target_filter: Union[Filter, TokenFilter], filter_idx: int, *args: Any, **kwargs: Any
    ) -> None:
        super().__init__(*args, **kwargs)
        self.logger = logging.getLogger("hojichar.Inspector")
        self.target_filter = target_filter
        self.filter_idx = filter_idx
        self.target = f"{filter_idx}-{target_filter.name}"

        self.is_rejected = False
        self.text_hash = 0
        self.tokens_hash = 0
Parameters

p : float, optional
    Probability that this filter will be applied. Default = 1.
Definition of the filter behavior.

In this method, the filter modifies document.text or sets document.is_rejected = True to discard the document. Do not define a filter that changes both document.text and document.tokens.

Parameters

document : Document
    Input document.

Returns

Document
    Processed document.
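A hypothetical apply implementation that discards documents rather than editing them might look like this:

from hojichar.core.filter_interface import Filter
from hojichar.core.models import Document


class DiscardEmpty(Filter):
    """Hypothetical filter: reject documents whose text is blank."""

    def apply(self, document: Document) -> Document:
        if not document.text.strip():
            document.is_rejected = True
        return document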
@dataclasses.dataclass
class FilterStatistics:
    name: str
    discard_num: int = 0
    diff_bytes: int = 0
    cumulative_time_ns: int = 0
    params: Dict[str, Any] = dataclasses.field(default_factory=dict)

    def get_human_readable_values(self) -> dict:
        ret = {
            "name": self.name,
            "discard_num": self.discard_num,
            "diff_MB": (self.diff_bytes / 1048576),  # 1024**2
            "cumulative_time": (self.cumulative_time_ns / 10**9),
            "params": self.params,
        }
        return ret

    def __add__(self, other: FilterStatistics) -> FilterStatistics:
        assert self.name == other.name, "Layer names must match"
        return FilterStatistics(
            self.name,
            self.discard_num + other.discard_num,
            self.diff_bytes + other.diff_bytes,
            self.cumulative_time_ns + other.cumulative_time_ns,
            self.params,
        )

    def reset(self) -> FilterStatistics:
        self.discard_num = 0
        self.diff_bytes = 0
        self.cumulative_time_ns = 0
        return self
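FilterStatistics objects for the same layer can be summed, for example when merging per-worker results. A small illustration (the layer name and numbers are made up; note that diff_MB is computed here with 2**20 bytes per MB):

from hojichar.core.inspection import FilterStatistics

a = FilterStatistics(name="0-DiscardShort", discard_num=3, diff_bytes=-2_097_152)
b = FilterStatistics(name="0-DiscardShort", discard_num=1, diff_bytes=-1_048_576)

merged = a + b  # __add__ asserts that the layer names match
print(merged.get_human_readable_values())
# {'name': '0-DiscardShort', 'discard_num': 4, 'diff_MB': -3.0,
#  'cumulative_time': 0.0, 'params': {}}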
@dataclasses.dataclass
class DocStatistics:
    processed_num: int = 0
    discard_num: int = 0
    input_bytes: int = 0
    output_bytes: int = 0
    cumulative_time_ns: int = 0
    total_token_num: int = 0

    def get_human_readable_values(self) -> dict:
        ret = {
            "processed_num": self.processed_num,
            "discard_num": self.discard_num,
            "input_MB": (self.input_bytes / 1000**2),
            "output_MB": (self.output_bytes / 1000**2),
            "cumulative_time": (self.cumulative_time_ns / 10**9),
            "total_token_num": self.total_token_num,
        }
        return ret

    def __add__(self, other: DocStatistics) -> DocStatistics:
        return DocStatistics(
            self.processed_num + other.processed_num,
            self.discard_num + other.discard_num,
            self.input_bytes + other.input_bytes,
            self.output_bytes + other.output_bytes,
            self.cumulative_time_ns + other.cumulative_time_ns,
            self.total_token_num + other.total_token_num,
        )

    def reset(self) -> DocStatistics:
        self.processed_num = 0
        self.discard_num = 0
        self.input_bytes = 0
        self.output_bytes = 0
        self.cumulative_time_ns = 0
        self.total_token_num = 0
        return self
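Note that DocStatistics reports sizes in decimal megabytes (10**6 bytes), unlike FilterStatistics above, which divides by 2**20. A quick illustration with made-up totals:

from hojichar.core.inspection import DocStatistics

stats = DocStatistics(
    processed_num=100,
    discard_num=7,
    input_bytes=5_000_000,
    output_bytes=3_000_000,
    cumulative_time_ns=2_500_000_000,
)
print(stats.get_human_readable_values())
# {'processed_num': 100, 'discard_num': 7, 'input_MB': 5.0, 'output_MB': 3.0,
#  'cumulative_time': 2.5, 'total_token_num': 0}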
    def get_human_readable_values(self) -> dict:
        ret = {
            "processed_num": self.processed_num,
            "discard_num": self.discard_num,
            "input_MB": (self.input_bytes / 1000**2),
            "output_MB": (self.output_bytes / 1000**2),
            "cumulative_time": (self.cumulative_time_ns / 10**9),
            "total_token_num": self.total_token_num,
        }
        return ret
@dataclasses.dataclass
class StatsContainer:
    total_info: DocStatistics
    layers_info: Dict[str, FilterStatistics]  # Key of the dict is filter name.

    def __add__(self, other: StatsContainer) -> StatsContainer:
        assert self.layers_info.keys() == other.layers_info.keys(), "Layer names must match"
        return StatsContainer(
            self.total_info + other.total_info,
            {k: v + other.layers_info[k] for k, v in self.layers_info.items()},
        )

    def get_human_readable_values(self) -> dict:
        return {
            "total_info": self.total_info.get_human_readable_values(),
            "layers_info": [
                layer.get_human_readable_values() for layer in self.layers_info.values()
            ],
        }

    def reset(self) -> StatsContainer:
        self.total_info.reset()
        for layer in self.layers_info.values():
            layer.reset()
        return self
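Because both DocStatistics and FilterStatistics support +, whole StatsContainer objects can be merged as long as their layer keys match, which is convenient for combining statistics collected by parallel workers. A hedged sketch with invented numbers and a hypothetical layer name:

from hojichar.core.inspection import DocStatistics, FilterStatistics, StatsContainer

worker1 = StatsContainer(
    DocStatistics(processed_num=10, discard_num=1),
    {"0-DiscardShort": FilterStatistics(name="0-DiscardShort", discard_num=1)},
)
worker2 = StatsContainer(
    DocStatistics(processed_num=12, discard_num=2),
    {"0-DiscardShort": FilterStatistics(name="0-DiscardShort", discard_num=2)},
)

total = worker1 + worker2  # layer keys must be identical on both sides
print(total.total_info.processed_num)                   # 22
print(total.layers_info["0-DiscardShort"].discard_num)  # 3
total.reset()                                           # zero every counter in place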
class StatisticsCounter:
    def __init__(self, inspectors: List[Inspector]) -> None:
        counts = dict()
        for inspector in inspectors:
            counts[inspector.target] = FilterStatistics(
                name=inspector.target,
                params=inspector.target_filter.get_jsonalbe_vars(),
            )
        self.stats = StatsContainer(
            DocStatistics(),
            counts,
        )

    def update_changes(
        self,
        document: Document,
        before_process_inspector: Inspector,
        inspectors: List[Inspector],
    ) -> None:

        # Counting statistics for each filter
        previous_inspector = before_process_inspector
        for idx, inspector in enumerate(inspectors):
            # Logging how many docs are discarded in each filter
            if (not previous_inspector.is_rejected) and inspector.is_rejected:
                self.stats.layers_info[inspector.target].discard_num += 1

            # Logging how much the volume of docs is changed in each filter.
            if (not previous_inspector.is_rejected) and inspector.is_rejected:
                diff_bytes = -inspector.bytes
            elif previous_inspector.is_rejected and inspector.is_rejected:
                diff_bytes = 0
            else:
                diff_bytes = inspector.bytes - previous_inspector.bytes

            self.stats.layers_info[inspector.target].diff_bytes += diff_bytes

            process_time_ns = inspector.time_ns - previous_inspector.time_ns
            self.stats.layers_info[inspector.target].cumulative_time_ns += process_time_ns

            previous_inspector = inspector

        # Counting total statistics
        self.stats.total_info.processed_num += 1
        self.stats.total_info.discard_num += (
            1 if any(inspector.is_rejected for inspector in inspectors) else 0
        )
        self.stats.total_info.input_bytes += len(document.original.encode("utf-8"))
        self.stats.total_info.output_bytes += (
            0 if document.is_rejected else len(document.text.encode("utf-8"))
        )
        self.stats.total_info.cumulative_time_ns += inspectors[-1].time_ns - inspectors[0].time_ns
        self.stats.total_info.total_token_num += len(document.tokens)

    def get_statistics(self) -> dict:
        return self.stats.get_human_readable_values()
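For reference, get_statistics() returns the nested dictionary produced by StatsContainer.get_human_readable_values(); its rough shape is sketched below (all values are placeholders, and "0-..." stands for a filter-index/name key):

# Rough shape of StatisticsCounter.get_statistics(); values are placeholders.
# {
#     "total_info": {
#         "processed_num": ..., "discard_num": ...,
#         "input_MB": ..., "output_MB": ...,
#         "cumulative_time": ..., "total_token_num": ...,
#     },
#     "layers_info": [
#         {"name": "0-...", "discard_num": ..., "diff_MB": ...,
#          "cumulative_time": ..., "params": {...}},
#     ],
# }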
    def __init__(self, inspectors: List[Inspector]) -> None:
        counts = dict()
        for inspector in inspectors:
            counts[inspector.target] = FilterStatistics(
                name=inspector.target,
                params=inspector.target_filter.get_jsonalbe_vars(),
            )
        self.stats = StatsContainer(
            DocStatistics(),
            counts,
        )
    def update_changes(
        self,
        document: Document,
        before_process_inspector: Inspector,
        inspectors: List[Inspector],
    ) -> None:

        # Counting statistics for each filter
        previous_inspector = before_process_inspector
        for idx, inspector in enumerate(inspectors):
            # Logging how many docs are discarded in each filter
            if (not previous_inspector.is_rejected) and inspector.is_rejected:
                self.stats.layers_info[inspector.target].discard_num += 1

            # Logging how much the volume of docs is changed in each filter.
            if (not previous_inspector.is_rejected) and inspector.is_rejected:
                diff_bytes = -inspector.bytes
            elif previous_inspector.is_rejected and inspector.is_rejected:
                diff_bytes = 0
            else:
                diff_bytes = inspector.bytes - previous_inspector.bytes

            self.stats.layers_info[inspector.target].diff_bytes += diff_bytes

            process_time_ns = inspector.time_ns - previous_inspector.time_ns
            self.stats.layers_info[inspector.target].cumulative_time_ns += process_time_ns

            previous_inspector = inspector

        # Counting total statistics
        self.stats.total_info.processed_num += 1
        self.stats.total_info.discard_num += (
            1 if any(inspector.is_rejected for inspector in inspectors) else 0
        )
        self.stats.total_info.input_bytes += len(document.original.encode("utf-8"))
        self.stats.total_info.output_bytes += (
            0 if document.is_rejected else len(document.text.encode("utf-8"))
        )
        self.stats.total_info.cumulative_time_ns += inspectors[-1].time_ns - inspectors[0].time_ns
        self.stats.total_info.total_token_num += len(document.tokens)
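To make the three diff_bytes branches concrete, the snippet below restates the branch logic as a standalone function and checks it against made-up byte counts:

def diff_bytes(prev_rejected: bool, cur_rejected: bool, prev_bytes: int, cur_bytes: int) -> int:
    # Restatement of the branching in update_changes, for illustration only.
    if not prev_rejected and cur_rejected:
        return -cur_bytes              # newly rejected: the whole document counts as removed
    if prev_rejected and cur_rejected:
        return 0                       # already rejected upstream: no further change
    return cur_bytes - prev_bytes      # still alive: plain byte delta


assert diff_bytes(False, False, 1000, 800) == -200  # the filter trimmed 200 bytes
assert diff_bytes(False, True, 1000, 800) == -800   # the filter rejected the document
assert diff_bytes(True, True, 0, 0) == 0            # the document was already rejected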