hojichar.core.inspection

from __future__ import annotations

import dataclasses
import logging
import time
from typing import Any, Dict, List, Union

from hojichar.core.filter_interface import Filter, TokenFilter
from hojichar.core.models import Document, Statistics
from hojichar.utils.warn_deprecation import deprecated_since

logger = logging.getLogger(__name__)


class Inspector(Filter):
    def __init__(
        self, target_filter: Union[Filter, TokenFilter], filter_idx: int, *args: Any, **kwargs: Any
    ) -> None:
        super().__init__(*args, **kwargs)
        self.logger = logging.getLogger("hojichar.Inspector")
        self.target_filter = target_filter
        self.filter_idx = filter_idx
        self.target = f"{filter_idx}-{target_filter.name}"

        self.is_rejected = False
        self.text_hash = 0
        self.tokens_hash = 0

    def apply(self, document: Document) -> Document:
        self.inspect(document)
        return document

    def inspect(self, document: Document) -> None:
        # Snapshot the document state at this point in the pipeline.
        self.is_rejected = document.is_rejected
        self.bytes = len(document.text.encode("utf-8"))
        self.time_ns = time.perf_counter_ns()


@dataclasses.dataclass
class FilterStatistics:
    name: str
    discard_num: int = 0
    diff_bytes: int = 0
    cumulative_time_ns: int = 0
    params: Dict[str, Any] = dataclasses.field(default_factory=dict)

    def get_human_readable_values(self) -> dict:
        ret = {
            "name": self.name,
            "discard_num": self.discard_num,
            "diff_MB": (self.diff_bytes / 1048576),  # 1024**2
            "cumulative_time": (self.cumulative_time_ns / 10**9),
            "params": self.params,
        }
        return ret

    def __add__(self, other: FilterStatistics) -> FilterStatistics:
        assert self.name == other.name, "Layer names must match"
        return FilterStatistics(
            self.name,
            self.discard_num + other.discard_num,
            self.diff_bytes + other.diff_bytes,
            self.cumulative_time_ns + other.cumulative_time_ns,
            self.params,
        )

    def reset(self) -> FilterStatistics:
        self.discard_num = 0
        self.diff_bytes = 0
        self.cumulative_time_ns = 0
        return self


@dataclasses.dataclass
class DocStatistics:
    processed_num: int = 0
    discard_num: int = 0
    input_bytes: int = 0
    output_bytes: int = 0
    cumulative_time_ns: int = 0
    total_token_num: int = 0

    def get_human_readable_values(self) -> dict:
        ret = {
            "processed_num": self.processed_num,
            "discard_num": self.discard_num,
            "input_MB": (self.input_bytes / 1000**2),
            "output_MB": (self.output_bytes / 1000**2),
            "cumulative_time": (self.cumulative_time_ns / 10**9),
            "total_token_num": self.total_token_num,
        }
        return ret

    def __add__(self, other: DocStatistics) -> DocStatistics:
        return DocStatistics(
            self.processed_num + other.processed_num,
            self.discard_num + other.discard_num,
            self.input_bytes + other.input_bytes,
            self.output_bytes + other.output_bytes,
            self.cumulative_time_ns + other.cumulative_time_ns,
            self.total_token_num + other.total_token_num,
        )

    def reset(self) -> DocStatistics:
        self.processed_num = 0
        self.discard_num = 0
        self.input_bytes = 0
        self.output_bytes = 0
        self.cumulative_time_ns = 0
        self.total_token_num = 0
        return self


@dataclasses.dataclass
class StatsContainer:
    total_info: DocStatistics
    layers_info: Dict[str, FilterStatistics]  # Key of the dict is the filter name.

    def __add__(self, other: StatsContainer) -> StatsContainer:
        assert self.layers_info.keys() == other.layers_info.keys(), "Layer names must match"
        return StatsContainer(
            self.total_info + other.total_info,
            {k: v + other.layers_info[k] for k, v in self.layers_info.items()},
        )

    def get_human_readable_values(self) -> dict:
        return {
            "total_info": self.total_info.get_human_readable_values(),
            "layers_info": [
                layer.get_human_readable_values() for layer in self.layers_info.values()
            ],
        }

    def reset(self) -> StatsContainer:
        self.total_info.reset()  # call reset(); the bare attribute access would be a no-op
        for layer in self.layers_info.values():
            layer.reset()
        return self


class StatisticsCounter:
    def __init__(self, inspectors: List[Inspector]) -> None:
        counts = dict()
        for inspector in inspectors:
            counts[inspector.target] = FilterStatistics(
                name=inspector.target,
                params=inspector.target_filter.get_jsonable_vars(),
            )
        self.stats = StatsContainer(
            DocStatistics(),
            counts,
        )

    def update_changes(
        self,
        document: Document,
        before_process_inspector: Inspector,
        inspectors: List[Inspector],
    ) -> None:
        # Counting statistics for each filter
        previous_inspector = before_process_inspector
        for inspector in inspectors:
            # Count how many documents are discarded by each filter
            if (not previous_inspector.is_rejected) and inspector.is_rejected:
                self.stats.layers_info[inspector.target].discard_num += 1

            # Track how much each filter changes the document volume (in bytes)
            if (not previous_inspector.is_rejected) and inspector.is_rejected:
                diff_bytes = -inspector.bytes
            elif previous_inspector.is_rejected and inspector.is_rejected:
                diff_bytes = 0
            else:
                diff_bytes = inspector.bytes - previous_inspector.bytes

            self.stats.layers_info[inspector.target].diff_bytes += diff_bytes

            process_time_ns = inspector.time_ns - previous_inspector.time_ns
            self.stats.layers_info[inspector.target].cumulative_time_ns += process_time_ns

            previous_inspector = inspector

        # Counting total statistics
        self.stats.total_info.processed_num += 1
        self.stats.total_info.discard_num += (
            1 if any(inspector.is_rejected for inspector in inspectors) else 0
        )
        self.stats.total_info.input_bytes += len(document.original.encode("utf-8"))
        self.stats.total_info.output_bytes += (
            0 if document.is_rejected else len(document.text.encode("utf-8"))
        )
        self.stats.total_info.cumulative_time_ns += inspectors[-1].time_ns - inspectors[0].time_ns
        self.stats.total_info.total_token_num += len(document.tokens)

    def get_statistics(self) -> dict:
        return self.stats.get_human_readable_values()


@deprecated_since("1.0.0", "Compose.get_total_statistics")
def statistics_obj_adapter(stats: List["Statistics"]) -> StatsContainer:
    total = Statistics.get_filter("Total", stats)
    total_info = DocStatistics(
        processed_num=total.input_num,
        discard_num=total.discard_num,
        input_bytes=total.input_bytes,
        output_bytes=total.output_bytes,
        cumulative_time_ns=total.cumulative_time_ns,
        total_token_num=total.input_chars,  # Assuming total_token_num is equivalent to input_chars
    )

    layers_info = {
        stat.name: FilterStatistics(
            name=stat.name,  # type: ignore
            discard_num=stat.discard_num,
            diff_bytes=stat.diff_bytes,
            cumulative_time_ns=stat.cumulative_time_ns,
            params={},
        )
        for stat in stats
        if stat.name != "Total"
    }

    return StatsContainer(
        total_info=total_info,
        layers_info=layers_info,  # type: ignore
    )

class Inspector(hojichar.core.filter_interface.Filter):

Base class for all filters. Document-level filters must inherit from this class.

The definition of text processing is in the apply method. If you define a new filter, override that method.

When the filter instance is called directly, it applies the filter string to string.

With a context manager, you can use the filter as follows:

with YourFilter(p=0.5) as filt:
    text = filt("This is a sample text.")
Inspector(target_filter: Union[hojichar.core.filter_interface.Filter, hojichar.core.filter_interface.TokenFilter], filter_idx: int, *args: Any, **kwargs: Any)

Initialize the filter.

Parameters

p : float
    The probability of applying the filter. If p is 1, the filter will always be applied.
skip_rejected : bool
    If True, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to False.
random_state : Optional[Union[int, np.random.Generator]]
    Seed for the random number generator. If None, a new random number generator will be created. If None, and used in the Compose class, the random state is shared with the Compose object.
use_batch : bool
    If True, the filter will process documents in batches in the apply_stream method.
batch_size : int
    The size of the batch to process documents in the apply_stream method.
kwargs : Any
    Additional keyword arguments to pass to the filter.
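
A minimal construction sketch. DummyFilter below is a hypothetical no-op filter defined only for illustration; it is not part of hojichar:

from hojichar.core.models import Document
from hojichar.core.filter_interface import Filter
from hojichar.core.inspection import Inspector

class DummyFilter(Filter):  # hypothetical filter, used only in this example
    def apply(self, document: Document) -> Document:
        return document

# Wrap the filter at pipeline position 0; the inspector labels itself "<idx>-<filter name>".
inspector = Inspector(target_filter=DummyFilter(), filter_idx=0)
print(inspector.target)  # "0-DummyFilter", assuming Filter.name defaults to the class name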

def apply(self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:

Definition of filter behavior.

The document must implement the TextContent protocol; hojichar.Document is the class most commonly used.

In this method, the filter may modify document.text or document.extras, and may set document.is_rejected = True to discard the document.

Parameters

document : Document
    Input document

Returns

Document
    Processed document
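
For example, a document-level filter that discards blank documents could override apply like this (a sketch, not part of this module):

from hojichar.core.models import Document
from hojichar.core.filter_interface import Filter

class DropBlank(Filter):  # hypothetical example filter
    def apply(self, document: Document) -> Document:
        if not document.text.strip():
            document.is_rejected = True  # mark the document as discarded
        return document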

def inspect(self, document: hojichar.core.models.Document) -> None:
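
After a document has passed through the wrapped filter, the inspector holds a small snapshot that StatisticsCounter later reads. A rough sketch, reusing the inspector and imports from the construction example above:

doc = Document("This is a sample text.")
inspector.inspect(doc)
print(inspector.is_rejected)  # False: the document has not been rejected
print(inspector.bytes)        # UTF-8 byte length of doc.text
print(inspector.time_ns)      # time.perf_counter_ns() taken at inspection time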
@dataclasses.dataclass
class FilterStatistics:
FilterStatistics(name: str, discard_num: int = 0, diff_bytes: int = 0, cumulative_time_ns: int = 0, params: Dict[str, Any] = <factory>)
def get_human_readable_values(self) -> dict:
def reset(self) -> hojichar.core.inspection.FilterStatistics:
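
Because __add__ merges entries by name, per-worker FilterStatistics for the same layer can be summed before reporting; a small sketch:

a = FilterStatistics(name="0-DummyFilter", discard_num=2, diff_bytes=-100, cumulative_time_ns=5_000_000)
b = FilterStatistics(name="0-DummyFilter", discard_num=1, diff_bytes=-40, cumulative_time_ns=3_000_000)
merged = a + b
print(merged.get_human_readable_values())
# discard_num=3, diff_MB is -140 / 1048576, cumulative_time=0.008 (seconds)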
@dataclasses.dataclass
class DocStatistics:
DocStatistics(processed_num: int = 0, discard_num: int = 0, input_bytes: int = 0, output_bytes: int = 0, cumulative_time_ns: int = 0, total_token_num: int = 0)
def get_human_readable_values(self) -> dict:
def reset(self) -> hojichar.core.inspection.DocStatistics:
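
DocStatistics can be merged the same way. Note that input_MB and output_MB divide by 1000**2, while FilterStatistics.diff_MB divides by 1024**2. A small sketch:

run1 = DocStatistics(processed_num=10, discard_num=2, input_bytes=4_000_000, output_bytes=3_000_000)
run2 = DocStatistics(processed_num=5, discard_num=0, input_bytes=1_000_000, output_bytes=1_000_000)
print((run1 + run2).get_human_readable_values())
# processed_num=15, discard_num=2, input_MB=5.0, output_MB=4.0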
@dataclasses.dataclass
class StatsContainer:
StatsContainer(total_info: hojichar.core.inspection.DocStatistics, layers_info: Dict[str, hojichar.core.inspection.FilterStatistics])
def get_human_readable_values(self) -> dict:
def reset(self) -> hojichar.core.inspection.StatsContainer:
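
Containers from, for example, parallel shards can be added as long as their layer keys match; reset() zeroes the counters in place. A small sketch:

layer = "0-DummyFilter"
shard_a = StatsContainer(DocStatistics(processed_num=3), {layer: FilterStatistics(name=layer)})
shard_b = StatsContainer(DocStatistics(processed_num=7), {layer: FilterStatistics(name=layer)})
combined = shard_a + shard_b
print(combined.get_human_readable_values()["total_info"]["processed_num"])  # 10
combined.reset()  # counters back to zero; layer names and params are kept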
class StatisticsCounter:
StatisticsCounter(inspectors: List[hojichar.core.inspection.Inspector])
def update_changes(self, document: hojichar.core.models.Document, before_process_inspector: hojichar.core.inspection.Inspector, inspectors: List[hojichar.core.inspection.Inspector]) -> None:
def get_statistics(self) -> dict:
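
Compose wires inspectors around each filter internally; the manual sketch below only illustrates the expected call pattern, reusing the hypothetical DummyFilter from the Inspector example above:

from hojichar.core.models import Document
from hojichar.core.inspection import Inspector, StatisticsCounter

filters = [DummyFilter()]
# The "before" inspector only supplies the initial snapshot; its target and index are just labels here.
before = Inspector(target_filter=filters[0], filter_idx=0)
inspectors = [Inspector(f, idx) for idx, f in enumerate(filters)]
counter = StatisticsCounter(inspectors)

doc = Document("This is a sample text.")
before.inspect(doc)
for f, ins in zip(filters, inspectors):
    doc = f.apply(doc)
    ins.inspect(doc)

counter.update_changes(doc, before, inspectors)
print(counter.get_statistics())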
@deprecated_since('1.0.0', 'Compose.get_total_statistics')
def statistics_obj_adapter(stats: List[hojichar.core.models.Statistics]) -> hojichar.core.inspection.StatsContainer:
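
The adapter remains only for backward compatibility; Compose.get_total_statistics is the preferred path. Assuming stats is a List[Statistics] collected from a pipeline run that contains a layer named "Total", a hedged sketch:

# stats: List[Statistics] from a pipeline run, including a "Total" entry (assumed available)
container = statistics_obj_adapter(stats)
print(container.get_human_readable_values()["total_info"])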