hojichar.core.models
import time
from dataclasses import dataclass, fields
from typing import Any, Dict, List, Optional

from hojichar.utils.warn_deprecation import deprecated_since


@deprecated_since("0.1.0", "Document")
class Token:
    """
    A piece of text that remembers the value it was created with.

    The class is wrapped by `deprecated_since("0.1.0", "Document")`.

    Attributes:
        text (str): The current text of the token. It may be reassigned freely.
        is_rejected (bool): A flag indicating whether the token is rejected.
    """

    def __init__(self, text: str, is_rejected: bool = False) -> None:
        self.text = text
        # Name-mangled copy, exposed read-only through `original`.
        self.__original = text
        self.is_rejected = is_rejected

    @property
    def original(self) -> str:
        """The text the token was constructed with; unaffected by changes to `text`."""
        return self.__original

    def __str__(self) -> str:
        return self.text


class Document:
    """
    Document class represents a text document with metadata.
    It contains the text of the document, a flag indicating whether it is rejected,
    and additional metadata stored in the `extras` dictionary.

    The `tokens` attribute will be deprecated in future versions,
    and users are encouraged to use the `extras` dictionary to store token-related information.

    Attributes:
        text (str): The text content of the document.
        is_rejected (bool): A flag indicating whether the document is rejected.
        extras (Dict[str, Any]): A dictionary to store additional metadata about the document.
        reject_reason (Dict[str, Any]): A dictionary to store the reason for rejection.
            It starts out empty.
        _initial_stats (Optional[Dict[str, Any]]): Internal copy of the document statistics
            captured before the pipeline mutates the document.

    The following attributes will be deprecated in future versions:
        tokens (List[Token]): A list of tokens extracted from the document.
    """

    def __init__(
        self,
        text: str,
        is_rejected: bool = False,
        tokens: Optional[List[Token]] = None,
        extras: Optional[Dict[str, Any]] = None,
    ) -> None:
        self.text = text
        # Name-mangled copy, exposed read-only through `original`.
        self.__original = text
        self.is_rejected = is_rejected
        # `None` defaults avoid sharing one mutable container between instances.
        self.tokens: List[Token] = [] if tokens is None else tokens
        self.extras: Dict[str, Any] = {} if extras is None else extras

        self.reject_reason: Dict[str, Any] = {}
        # A "__init_stats" entry is moved out of `extras` (the caller's dict is
        # mutated when one was passed in); without the entry the value is None.
        self._initial_stats: Optional[Dict[str, Any]] = self.extras.pop("__init_stats", None)

    @property
    def original(self) -> str:
        """The text the document was constructed with; unaffected by changes to `text`."""
        return self.__original

    def _set_initial_stats(self, stats: Dict[str, Any]) -> None:
        """
        Store the document statistics captured before the pipeline modifies the document.
        Internal API: not intended for filter implementations.
        """
        self._initial_stats = stats

    def _get_initial_stats(self) -> Optional[Dict[str, Any]]:
        """
        Retrieve the stored initial statistics of the document, if available.
        Internal API: not intended for filter implementations.
        """
        return self._initial_stats

    def _clear_initial_stats(self) -> None:
        """
        Remove the stored initial statistics. Useful after the stats are consumed.
        Internal API: not intended for filter implementations.
        """
        self._initial_stats = None

    @deprecated_since("1.0.0")
    def set_tokens(self, tokens: List[str]) -> None:
        """Replace `tokens` with a new `Token` built from each given string."""
        self.tokens = [Token(token) for token in tokens]

    @deprecated_since("1.0.0")
    def get_tokens(self) -> List[str]:
        """Return the current `text` of every token, in order."""
        return [token.text for token in self.tokens]

    def __str__(self) -> str:
        return self.text

    def __repr__(self) -> str:
        return (
            f"Document(text={self.text!r}, is_rejected={self.is_rejected}, extras={self.extras})"  # noqa
        )


@dataclass
class Statistics:
    """
    Statistics class to track the performance of the document processing pipeline.

    Every field except `name` is an integer counter that starts at zero.
    """

    name: Optional[str] = None
    input_num: int = 0
    input_bytes: int = 0
    input_chars: int = 0
    output_num: int = 0
    output_bytes: int = 0
    output_chars: int = 0
    discard_num: int = 0
    diff_bytes: int = 0
    diff_chars: int = 0
    cumulative_time_ns: int = 0
    errors: int = 0

    def _counter_names(self) -> List[str]:
        """Return the name of every counter field, i.e. every field except `name`."""
        return [spec.name for spec in fields(self) if spec.name != "name"]

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the Statistics object to a dictionary keyed by field name.
        """
        return {spec.name: getattr(self, spec.name) for spec in fields(self)}

    def update(self, other: "Statistics") -> None:
        """
        Update the statistics in place by adding every counter of `other`.
        `name` is left untouched and the names are not compared.
        """
        for counter in self._counter_names():
            setattr(self, counter, getattr(self, counter) + getattr(other, counter))

    def reset(self) -> "Statistics":
        """
        Reset every counter to zero, keeping `name`.

        Returns:
            This same instance.
        """
        for counter in self._counter_names():
            setattr(self, counter, 0)
        return self

    def update_by_diff(
        self,
        before: Dict[str, Any],
        after: Dict[str, Any],
    ) -> None:
        """
        Update the statistics by calculating the differences between two Doc-info mappings.
        This method is used to update the statistics after a filter is applied.

        Args:
            before: Mapping with the keys "is_rejected", "bytes", "chars" and "time_ns",
                taken before the filter ran.
            after: Mapping with the same keys, taken after the filter ran.
        """
        newly_rejected = not before["is_rejected"] and after["is_rejected"]

        # Input side and elapsed time are recorded the same way for both outcomes.
        self.input_num += 1
        self.input_bytes += before["bytes"]
        self.input_chars += before["chars"]
        self.cumulative_time_ns += after["time_ns"] - before["time_ns"]

        if newly_rejected:
            # The whole input size counts as removed.
            self.discard_num += 1
            self.diff_bytes -= before["bytes"]
            self.diff_chars -= before["chars"]
        else:
            # Every other case lands here, including a document that was
            # already rejected before the filter ran.
            self.output_num += 1
            self.output_bytes += after["bytes"]
            self.output_chars += after["chars"]
            self.diff_bytes += after["bytes"] - before["bytes"]
            self.diff_chars += after["chars"] - before["chars"]

    @staticmethod
    def add(x: "Statistics", y: "Statistics") -> "Statistics":
        """
        Add two Statistics objects together and return the sum as a new object.

        Raises:
            AssertionError: If the names of the two objects differ.
        """
        # Raised explicitly: an `assert` statement would vanish under `python -O`.
        if x.name != y.name:
            raise AssertionError("Layer names must match")
        totals = {key: getattr(x, key) + getattr(y, key) for key in x._counter_names()}
        return Statistics(name=x.name, **totals)

    @staticmethod
    def add_list_of_stats(x: List["Statistics"], y: List["Statistics"]) -> List["Statistics"]:
        """
        Add Statistics objects from two lists by matching their names.

        Returns:
            A new list in the order of `x`; each entry is the sum of an element
            of `x` and the element of `y` that has the same name.

        Raises:
            ValueError: If the sets of names in the two lists do not match.
        """
        names_x = {stat.name for stat in x}
        names_y = {stat.name for stat in y}
        if names_x != names_y:
            # The message reads "the sets of names do not match".
            raise ValueError(f"name の集合が一致しません: {names_x} vs {names_y}")

        y_map = {stat.name: stat for stat in y}
        return [Statistics.add(stat_x, y_map[stat_x.name]) for stat_x in x]

    @staticmethod
    def get_filter(name: str, stats: List["Statistics"]) -> "Statistics":
        """
        Get the first Statistics object with the given name from a list of statistics.

        Raises:
            KeyError: If no element of `stats` has that name.
        """
        for stat in stats:
            if stat.name == name:
                return stat
        raise KeyError(f"Statistics with name '{name}' not found in the list.")


def get_doc_info(document: Document) -> Dict[str, Any]:
    """
    Create a document-info mapping from a Document instance.
    This function is used to extract metadata from the Document for statistics tracking.

    Returns:
        A dict with the keys "is_rejected", "bytes" (length of the UTF-8 encoded text),
        "chars" (length of the text) and "time_ns" (`time.perf_counter_ns()` at call time).
    """
    return {
        "is_rejected": document.is_rejected,
        "bytes": len(document.text.encode("utf-8")),
        "chars": len(document.text),
        "time_ns": time.perf_counter_ns(),
    }
@deprecated_since("0.1.0", "Document")
class Token:
    """
    A piece of text that remembers the value it was created with.

    Attributes:
        text (str): The current text of the token. It may be reassigned freely.
        is_rejected (bool): A flag indicating whether the token is rejected.
    """

    def __init__(self, text: str, is_rejected: bool = False) -> None:
        # The name-mangled copy is what `original` reports later on.
        self.text = self.__original = text
        self.is_rejected = is_rejected

    @property
    def original(self) -> str:
        """The text the token was constructed with; unaffected by changes to `text`."""
        return self.__original

    def __str__(self) -> str:
        current_text = self.text
        return current_text
class Document:
    """
    Document class represents a text document with metadata.
    It contains the text of the document, a flag indicating whether it is rejected,
    and additional metadata stored in the `extras` dictionary.

    The `tokens` attribute will be deprecated in future versions,
    and users are encouraged to use the `extras` dictionary to store token-related information.

    Attributes:
        text (str): The text content of the document.
        is_rejected (bool): A flag indicating whether the document is rejected.
        extras (Dict[str, Any]): A dictionary to store additional metadata about the document.
        reject_reason (Dict[str, Any]): A dictionary to store the reason for rejection.
            It starts out empty.
        _initial_stats (Optional[Dict[str, Any]]): Internal copy of the document statistics
            captured before the pipeline mutates the document.

    The following attributes will be deprecated in future versions:
        tokens (List[Token]): A list of tokens extracted from the document.
    """

    def __init__(
        self,
        text: str,
        is_rejected: bool = False,
        tokens: Optional[List[Token]] = None,
        extras: Optional[Dict[str, Any]] = None,
    ) -> None:
        self.text = text
        # Name-mangled copy, exposed read-only through `original`.
        self.__original = text
        self.is_rejected = is_rejected
        # `None` defaults avoid sharing one mutable container between instances.
        self.tokens: List[Token] = [] if tokens is None else tokens
        self.extras: Dict[str, Any] = {} if extras is None else extras

        self.reject_reason: Dict[str, Any] = {}
        # A "__init_stats" entry is moved out of `extras` (the caller's dict is
        # mutated when one was passed in); without the entry the value is None.
        self._initial_stats: Optional[dict[str, Any]] = self.extras.pop("__init_stats", None)

    @property
    def original(self) -> str:
        """The text the document was constructed with; unaffected by changes to `text`."""
        return self.__original

    def _set_initial_stats(self, stats: dict[str, Any]) -> None:
        """
        Remember the statistics taken before the pipeline touched the document.
        Internal API: not intended for filter implementations.
        """
        self._initial_stats = stats

    def _get_initial_stats(self) -> Optional[dict[str, Any]]:
        """
        Hand back the remembered initial statistics, or None when nothing is stored.
        Internal API: not intended for filter implementations.
        """
        return self._initial_stats

    def _clear_initial_stats(self) -> None:
        """
        Forget the remembered initial statistics. Useful after the stats are consumed.
        Internal API: not intended for filter implementations.
        """
        self._initial_stats = None

    @deprecated_since("1.0.0")
    def set_tokens(self, tokens: List[str]) -> None:
        """Replace `tokens` with a new `Token` built from each given string."""
        wrapped: List[Token] = []
        for raw in tokens:
            wrapped.append(Token(raw))
        self.tokens = wrapped

    @deprecated_since("1.0.0")
    def get_tokens(self) -> List[str]:
        """Return the current `text` of every token, in order."""
        texts: List[str] = []
        for token in self.tokens:
            texts.append(token.text)
        return texts

    def __str__(self) -> str:
        return self.text

    def __repr__(self) -> str:
        return f"Document(text={self.text!r}, is_rejected={self.is_rejected}, extras={self.extras})"  # noqa
Document class represents a text document with metadata.
It contains the text of the document, a flag indicating whether it is rejected,
and additional metadata stored in the extras dictionary.
The tokens attribute will be deprecated in future versions,
and users are encouraged to use the extras dictionary to store token-related information.
Attributes: text (str): The text content of the document. is_rejected (bool): A flag indicating whether the document is rejected. extras (Dict[str, Any]): A dictionary to store additional metadata about the document. reject_reason (Dict[str, Any]): A dictionary to store the reason for rejection. The filter class, together with its member names and values, is logged here. _initial_stats (Optional[Dict[str, Any]]): Internal copy of the document statistics captured before the pipeline mutates the document.
The following attributes will be deprecated in future versions: tokens (List[Token]): A list of tokens extracted from the document.
def __init__(
    self,
    text: str,
    is_rejected: bool = False,
    tokens: Optional[List[Token]] = None,
    extras: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Set up a document from its text and optional metadata.

    Args:
        text: The text content; it is also kept as the document's original text.
        is_rejected: Initial value of the rejection flag.
        tokens: Token list to use as-is; a new empty list when None.
        extras: Metadata dict to use as-is; a new empty dict when None.
    """
    self.text = text
    self.__original = text
    self.is_rejected = is_rejected
    # `None` defaults avoid sharing one mutable container between instances.
    self.tokens: List[Token] = [] if tokens is None else tokens
    self.extras: Dict[str, Any] = {} if extras is None else extras

    self.reject_reason: Dict[str, Any] = {}
    # A "__init_stats" entry is moved out of `extras` (the caller's dict is
    # mutated when one was passed in); without the entry the value is None.
    self._initial_stats: Optional[dict[str, Any]] = self.extras.pop("__init_stats", None)
@dataclass
class Statistics:
    """
    Statistics class to track the performance of the document processing pipeline.

    Every field except `name` is an integer counter that starts at zero.
    """

    name: Optional[str] = None
    input_num: int = 0
    input_bytes: int = 0
    input_chars: int = 0
    output_num: int = 0
    output_bytes: int = 0
    output_chars: int = 0
    discard_num: int = 0
    diff_bytes: int = 0
    diff_chars: int = 0
    cumulative_time_ns: int = 0
    errors: int = 0

    def _counter_names(self) -> List[str]:
        """Return the name of every counter field, i.e. every field except `name`."""
        return [spec.name for spec in fields(self) if spec.name != "name"]

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the Statistics object to a dictionary keyed by field name.
        """
        return {spec.name: getattr(self, spec.name) for spec in fields(self)}

    def update(self, other: "Statistics") -> None:
        """
        Update the statistics in place by adding every counter of `other`.
        `name` is left untouched and the names are not compared.
        """
        for counter in self._counter_names():
            setattr(self, counter, getattr(self, counter) + getattr(other, counter))

    def reset(self) -> "Statistics":
        """
        Reset every counter to zero, keeping `name`.

        Returns:
            This same instance.
        """
        for counter in self._counter_names():
            setattr(self, counter, 0)
        return self

    def update_by_diff(
        self,
        before: Dict[str, Any],
        after: Dict[str, Any],
    ) -> None:
        """
        Update the statistics by calculating the differences between two Doc-info mappings.
        This method is used to update the statistics after a filter is applied.

        Args:
            before: Mapping with the keys "is_rejected", "bytes", "chars" and "time_ns",
                taken before the filter ran.
            after: Mapping with the same keys, taken after the filter ran.
        """
        newly_rejected = not before["is_rejected"] and after["is_rejected"]

        # Input side and elapsed time are recorded the same way for both outcomes.
        self.input_num += 1
        self.input_bytes += before["bytes"]
        self.input_chars += before["chars"]
        self.cumulative_time_ns += after["time_ns"] - before["time_ns"]

        if newly_rejected:
            # The whole input size counts as removed.
            self.discard_num += 1
            self.diff_bytes -= before["bytes"]
            self.diff_chars -= before["chars"]
        else:
            # Every other case lands here, including a document that was
            # already rejected before the filter ran.
            self.output_num += 1
            self.output_bytes += after["bytes"]
            self.output_chars += after["chars"]
            self.diff_bytes += after["bytes"] - before["bytes"]
            self.diff_chars += after["chars"] - before["chars"]

    @staticmethod
    def add(x: "Statistics", y: "Statistics") -> "Statistics":
        """
        Add two Statistics objects together and return the sum as a new object.

        Raises:
            AssertionError: If the names of the two objects differ.
        """
        # Raised explicitly: an `assert` statement would vanish under `python -O`.
        if x.name != y.name:
            raise AssertionError("Layer names must match")
        totals = {key: getattr(x, key) + getattr(y, key) for key in x._counter_names()}
        return Statistics(name=x.name, **totals)

    @staticmethod
    def add_list_of_stats(x: List["Statistics"], y: List["Statistics"]) -> List["Statistics"]:
        """
        Add Statistics objects from two lists by matching their names.

        Returns:
            A new list in the order of `x`; each entry is the sum of an element
            of `x` and the element of `y` that has the same name.

        Raises:
            ValueError: If the sets of names in the two lists do not match.
        """
        names_x = {stat.name for stat in x}
        names_y = {stat.name for stat in y}
        if names_x != names_y:
            # The message reads "the sets of names do not match".
            raise ValueError(f"name の集合が一致しません: {names_x} vs {names_y}")

        y_map = {stat.name: stat for stat in y}
        return [Statistics.add(stat_x, y_map[stat_x.name]) for stat_x in x]

    @staticmethod
    def get_filter(name: str, stats: List["Statistics"]) -> "Statistics":
        """
        Get the first Statistics object with the given name from a list of statistics.

        Raises:
            KeyError: If no element of `stats` has that name.
        """
        for stat in stats:
            if stat.name == name:
                return stat
        raise KeyError(f"Statistics with name '{name}' not found in the list.")
Statistics class to track the performance of the document processing pipeline.
def to_dict(self) -> Dict[str, Any]:
    """
    Build a plain dictionary that maps each dataclass field name to its current value.
    """
    snapshot: Dict[str, Any] = {}
    for spec in fields(self):
        snapshot[spec.name] = getattr(self, spec.name)
    return snapshot
Convert the Statistics object to a dictionary.
def update(self, other: "Statistics") -> None:
    """
    Accumulate the counters of `other` into this object, in place.
    `name` is not touched and the two names are not compared.
    """
    for counter in (
        "input_num",
        "input_bytes",
        "input_chars",
        "output_num",
        "output_bytes",
        "output_chars",
        "discard_num",
        "diff_bytes",
        "diff_chars",
        "cumulative_time_ns",
        "errors",
    ):
        setattr(self, counter, getattr(self, counter) + getattr(other, counter))
Update the statistics by adding another Statistics object.
def reset(self) -> "Statistics":
    """
    Put every counter back to zero and return this same object. `name` is kept.
    """
    for counter in (
        "input_num",
        "input_bytes",
        "input_chars",
        "output_num",
        "output_bytes",
        "output_chars",
        "discard_num",
        "diff_bytes",
        "diff_chars",
        "cumulative_time_ns",
        "errors",
    ):
        setattr(self, counter, 0)
    return self
Reset the statistics to their initial values.
def update_by_diff(
    self,
    before: dict[str, Any],
    after: dict[str, Any],
) -> None:
    """
    Fold one before/after pair of Doc-info mappings into the counters.
    This method is used to update the statistics after a filter is applied.

    Both mappings are read through the keys "is_rejected", "bytes", "chars" and "time_ns".
    """
    newly_rejected = not before["is_rejected"] and after["is_rejected"]

    # The input side is recorded the same way for both outcomes.
    self.input_num += 1
    self.input_bytes += before["bytes"]
    self.input_chars += before["chars"]

    if newly_rejected:
        # The whole input size counts as removed.
        self.discard_num += 1
        self.diff_bytes -= before["bytes"]
        self.diff_chars -= before["chars"]
    else:
        # Every other case lands here, including a document that was
        # already rejected before the filter ran.
        self.output_num += 1
        self.output_bytes += after["bytes"]
        self.output_chars += after["chars"]
        self.diff_bytes += after["bytes"] - before["bytes"]
        self.diff_chars += after["chars"] - before["chars"]

    self.cumulative_time_ns += after["time_ns"] - before["time_ns"]
Update the statistics by calculating the differences between two Doc-info mappings. This method is used to update the statistics after a filter is applied.
201 @staticmethod 202 def add(x: "Statistics", y: "Statistics") -> "Statistics": 203 """ 204 Add two Statistics objects together. 205 This method assumes that the names of the two Statistics objects match. 206 If they do not match, it will raise an AssertionError.""" 207 assert x.name == y.name, "Layer names must match" 208 return Statistics( 209 name=x.name, 210 input_num=x.input_num + y.input_num, 211 input_bytes=x.input_bytes + y.input_bytes, 212 input_chars=x.input_chars + y.input_chars, 213 output_num=x.output_num + y.output_num, 214 output_bytes=x.output_bytes + y.output_bytes, 215 output_chars=x.output_chars + y.output_chars, 216 discard_num=x.discard_num + y.discard_num, 217 diff_bytes=x.diff_bytes + y.diff_bytes, 218 diff_chars=x.diff_chars + y.diff_chars, 219 cumulative_time_ns=x.cumulative_time_ns + y.cumulative_time_ns, 220 errors=x.errors + y.errors, 221 )
Add two Statistics objects together. This method assumes that the names of the two Statistics objects match. If they do not match, it will raise an AssertionError.
223 @staticmethod 224 def add_list_of_stats(x: List["Statistics"], y: List["Statistics"]) -> List["Statistics"]: 225 """ 226 Add FilterStatistics objects from two lists by matching their names. 227 This method assumes that both lists contain FilterStatistics objects 228 with the same names, and it will raise a ValueError if the sets of names 229 in the two lists do not match. 230 """ 231 # check if the names in both lists match 232 names_x = {stat.name for stat in x} 233 names_y = {stat.name for stat in y} 234 if names_x != names_y: 235 raise ValueError(f"name の集合が一致しません: {names_x} vs {names_y}") 236 237 y_map = {stat.name: stat for stat in y} 238 239 # keep the order of x and add corresponding y 240 result: List[Statistics] = [] 241 for stat_x in x: 242 stat_y = y_map[stat_x.name] 243 result.append(Statistics.add(stat_x, stat_y)) 244 245 return result
Add Statistics objects from two lists by matching their names. This method assumes that both lists contain Statistics objects with the same names, and it raises a ValueError if the sets of names in the two lists do not match.
247 @staticmethod 248 def get_filter(name: str, stats: List["Statistics"]) -> "Statistics": 249 """ 250 Get a Statistics object by its name from a list of statistics. 251 If the name is not found, return None. 252 """ 253 for stat in stats: 254 if stat.name == name: 255 return stat 256 raise KeyError(f"Statistics with name '{name}' not found in the list.")
Get a Statistics object by its name from a list of statistics. If the name is not found, a KeyError is raised.
def get_doc_info(document: "Document") -> dict[str, Any]:
    """
    Take a snapshot of a Document as a document-info mapping.
    This function is used to extract metadata from the Document for statistics tracking.

    The mapping has the keys "is_rejected", "bytes" (length of the UTF-8 encoded text),
    "chars" (length of the text) and "time_ns" (`time.perf_counter_ns()` at call time).
    """
    info: dict[str, Any] = {"is_rejected": document.is_rejected}
    text = document.text
    info["bytes"] = len(text.encode("utf-8"))
    info["chars"] = len(text)
    # The clock is read last, after the sizes have been measured.
    info["time_ns"] = time.perf_counter_ns()
    return info
Create a document-info mapping from a Document instance. This function is used to extract metadata from the Document for statistics tracking.