hojichar.core.async_composition
from __future__ import annotations

import asyncio
import logging
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Any, AsyncGenerator, AsyncIterable, Iterable, Sequence

import numpy as np

from hojichar.core import inspection
from hojichar.core.async_filter_interface import AsyncFilter
from hojichar.core.composition import Compose
from hojichar.core.filter_interface import Filter
from hojichar.core.models import Document, Statistics, get_doc_info
from hojichar.utils.async_handlers import handle_stream_as_async


class AsyncFilterAdapter(AsyncFilter):
    """
    Adapter class for executing hojichar.Filter asynchronously.
    """

    def __init__(
        self,
        sync_filter: Filter,
        *args: Any,
        executor: Executor | None = None,
        use_batch: bool = True,
        **kwargs: Any,
    ):
        """
        Adapter class for executing hojichar.Filter asynchronously.
        Used to incorporate a Filter into AsyncCompose.

        To reduce the overhead of asynchronous context switching,
        use_batch is set to True by default so that apply_stream
        processes documents in batches, regardless of the sync_filter's
        use_batch setting.

        For heavy, CPU-bound processing, you can specify an executor to
        offload the work to. However, due to Python's GIL constraints,
        a ThreadPoolExecutor will not parallelize CPU-bound processing,
        and the entire process will be blocked.

        Using a ProcessPoolExecutor as the executor may allow CPU-bound
        processing to be parallelized. However, for parallelizing CPU-bound
        processing, it is recommended to use the hojichar.Parallel class
        to parallelize a synchronous Compose pipeline.
        """
        super().__init__(*args, use_batch=use_batch, **kwargs)
        self.sync_filter = sync_filter
        self._has_external_executor = executor is not None
        self._executor = executor or ThreadPoolExecutor()
        self.batch_size = sync_filter.batch_size

    async def apply(self, document: Document) -> Document:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self.sync_filter.apply_batch(batch),
        )

    async def shutdown(self) -> None:
        self.sync_filter.shutdown()
        if not self._has_external_executor:
            self._executor.shutdown()


class AsyncCompose(AsyncFilter):
    def __init__(
        self,
        filters: list[AsyncFilter | Filter],
        random_state: int | np.random.Generator | None = None,
        executor: ThreadPoolExecutor | None = None,
        *args: Any,
        **kwargs: Any,
    ):
        super().__init__(random_state=random_state, *args, **kwargs)
        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
        self._statistics.name = "Total"
        self._has_external_executor = executor is not None
        self._executor = executor or ThreadPoolExecutor()
        self.set_filters(filters)

    def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
        self.filters: list[AsyncFilter] = []
        filter_idx = 0
        for f in filters:
            if isinstance(f, (AsyncCompose, Compose)):
                for sub in f.filters:
                    name = f"{filter_idx}-{sub.__class__.__name__}"
                    if isinstance(sub, Filter):
                        name = f"{filter_idx}-{sub.__class__.__name__}"
                        sub = AsyncFilterAdapter(sub, executor=self._executor)

                    sub._set_rng_if_not_initialized(self._rng)
                    sub.name = name
                    sub._statistics.name = name
                    self.filters.append(sub)
                    filter_idx += 1
            else:
                name = f"{filter_idx}-{f.__class__.__name__}"
                if isinstance(f, Filter):
                    name = f"{filter_idx}-{f.__class__.__name__}"
                    f = AsyncFilterAdapter(f, executor=self._executor)
                f._set_rng_if_not_initialized(self._rng)
                f.name = name
                f._statistics.name = name
                self.filters.append(f)
                filter_idx += 1

    async def apply(self, document: Document) -> Document:
        stat = get_doc_info(document)
        for filter_idx, filt in enumerate(self.filters):
            document = await filt._apply(document)
        new_stat = get_doc_info(document)
        async with self._stats_lock:
            self._statistics.update_by_diff(stat, new_stat)
        return document

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        stats = [get_doc_info(doc) for doc in batch]
        for i, filt in enumerate(self.filters):
            batch = await filt._apply_batch(batch)
        batch = await self._finalize_batch(batch, stats)
        return list(batch)

    async def apply_stream(
        self,
        stream: AsyncIterable[Document] | Iterable[Document],
    ) -> AsyncGenerator[Document, None]:
        async_stream = handle_stream_as_async(stream, chunk_size=1000, executor=self._executor)
        async_stream = self._count_input_stats(async_stream)

        for i, filt in enumerate(self.filters):
            async_stream = filt.apply_stream(async_stream)

        async for doc in async_stream:
            in_stat = doc._get_initial_stats()
            if in_stat is None:
                in_stat = get_doc_info(doc)
                self.logger.debug(
                    "Initial stats missing for document during async stream aggregation; "
                    "using current stats as fallback"
                )
            out_stat = get_doc_info(doc)
            async with self._stats_lock:
                self._statistics.update_by_diff(in_stat, out_stat)
            doc._clear_initial_stats()
            yield doc

    async def _count_input_stats(
        self, async_stream: AsyncIterable[Document]
    ) -> AsyncGenerator[Document, None]:
        async for doc in async_stream:
            doc._set_initial_stats(get_doc_info(doc))
            yield doc

    def get_total_statistics(self) -> list[Statistics]:
        """
        Get the statistics of the Compose object and sub filters.

        The statistics of the Compose class are stored in an object with the name "Total",
        and sub-filters' statistics are stored with names in the format
        {filter_index}-{filter class name}.
        """
        stats = []
        stats.append(self.get_statistics())
        for i, filt in enumerate(self.filters):
            stats.append(filt.get_statistics())
        return stats

    def get_total_statistics_map(self) -> list[dict[str, Any]]:
        """
        Get the statistics of the Compose object and sub filters as a list of dictionaries.
        """
        stats = self.get_total_statistics()
        return [stat.to_dict() for stat in stats]

    @property
    def statistics(self) -> dict:
        """
        Deprecated

        Get the statistics of the Compose object and sub filters.

        This property is retained for compatibility with previous versions.
        Please use `get_total_statistics` or `get_total_statistics_map` instead.
        """
        return inspection.statistics_obj_adapter(  # type: ignore
            self.get_total_statistics()
        ).get_human_readable_values()

    @property
    def statistics_obj(self) -> inspection.StatsContainer:
        """
        Deprecated

        Get the statistics of the AsyncCompose object and sub filters.
        This property returns a StatsContainer object which contains the statistics
        of the AsyncCompose object and sub filters.

        This property is retained for compatibility with previous versions.
        Please use `get_total_statistics` or `get_total_statistics_map` instead.
        """
        return inspection.statistics_obj_adapter(self.get_total_statistics())  # type: ignore

    async def shutdown(self) -> None:
        for filt in self.filters:
            await filt.shutdown()
        if not self._has_external_executor:
            self._executor.shutdown()

    async def __aenter__(self) -> "AsyncCompose":
        return self

    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        await self.shutdown()
        if exc_type is not None:
            raise exc_value
class AsyncFilterAdapter(AsyncFilter):
    """
    Adapter class for executing hojichar.Filter asynchronously.
    """

    def __init__(
        self,
        sync_filter: Filter,
        *args: Any,
        executor: Executor | None = None,
        use_batch: bool = True,
        **kwargs: Any,
    ):
        """
        Adapter class for executing hojichar.Filter asynchronously.
        Used to incorporate a Filter into AsyncCompose.

        To reduce the overhead of asynchronous context switching,
        use_batch is set to True by default so that apply_stream
        processes documents in batches, regardless of the sync_filter's
        use_batch setting.

        For heavy, CPU-bound processing, you can specify an executor to
        offload the work to. However, due to Python's GIL constraints,
        a ThreadPoolExecutor will not parallelize CPU-bound processing,
        and the entire process will be blocked.

        Using a ProcessPoolExecutor as the executor may allow CPU-bound
        processing to be parallelized. However, for parallelizing CPU-bound
        processing, it is recommended to use the hojichar.Parallel class
        to parallelize a synchronous Compose pipeline.
        """
        super().__init__(*args, use_batch=use_batch, **kwargs)
        self.sync_filter = sync_filter
        self._has_external_executor = executor is not None
        self._executor = executor or ThreadPoolExecutor()
        self.batch_size = sync_filter.batch_size

    async def apply(self, document: Document) -> Document:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self.sync_filter.apply_batch(batch),
        )

    async def shutdown(self) -> None:
        self.sync_filter.shutdown()
        if not self._has_external_executor:
            self._executor.shutdown()
Adapter class for executing hojichar.Filter asynchronously.
    def __init__(
        self,
        sync_filter: Filter,
        *args: Any,
        executor: Executor | None = None,
        use_batch: bool = True,
        **kwargs: Any,
    ):
        """
        Adapter class for executing hojichar.Filter asynchronously.
        Used to incorporate a Filter into AsyncCompose.

        To reduce the overhead of asynchronous context switching,
        use_batch is set to True by default so that apply_stream
        processes documents in batches, regardless of the sync_filter's
        use_batch setting.

        For heavy, CPU-bound processing, you can specify an executor to
        offload the work to. However, due to Python's GIL constraints,
        a ThreadPoolExecutor will not parallelize CPU-bound processing,
        and the entire process will be blocked.

        Using a ProcessPoolExecutor as the executor may allow CPU-bound
        processing to be parallelized. However, for parallelizing CPU-bound
        processing, it is recommended to use the hojichar.Parallel class
        to parallelize a synchronous Compose pipeline.
        """
        super().__init__(*args, use_batch=use_batch, **kwargs)
        self.sync_filter = sync_filter
        self._has_external_executor = executor is not None
        self._executor = executor or ThreadPoolExecutor()
        self.batch_size = sync_filter.batch_size
Adapter class for executing hojichar.Filter asynchronously. Used to incorporate a Filter into AsyncCompose.

To reduce the overhead of asynchronous context switching, use_batch is set to True by default so that apply_stream processes documents in batches, regardless of the sync_filter's use_batch setting.

For heavy, CPU-bound processing, you can specify an executor to offload the work to. However, due to Python's GIL constraints, a ThreadPoolExecutor will not parallelize CPU-bound processing, and the entire process will be blocked.

Using a ProcessPoolExecutor as the executor may allow CPU-bound processing to be parallelized. However, for parallelizing CPU-bound processing, it is recommended to use the hojichar.Parallel class to parallelize a synchronous Compose pipeline.
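As a concrete illustration, here is a minimal sketch of wrapping a synchronous Filter so it can run inside an event loop. The Lowercase filter and the top-level hojichar imports are assumptions made for this example, not part of this module:

import asyncio

from hojichar import Document, Filter  # assuming the top-level re-exports
from hojichar.core.async_composition import AsyncFilterAdapter


class Lowercase(Filter):  # hypothetical example filter
    def apply(self, document: Document) -> Document:
        document.text = document.text.lower()
        return document


async def main() -> None:
    adapter = AsyncFilterAdapter(Lowercase())
    # apply() offloads the synchronous filter to the adapter's executor.
    doc = await adapter.apply(Document("HELLO"))
    print(doc.text)  # -> "hello"
    # shutdown() also releases the internally created ThreadPoolExecutor.
    await adapter.shutdown()


asyncio.run(main())

Note that if you pass an explicit executor, its lifecycle stays with the caller: shutdown only closes executors the adapter created itself.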
    async def apply(self, document: Document) -> Document:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)
Definition of async filter behavior.

In this method, the filter may modify document.text or document.extras, or set document.is_rejected = True to discard the document.

Parameters
document : Document
    Input document

Returns
Document
    Processed document
    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self.sync_filter.apply_batch(batch),
        )
Apply the filter to a Sequence of documents.
By default, the processing implemented in apply is executed asynchronously and concurrently.
If the filter processing can be optimized for batch processing, override this method.
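A sketch of the batched path, reusing the hypothetical Lowercase filter from the earlier example. The whole batch is handed to the synchronous apply_batch inside the executor, so the thread hop is paid once per batch rather than once per document:

import asyncio

from hojichar import Document, Filter  # assuming the top-level re-exports
from hojichar.core.async_composition import AsyncFilterAdapter


class Lowercase(Filter):  # hypothetical example filter
    def apply(self, document: Document) -> Document:
        document.text = document.text.lower()
        return document


async def main() -> None:
    adapter = AsyncFilterAdapter(Lowercase())
    # The entire list is processed by the sync filter inside one executor call.
    docs = await adapter.apply_batch([Document("FOO"), Document("BAR")])
    print([d.text for d in docs])  # -> ['foo', 'bar']
    await adapter.shutdown()


asyncio.run(main())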
class AsyncCompose(AsyncFilter):
    def __init__(
        self,
        filters: list[AsyncFilter | Filter],
        random_state: int | np.random.Generator | None = None,
        executor: ThreadPoolExecutor | None = None,
        *args: Any,
        **kwargs: Any,
    ):
        super().__init__(random_state=random_state, *args, **kwargs)
        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
        self._statistics.name = "Total"
        self._has_external_executor = executor is not None
        self._executor = executor or ThreadPoolExecutor()
        self.set_filters(filters)

    def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
        self.filters: list[AsyncFilter] = []
        filter_idx = 0
        for f in filters:
            if isinstance(f, (AsyncCompose, Compose)):
                for sub in f.filters:
                    name = f"{filter_idx}-{sub.__class__.__name__}"
                    if isinstance(sub, Filter):
                        name = f"{filter_idx}-{sub.__class__.__name__}"
                        sub = AsyncFilterAdapter(sub, executor=self._executor)

                    sub._set_rng_if_not_initialized(self._rng)
                    sub.name = name
                    sub._statistics.name = name
                    self.filters.append(sub)
                    filter_idx += 1
            else:
                name = f"{filter_idx}-{f.__class__.__name__}"
                if isinstance(f, Filter):
                    name = f"{filter_idx}-{f.__class__.__name__}"
                    f = AsyncFilterAdapter(f, executor=self._executor)
                f._set_rng_if_not_initialized(self._rng)
                f.name = name
                f._statistics.name = name
                self.filters.append(f)
                filter_idx += 1

    async def apply(self, document: Document) -> Document:
        stat = get_doc_info(document)
        for filter_idx, filt in enumerate(self.filters):
            document = await filt._apply(document)
        new_stat = get_doc_info(document)
        async with self._stats_lock:
            self._statistics.update_by_diff(stat, new_stat)
        return document

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        stats = [get_doc_info(doc) for doc in batch]
        for i, filt in enumerate(self.filters):
            batch = await filt._apply_batch(batch)
        batch = await self._finalize_batch(batch, stats)
        return list(batch)

    async def apply_stream(
        self,
        stream: AsyncIterable[Document] | Iterable[Document],
    ) -> AsyncGenerator[Document, None]:
        async_stream = handle_stream_as_async(stream, chunk_size=1000, executor=self._executor)
        async_stream = self._count_input_stats(async_stream)

        for i, filt in enumerate(self.filters):
            async_stream = filt.apply_stream(async_stream)

        async for doc in async_stream:
            in_stat = doc._get_initial_stats()
            if in_stat is None:
                in_stat = get_doc_info(doc)
                self.logger.debug(
                    "Initial stats missing for document during async stream aggregation; "
                    "using current stats as fallback"
                )
            out_stat = get_doc_info(doc)
            async with self._stats_lock:
                self._statistics.update_by_diff(in_stat, out_stat)
            doc._clear_initial_stats()
            yield doc

    async def _count_input_stats(
        self, async_stream: AsyncIterable[Document]
    ) -> AsyncGenerator[Document, None]:
        async for doc in async_stream:
            doc._set_initial_stats(get_doc_info(doc))
            yield doc

    def get_total_statistics(self) -> list[Statistics]:
        """
        Get the statistics of the Compose object and sub filters.

        The statistics of the Compose class are stored in an object with the name "Total",
        and sub-filters' statistics are stored with names in the format
        {filter_index}-{filter class name}.
        """
        stats = []
        stats.append(self.get_statistics())
        for i, filt in enumerate(self.filters):
            stats.append(filt.get_statistics())
        return stats

    def get_total_statistics_map(self) -> list[dict[str, Any]]:
        """
        Get the statistics of the Compose object and sub filters as a list of dictionaries.
        """
        stats = self.get_total_statistics()
        return [stat.to_dict() for stat in stats]

    @property
    def statistics(self) -> dict:
        """
        Deprecated

        Get the statistics of the Compose object and sub filters.

        This property is retained for compatibility with previous versions.
        Please use `get_total_statistics` or `get_total_statistics_map` instead.
        """
        return inspection.statistics_obj_adapter(  # type: ignore
            self.get_total_statistics()
        ).get_human_readable_values()

    @property
    def statistics_obj(self) -> inspection.StatsContainer:
        """
        Deprecated

        Get the statistics of the AsyncCompose object and sub filters.
        This property returns a StatsContainer object which contains the statistics
        of the AsyncCompose object and sub filters.

        This property is retained for compatibility with previous versions.
        Please use `get_total_statistics` or `get_total_statistics_map` instead.
        """
        return inspection.statistics_obj_adapter(self.get_total_statistics())  # type: ignore

    async def shutdown(self) -> None:
        for filt in self.filters:
            await filt.shutdown()
        if not self._has_external_executor:
            self._executor.shutdown()

    async def __aenter__(self) -> "AsyncCompose":
        return self

    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        await self.shutdown()
        if exc_type is not None:
            raise exc_value
Compose class for applying a pipeline of AsyncFilter and Filter instances asynchronously. Synchronous Filter instances are wrapped in AsyncFilterAdapter automatically; see the construction sketch after the parameter list below.
    def __init__(
        self,
        filters: list[AsyncFilter | Filter],
        random_state: int | np.random.Generator | None = None,
        executor: ThreadPoolExecutor | None = None,
        *args: Any,
        **kwargs: Any,
    ):
        super().__init__(random_state=random_state, *args, **kwargs)
        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
        self._statistics.name = "Total"
        self._has_external_executor = executor is not None
        self._executor = executor or ThreadPoolExecutor()
        self.set_filters(filters)
Base class for asynchronous filters.
Parameters
p : float
    The probability of applying the filter. If p is 1, the filter will always be applied.
skip_rejected : bool
    If True, the filter will skip documents that are already rejected.
    If you want to apply the filter to all documents (e.g., postprocess), set this to False.
random_state : Optional[Union[int, np.random.Generator]]
    Seed for the random number generator.
    If None is specified, the random number generator managed by the Compose class will be used.
use_batch : bool
    If True, the filter will process documents in batches in the apply_stream method.
batch_size : int
    The size of the batch to process documents in the apply_stream method.
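Tying the parameters together, here is a sketch of building and running a mixed pipeline. Strip, Lowercase, and the top-level hojichar imports are assumptions for illustration; the synchronous filter is adapted automatically by set_filters:

import asyncio

from hojichar import Document, Filter  # assuming the top-level re-exports
from hojichar.core.async_composition import AsyncCompose
from hojichar.core.async_filter_interface import AsyncFilter


class Strip(AsyncFilter):  # hypothetical native async filter
    async def apply(self, document: Document) -> Document:
        document.text = document.text.strip()
        return document


class Lowercase(Filter):  # hypothetical sync filter; wrapped in AsyncFilterAdapter
    def apply(self, document: Document) -> Document:
        document.text = document.text.lower()
        return document


async def main() -> None:
    # The async context manager calls shutdown() on exit.
    async with AsyncCompose([Strip(), Lowercase()], random_state=42) as pipeline:
        doc = await pipeline.apply(Document("  HELLO  "))
        print(doc.text)  # -> "hello"


asyncio.run(main())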
    def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
        self.filters: list[AsyncFilter] = []
        filter_idx = 0
        for f in filters:
            if isinstance(f, (AsyncCompose, Compose)):
                for sub in f.filters:
                    name = f"{filter_idx}-{sub.__class__.__name__}"
                    if isinstance(sub, Filter):
                        name = f"{filter_idx}-{sub.__class__.__name__}"
                        sub = AsyncFilterAdapter(sub, executor=self._executor)

                    sub._set_rng_if_not_initialized(self._rng)
                    sub.name = name
                    sub._statistics.name = name
                    self.filters.append(sub)
                    filter_idx += 1
            else:
                name = f"{filter_idx}-{f.__class__.__name__}"
                if isinstance(f, Filter):
                    name = f"{filter_idx}-{f.__class__.__name__}"
                    f = AsyncFilterAdapter(f, executor=self._executor)
                f._set_rng_if_not_initialized(self._rng)
                f.name = name
                f._statistics.name = name
                self.filters.append(f)
                filter_idx += 1
    async def apply(self, document: Document) -> Document:
        stat = get_doc_info(document)
        for filter_idx, filt in enumerate(self.filters):
            document = await filt._apply(document)
        new_stat = get_doc_info(document)
        async with self._stats_lock:
            self._statistics.update_by_diff(stat, new_stat)
        return document
Definition of async filter behavior.

In this method, the filter may modify document.text or document.extras, or set document.is_rejected = True to discard the document.

Parameters
document : Document
    Input document

Returns
Document
    Processed document
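To make the rejection contract concrete, here is a sketch of a filter that discards short documents by setting is_rejected; ShortDocFilter is a hypothetical name:

import asyncio

from hojichar import Document, Filter  # assuming the top-level re-exports
from hojichar.core.async_composition import AsyncCompose


class ShortDocFilter(Filter):  # hypothetical: reject documents under 10 characters
    def apply(self, document: Document) -> Document:
        if len(document.text) < 10:
            document.is_rejected = True
        return document


async def main() -> None:
    async with AsyncCompose([ShortDocFilter()]) as pipeline:
        doc = await pipeline.apply(Document("too short"))
        print(doc.is_rejected)  # -> True ("too short" is 9 characters)


asyncio.run(main())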
    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        stats = [get_doc_info(doc) for doc in batch]
        for i, filt in enumerate(self.filters):
            batch = await filt._apply_batch(batch)
        batch = await self._finalize_batch(batch, stats)
        return list(batch)
Apply the filter to a Sequence of documents.
By default, the processing implemented in apply is executed asynchronously and concurrently.
If the filter processing can be optimized for batch processing, override this method.
    async def apply_stream(
        self,
        stream: AsyncIterable[Document] | Iterable[Document],
    ) -> AsyncGenerator[Document, None]:
        async_stream = handle_stream_as_async(stream, chunk_size=1000, executor=self._executor)
        async_stream = self._count_input_stats(async_stream)

        for i, filt in enumerate(self.filters):
            async_stream = filt.apply_stream(async_stream)

        async for doc in async_stream:
            in_stat = doc._get_initial_stats()
            if in_stat is None:
                in_stat = get_doc_info(doc)
                self.logger.debug(
                    "Initial stats missing for document during async stream aggregation; "
                    "using current stats as fallback"
                )
            out_stat = get_doc_info(doc)
            async with self._stats_lock:
                self._statistics.update_by_diff(in_stat, out_stat)
            doc._clear_initial_stats()
            yield doc
Apply the filter to a stream of documents (Iterable or AsyncIterable).
If use_batch is set to True at initialization, the filter will process documents in batches.
If the stream is not asynchronous, it is wrapped with handle_stream_as_async to convert it into an asynchronous stream.

Even if an exception occurs during processing, the stream continues, and the following actions are taken for the failing document (see the streaming sketch after this list):
- Set the is_rejected flag of the document to True
- Set the error details in reject_reason
- Increment the errors count in the statistics retrievable via get_statistics
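A sketch of streaming a plain synchronous iterable through a pipeline: the conversion to an async stream happens internally, and rejected documents still come out of the generator, so the consumer filters on is_rejected. Lowercase is the hypothetical filter from the earlier sketches:

import asyncio

from hojichar import Document, Filter  # assuming the top-level re-exports
from hojichar.core.async_composition import AsyncCompose


class Lowercase(Filter):  # hypothetical example filter
    def apply(self, document: Document) -> Document:
        document.text = document.text.lower()
        return document


async def main() -> None:
    docs = (Document(text) for text in ["FOO", "BAR", "BAZ"])  # plain sync iterable
    async with AsyncCompose([Lowercase()]) as pipeline:
        async for doc in pipeline.apply_stream(docs):
            if not doc.is_rejected:
                print(doc.text)


asyncio.run(main())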
    def get_total_statistics(self) -> list[Statistics]:
        """
        Get the statistics of the Compose object and sub filters.

        The statistics of the Compose class are stored in an object with the name "Total",
        and sub-filters' statistics are stored with names in the format
        {filter_index}-{filter class name}.
        """
        stats = []
        stats.append(self.get_statistics())
        for i, filt in enumerate(self.filters):
            stats.append(filt.get_statistics())
        return stats
Get the statistics of the Compose object and sub filters.
The statistics of the Compose class are stored in an object with the name "Total", and sub-filters' statistics are stored with names in the format {filter_index}-{filter class name}.
    def get_total_statistics_map(self) -> list[dict[str, Any]]:
        """
        Get the statistics of the Compose object and sub filters as a list of dictionaries.
        """
        stats = self.get_total_statistics()
        return [stat.to_dict() for stat in stats]
Get the statistics of the Compose object and sub filters as a list of dictionaries.
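After a run, both accessors expose the naming scheme described above ("Total" for the whole pipeline, "{filter_index}-{filter class name}" for each sub filter). A sketch reusing the hypothetical Lowercase filter; the exact keys inside each dictionary depend on Statistics.to_dict():

import asyncio

from hojichar import Document, Filter  # assuming the top-level re-exports
from hojichar.core.async_composition import AsyncCompose


class Lowercase(Filter):  # hypothetical example filter
    def apply(self, document: Document) -> Document:
        document.text = document.text.lower()
        return document


async def main() -> None:
    async with AsyncCompose([Lowercase()]) as pipeline:
        await pipeline.apply(Document("HELLO"))
        # Expect one entry named "Total", then "0-Lowercase".
        for stat in pipeline.get_total_statistics_map():
            print(stat)


asyncio.run(main())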
Deprecated
Get the statistics of the Compose object and sub filters.
This property is retained for compatibility with previous versions.
Please use get_total_statistics or get_total_statistics_map instead.
Deprecated
Get the statistics of the AsyncCompose object and sub filters. This property returns a StatsContainer object which contains the statistics of the AsyncCompose object and sub filters.
This property is retained for compatibility with previous versions.
Please use get_total_statistics or get_total_statistics_map instead.