hojichar.core.inspection

from __future__ import annotations

import dataclasses
import logging
import time
from typing import Any, Dict, List, Union

from hojichar.core.filter_interface import Filter, TokenFilter
from hojichar.core.models import Document

logger = logging.getLogger(__name__)


class Inspector(Filter):
    def __init__(
        self, target_filter: Union[Filter, TokenFilter], filter_idx: int, *args: Any, **kwargs: Any
    ) -> None:
        super().__init__(*args, **kwargs)
        self.logger = logging.getLogger("hojichar.Inspector")
        self.target_filter = target_filter
        self.filter_idx = filter_idx
        self.target = f"{filter_idx}-{target_filter.name}"

        self.is_rejected = False
        self.text_hash = 0
        self.tokens_hash = 0

    def apply(self, document: Document) -> Document:
        self.inspect(document)
        return document

    def inspect(self, document: Document) -> None:
        self.is_rejected = document.is_rejected
        self.bytes = len(document.text.encode("utf-8"))
        self.time_ns = time.perf_counter_ns()


@dataclasses.dataclass
class FilterStatistics:
    name: str
    discard_num: int = 0
    diff_bytes: int = 0
    cumulative_time_ns: int = 0
    params: Dict[str, Any] = dataclasses.field(default_factory=dict)

    def get_human_readable_values(self) -> dict:
        ret = {
            "name": self.name,
            "discard_num": self.discard_num,
            "diff_MB": (self.diff_bytes / 1048576),  # 1024**2
            "cumulative_time": (self.cumulative_time_ns / 10**9),
            "params": self.params,
        }
        return ret

    def __add__(self, other: FilterStatistics) -> FilterStatistics:
        assert self.name == other.name, "Layer names must match"
        return FilterStatistics(
            self.name,
            self.discard_num + other.discard_num,
            self.diff_bytes + other.diff_bytes,
            self.cumulative_time_ns + other.cumulative_time_ns,
            self.params,
        )

    def reset(self) -> FilterStatistics:
        self.discard_num = 0
        self.diff_bytes = 0
        self.cumulative_time_ns = 0
        return self


@dataclasses.dataclass
class DocStatistics:
    processed_num: int = 0
    discard_num: int = 0
    input_bytes: int = 0
    output_bytes: int = 0
    cumulative_time_ns: int = 0
    total_token_num: int = 0

    def get_human_readable_values(self) -> dict:
        ret = {
            "processed_num": self.processed_num,
            "discard_num": self.discard_num,
            "input_MB": (self.input_bytes / 1000**2),
            "output_MB": (self.output_bytes / 1000**2),
            "cumulative_time": (self.cumulative_time_ns / 10**9),
            "total_token_num": self.total_token_num,
        }
        return ret

    def __add__(self, other: DocStatistics) -> DocStatistics:
        return DocStatistics(
            self.processed_num + other.processed_num,
            self.discard_num + other.discard_num,
            self.input_bytes + other.input_bytes,
            self.output_bytes + other.output_bytes,
            self.cumulative_time_ns + other.cumulative_time_ns,
            self.total_token_num + other.total_token_num,
        )

    def reset(self) -> DocStatistics:
        self.processed_num = 0
        self.discard_num = 0
        self.input_bytes = 0
        self.output_bytes = 0
        self.cumulative_time_ns = 0
        self.total_token_num = 0
        return self


@dataclasses.dataclass
class StatsContainer:
    total_info: DocStatistics
    layers_info: Dict[str, FilterStatistics]  # Key of the dict is filter name.

    def __add__(self, other: StatsContainer) -> StatsContainer:
        assert self.layers_info.keys() == other.layers_info.keys(), "Layer names must match"
        return StatsContainer(
            self.total_info + other.total_info,
            {k: v + other.layers_info[k] for k, v in self.layers_info.items()},
        )

    def get_human_readable_values(self) -> dict:
        return {
            "total_info": self.total_info.get_human_readable_values(),
            "layers_info": [
                layer.get_human_readable_values() for layer in self.layers_info.values()
            ],
        }

    def reset(self) -> StatsContainer:
        self.total_info.reset()
        for layer in self.layers_info.values():
            layer.reset()
        return self


class StatisticsCounter:
    def __init__(self, inspectors: List[Inspector]) -> None:
        counts = dict()
        for inspector in inspectors:
            counts[inspector.target] = FilterStatistics(
                name=inspector.target,
                params=inspector.target_filter.get_jsonalbe_vars(),
            )
        self.stats = StatsContainer(
            DocStatistics(),
            counts,
        )

    def update_changes(
        self,
        document: Document,
        before_process_inspector: Inspector,
        inspectors: List[Inspector],
    ) -> None:
        # Counting statistics for each filter
        previous_inspector = before_process_inspector
        for inspector in inspectors:
            # Count how many docs are discarded by each filter.
            if (not previous_inspector.is_rejected) and inspector.is_rejected:
                self.stats.layers_info[inspector.target].discard_num += 1

            # Count how much the document volume changes in each filter.
            if (not previous_inspector.is_rejected) and inspector.is_rejected:
                diff_bytes = -inspector.bytes
            elif previous_inspector.is_rejected and inspector.is_rejected:
                diff_bytes = 0
            else:
                diff_bytes = inspector.bytes - previous_inspector.bytes

            self.stats.layers_info[inspector.target].diff_bytes += diff_bytes

            process_time_ns = inspector.time_ns - previous_inspector.time_ns
            self.stats.layers_info[inspector.target].cumulative_time_ns += process_time_ns

            previous_inspector = inspector

        # Counting total statistics
        self.stats.total_info.processed_num += 1
        self.stats.total_info.discard_num += (
            1 if any(inspector.is_rejected for inspector in inspectors) else 0
        )
        self.stats.total_info.input_bytes += len(document.original.encode("utf-8"))
        self.stats.total_info.output_bytes += (
            0 if document.is_rejected else len(document.text.encode("utf-8"))
        )
        self.stats.total_info.cumulative_time_ns += inspectors[-1].time_ns - inspectors[0].time_ns
        self.stats.total_info.total_token_num += len(document.tokens)

    def get_statistics(self) -> dict:
        return self.stats.get_human_readable_values()
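
In normal use, hojichar.Compose builds one Inspector per filter together with a StatisticsCounter behind the scenes. The sketch below wires the same pieces by hand to show the data flow; DiscardShort is a hypothetical filter written only for this example.

from hojichar.core.filter_interface import Filter
from hojichar.core.inspection import Inspector, StatisticsCounter
from hojichar.core.models import Document


class DiscardShort(Filter):  # hypothetical example filter
    def apply(self, document: Document) -> Document:
        if len(document.text) < 10:
            document.is_rejected = True
        return document


filters = [DiscardShort()]
before = Inspector(filters[0], filter_idx=-1)  # snapshot taken before any filter runs
inspectors = [Inspector(f, filter_idx=i) for i, f in enumerate(filters)]
counter = StatisticsCounter(inspectors)

doc = Document("too short")
before.apply(doc)
for f, inspector in zip(filters, inspectors):
    f.apply(doc)
    inspector.apply(doc)  # records is_rejected, byte size, and a timestamp
counter.update_changes(doc, before, inspectors)
print(counter.get_statistics())
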
class Inspector(hojichar.core.filter_interface.Filter):

Base class for all filters. Document-level filters must inherit from this class.

The filter's behavior is defined in the apply method; if you define a new filter, you must implement this method. When an instance is called, it applies the filter as a string-to-string transformation.

If the filter creates Document.tokens from Document.text, you must implement the tokenize method. If the filter updates Document.text by merging Document.tokens, you must implement the merge method. To prevent unexpected behavior, do not define a filter that changes both Document.text and Document.tokens.

To apply a filter to tokens, use the TokenFilter class.

Parameters

p : float
    The probability of applying this filter when it is run via hojichar.Compose.
skip_reject : bool
    If True, hojichar.Compose makes this filter skip documents whose is_rejected flag is set. The flag is True by default, since processing discarded documents in subsequent filters is usually meaningless. In some cases, however, rejected docs still need filtering; for example, when analyzing false positives, discarded docs must be passed to JSON dump filters. In such cases, set skip_reject to False so that all docs pass through.
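
The description above is inherited from the Filter base class. As a concrete illustration, a minimal document-level filter that only overrides apply might look like the following; NormalizeWhitespace is a hypothetical name, not part of hojichar:

from hojichar.core.filter_interface import Filter
from hojichar.core.models import Document


class NormalizeWhitespace(Filter):  # hypothetical example filter
    def apply(self, document: Document) -> Document:
        # Modify document.text in place; reject documents that become empty.
        document.text = " ".join(document.text.split())
        if not document.text:
            document.is_rejected = True
        return document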

Inspector( target_filter: Union[hojichar.core.filter_interface.Filter, hojichar.core.filter_interface.TokenFilter], filter_idx: int, *args: Any, **kwargs: Any)

Parameters

p : float, optional
    Probability that this filter will be applied. Default=1.

def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:

Definition of filter behavior.

In this method, the filter modifies document.text, or sets document.is_rejected = True to discard the document.

Do not define a filter that changes both document.text and document.tokens.

Parameters

document : Document
    Input document

Returns

Document
    Processed document

def inspect(self, document: hojichar.core.models.Document) -> None:
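
inspect snapshots the document's rejection flag, its UTF-8 byte length, and a time.perf_counter_ns() timestamp onto the inspector. A quick illustration, instantiating the Filter base class directly just to give the inspector a target label:

from hojichar.core.filter_interface import Filter
from hojichar.core.inspection import Inspector
from hojichar.core.models import Document

inspector = Inspector(Filter(), filter_idx=0)
doc = Document("こんにちは")  # 5 characters, 15 UTF-8 bytes
inspector.apply(doc)          # delegates to inspect()
print(inspector.is_rejected)  # False
print(inspector.bytes)        # 15 -- byte length, not character count
print(inspector.time_ns)      # perf_counter_ns() value at inspection time
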
@dataclasses.dataclass
class FilterStatistics:
FilterStatistics( name: str, discard_num: int = 0, diff_bytes: int = 0, cumulative_time_ns: int = 0, params: Dict[str, Any] = <factory>)
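
Because __add__ is defined, FilterStatistics objects for the same layer can be summed, e.g. to merge counts collected by parallel workers. A minimal sketch with made-up numbers; the layer name "0-DocumentFilter" is hypothetical:

from hojichar.core.inspection import FilterStatistics

a = FilterStatistics("0-DocumentFilter", discard_num=3, diff_bytes=-2_097_152)
b = FilterStatistics("0-DocumentFilter", discard_num=1, diff_bytes=-1_048_576)
merged = a + b
print(merged.discard_num)                             # 4
print(merged.get_human_readable_values()["diff_MB"])  # -3.0
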
def get_human_readable_values(self) -> dict:
def reset(self) -> hojichar.core.inspection.FilterStatistics:
@dataclasses.dataclass
class DocStatistics:
DocStatistics( processed_num: int = 0, discard_num: int = 0, input_bytes: int = 0, output_bytes: int = 0, cumulative_time_ns: int = 0, total_token_num: int = 0)
def get_human_readable_values(self) -> dict:
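
Note the units: DocStatistics converts bytes with 1000**2 (megabytes), whereas FilterStatistics.get_human_readable_values divides by 1048576 (mebibytes, 1024**2), so the two "MB" figures are on slightly different scales. A quick check of the conversion here, with made-up byte counts:

from hojichar.core.inspection import DocStatistics

stats = DocStatistics(
    processed_num=2,
    input_bytes=3_000_000,
    output_bytes=1_500_000,
    cumulative_time_ns=2 * 10**9,
)
print(stats.get_human_readable_values())
# {'processed_num': 2, 'discard_num': 0, 'input_MB': 3.0,
#  'output_MB': 1.5, 'cumulative_time': 2.0, 'total_token_num': 0}
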
def reset(self) -> hojichar.core.inspection.DocStatistics:
@dataclasses.dataclass
class StatsContainer:
StatsContainer( total_info: hojichar.core.inspection.DocStatistics, layers_info: Dict[str, hojichar.core.inspection.FilterStatistics])
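
Since both members define __add__, whole containers can be summed as long as their layer keys match, e.g. when combining statistics from shards processed separately. A small sketch; the layer name "0-DocumentFilter" is hypothetical:

from hojichar.core.inspection import DocStatistics, FilterStatistics, StatsContainer

shard_a = StatsContainer(
    DocStatistics(processed_num=10),
    {"0-DocumentFilter": FilterStatistics("0-DocumentFilter", discard_num=2)},
)
shard_b = StatsContainer(
    DocStatistics(processed_num=14),
    {"0-DocumentFilter": FilterStatistics("0-DocumentFilter", discard_num=5)},
)
total = shard_a + shard_b
print(total.total_info.processed_num)                     # 24
print(total.layers_info["0-DocumentFilter"].discard_num)  # 7
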
def get_human_readable_values(self) -> dict:
def reset(self) -> hojichar.core.inspection.StatsContainer:
class StatisticsCounter:
StatisticsCounter(inspectors: List[hojichar.core.inspection.Inspector])
def update_changes( self, document: hojichar.core.models.Document, before_process_inspector: hojichar.core.inspection.Inspector, inspectors: List[hojichar.core.inspection.Inspector]) -> None:
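
update_changes walks the inspector chain pairwise. For each filter, diff_bytes is the negated document size if the document was rejected at that filter, zero if it was already rejected, and otherwise the plain before/after difference; discard_num increments only at the filter where rejection first occurs. The sketch below fabricates inspector snapshots by hand to trace that accounting; the bare Filter() instances serve only as name labels:

from hojichar.core.filter_interface import Filter
from hojichar.core.inspection import Inspector, StatisticsCounter
from hojichar.core.models import Document

before = Inspector(Filter(), filter_idx=-1)
ins0 = Inspector(Filter(), filter_idx=0)
ins1 = Inspector(Filter(), filter_idx=1)

# Fabricated snapshots: filter 0 shrinks the doc, filter 1 rejects it.
for ins, rejected, nbytes, t in [
    (before, False, 100, 0),
    (ins0, False, 60, 1_000),
    (ins1, True, 60, 2_500),
]:
    ins.is_rejected, ins.bytes, ins.time_ns = rejected, nbytes, t

counter = StatisticsCounter([ins0, ins1])
doc = Document("x" * 100)
doc.is_rejected = True
counter.update_changes(doc, before, [ins0, ins1])
layers = counter.stats.layers_info
print(layers[ins0.target].diff_bytes)   # -40: shrank from 100 to 60 bytes
print(layers[ins1.target].diff_bytes)   # -60: whole remaining doc discarded
print(layers[ins1.target].discard_num)  # 1: rejection happened at this filter
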
def get_statistics(self) -> dict: