hojichar.core.async_composition
```python
from __future__ import annotations

import asyncio
import logging
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Any, AsyncGenerator, AsyncIterable, Iterable, Sequence

import numpy as np

from hojichar.core import inspection
from hojichar.core.async_filter_interface import AsyncFilter
from hojichar.core.composition import Compose
from hojichar.core.filter_interface import Filter
from hojichar.core.models import Document, Statistics, get_doc_info
from hojichar.utils.async_handlers import handle_stream_as_async


class AsyncFilterAdapter(AsyncFilter):
    """
    Adapter class for executing hojichar.Filter asynchronously.
    """

    def __init__(
        self,
        sync_filter: Filter,
        *args: Any,
        executor: Executor | None = None,
        use_batch: bool = True,
        **kwargs: Any,
    ):
        """
        Adapter class for executing hojichar.Filter asynchronously.
        Used to incorporate Filter into AsyncCompose.

        To reduce the overhead of asynchronous context switching,
        use_batch is set to True by default to process in batches
        in apply_stream, regardless of the sync_filter's use_batch setting.

        If performing CPU-bound and heavy processing, you can specify an executor
        to offload the processing to the executor. However, due to Python's GIL
        constraints, using ThreadPoolExecutor will not parallelize CPU-bound
        processing, and the entire process will be locked.

        By using ProcessPoolExecutor as the executor, it may be possible to
        parallelize CPU-bound processing. However, for parallelizing CPU-bound
        processing, it is recommended to use the hojichar.Parallel class to
        parallelize a synchronous Compose pipeline.
        """
        super().__init__(*args, use_batch=use_batch, **kwargs)
        self.sync_filter = sync_filter
        self._has_external_executor = executor is not None
        self._executor = executor or ThreadPoolExecutor()
        self.batch_size = sync_filter.batch_size

    async def apply(self, document: Document) -> Document:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self.sync_filter.apply_batch(batch),
        )

    async def shutdown(self) -> None:
        self.sync_filter.shutdown()
        if not self._has_external_executor:
            self._executor.shutdown()


class AsyncCompose(AsyncFilter):
    def __init__(
        self,
        filters: list[AsyncFilter | Filter],
        random_state: int | np.random.Generator | None = None,
        executor: ThreadPoolExecutor | None = None,
        *args: Any,
        **kwargs: Any,
    ):
        super().__init__(random_state=random_state, *args, **kwargs)
        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
        self._statistics.name = "Total"
        self._has_external_executor = executor is not None
        self._executor = executor or ThreadPoolExecutor()
        self.set_filters(filters)

    def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
        self.filters: list[AsyncFilter] = []
        filter_idx = 0
        for f in filters:
            if isinstance(f, (AsyncCompose, Compose)):
                for sub in f.filters:
                    name = f"{filter_idx}-{sub.__class__.__name__}"
                    if isinstance(sub, Filter):
                        name = f"{filter_idx}-{sub.__class__.__name__}"
                        sub = AsyncFilterAdapter(sub, executor=self._executor)

                    sub._set_rng_if_not_initialized(self._rng)
                    sub.name = name
                    sub._statistics.name = name
                    self.filters.append(sub)
                    filter_idx += 1
            else:
                name = f"{filter_idx}-{f.__class__.__name__}"
                if isinstance(f, Filter):
                    name = f"{filter_idx}-{f.__class__.__name__}"
                    f = AsyncFilterAdapter(f, executor=self._executor)
                f._set_rng_if_not_initialized(self._rng)
                f.name = name
                f._statistics.name = name
                self.filters.append(f)
                filter_idx += 1

    async def apply(self, document: Document) -> Document:
        stat = get_doc_info(document)
        for filter_idx, filt in enumerate(self.filters):
            document = await filt._apply(document)
        new_stat = get_doc_info(document)
        async with self._stats_lock:
            self._statistics.update_by_diff(stat, new_stat)
        return document

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        stats = [get_doc_info(doc) for doc in batch]
        for i, filt in enumerate(self.filters):
            batch = await filt._apply_batch(batch)
        batch = await self._finalize_batch(batch, stats)
        return list(batch)

    async def apply_stream(
        self,
        stream: AsyncIterable[Document] | Iterable[Document],
    ) -> AsyncGenerator[Document, None]:
        async_stream = handle_stream_as_async(stream, chunk_size=1000, executor=self._executor)
        async_stream = self._count_input_stats(async_stream)

        for i, filt in enumerate(self.filters):
            async_stream = filt.apply_stream(async_stream)

        async for doc in async_stream:
            in_stat = doc.extras["__init_stats"]
            out_stat = get_doc_info(doc)
            async with self._stats_lock:
                self._statistics.update_by_diff(in_stat, out_stat)
            del doc.extras["__init_stats"]
            yield doc

    async def _count_input_stats(
        self, async_stream: AsyncIterable[Document]
    ) -> AsyncGenerator[Document, None]:
        async for doc in async_stream:
            doc.extras["__init_stats"] = get_doc_info(doc)
            yield doc

    def get_total_statistics(self) -> list[Statistics]:
        """
        Get the statistics of the Compose object and sub filters.

        The statistics of the Compose class are stored in an object with the name "Total",
        and sub-filters' statistics are stored with names in the format
        {filter_index}-{filter class name}.
        """
        stats = []
        stats.append(self.get_statistics())
        for i, filt in enumerate(self.filters):
            stats.append(filt.get_statistics())
        return stats

    def get_total_statistics_map(self) -> list[dict[str, Any]]:
        """
        Get the statistics of the Compose object and sub filters as a list of dictionaries.
        """
        stats = self.get_total_statistics()
        return [stat.to_dict() for stat in stats]

    @property
    def statistics(self) -> dict:
        """
        Deprecated

        Get the statistics of the Compose object and sub filters.

        This property is retained for compatibility with previous versions.
        Please use `get_total_statistics` or `get_total_statistics_map` instead.
        """
        return inspection.statistics_obj_adapter(  # type: ignore
            self.get_total_statistics()
        ).get_human_readable_values()

    @property
    def statistics_obj(self) -> inspection.StatsContainer:
        """
        Deprecated

        Get the statistics of the AsyncCompose object and sub filters.
        This property returns a StatsContainer object which contains the statistics
        of the AsyncCompose object and sub filters.

        This property is retained for compatibility with previous versions.
        Please use `get_total_statistics` or `get_total_statistics_map` instead.
        """
        return inspection.statistics_obj_adapter(self.get_total_statistics())  # type: ignore

    async def shutdown(self) -> None:
        for filt in self.filters:
            await filt.shutdown()
        if not self._has_external_executor:
            self._executor.shutdown()

    async def __aenter__(self) -> "AsyncCompose":
        return self

    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        await self.shutdown()
        if exc_type is not None:
            raise exc_value
```
```python
class AsyncFilterAdapter(AsyncFilter):
```
Adapter class for executing hojichar.Filter asynchronously.
```python
AsyncFilterAdapter(
    sync_filter: Filter,
    *args: Any,
    executor: Executor | None = None,
    use_batch: bool = True,
    **kwargs: Any,
)
```
Adapter class for executing hojichar.Filter asynchronously. Used to incorporate a `Filter` into an `AsyncCompose` pipeline.

To reduce the overhead of asynchronous context switching, `use_batch` is set to `True` by default, so `apply_stream` processes documents in batches regardless of the wrapped filter's own `use_batch` setting.

If you run heavy CPU-bound processing, you can pass an executor to offload the work. However, due to Python's GIL constraints, a ThreadPoolExecutor will not parallelize CPU-bound processing, and such work can block the whole process.

A ProcessPoolExecutor may be able to parallelize CPU-bound processing, but for that use case it is recommended to use the hojichar.Parallel class to parallelize a synchronous Compose pipeline instead.
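For illustration, a minimal sketch of wrapping a synchronous filter. `ShortDocFilter` is a hypothetical filter written for this example; the hojichar imports follow the module source above.

```python
import asyncio

from hojichar.core.async_composition import AsyncFilterAdapter
from hojichar.core.filter_interface import Filter
from hojichar.core.models import Document


class ShortDocFilter(Filter):
    """Hypothetical example filter: reject documents shorter than 10 characters."""

    def apply(self, document: Document) -> Document:
        if len(document.text) < 10:
            document.is_rejected = True
        return document


async def main() -> None:
    adapter = AsyncFilterAdapter(ShortDocFilter())
    doc = await adapter.apply(Document("tiny"))  # runs the sync filter in the executor
    print(doc.is_rejected)  # True
    await adapter.shutdown()  # shuts down the internally created ThreadPoolExecutor


asyncio.run(main())
```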
```python
async def apply(self, document: Document) -> Document:
```
Definition of async filter behavior.

In this method, the filter will modify `document.text` or `document.extras` and set `document.is_rejected = True` to discard the document.

Parameters
----------
document : Document
    Input document

Returns
-------
Document
    Processed document
```python
async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
```
Apply the filter to a Sequence of documents.

By default, the processing implemented in `apply` is executed asynchronously and concurrently. If the filter processing can be optimized for batch processing, override this method.
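A sketch of batch application: the adapter hands the whole batch to the wrapped filter's `apply_batch` in a single executor call. `UppercaseFilter` is a hypothetical filter written for this example.

```python
import asyncio

from hojichar.core.async_composition import AsyncFilterAdapter
from hojichar.core.filter_interface import Filter
from hojichar.core.models import Document


class UppercaseFilter(Filter):
    """Hypothetical example filter: uppercase the document text."""

    def apply(self, document: Document) -> Document:
        document.text = document.text.upper()
        return document


async def main() -> None:
    adapter = AsyncFilterAdapter(UppercaseFilter())
    batch = [Document("first"), Document("second")]
    results = await adapter.apply_batch(batch)  # one executor call for the whole batch
    print([doc.text for doc in results])  # ['FIRST', 'SECOND']
    await adapter.shutdown()


asyncio.run(main())
```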
```python
class AsyncCompose(AsyncFilter):
```
Compose `AsyncFilter` and `Filter` instances into a single asynchronous pipeline. Synchronous filters are wrapped in `AsyncFilterAdapter` automatically.
```python
AsyncCompose(
    filters: list[AsyncFilter | Filter],
    random_state: int | np.random.Generator | None = None,
    executor: ThreadPoolExecutor | None = None,
    *args: Any,
    **kwargs: Any,
)
```
Base class for asynchronous filters.

Parameters
----------
p : float
    The probability of applying the filter. If `p` is 1, the filter will always be applied.
skip_rejected : bool
    If `True`, the filter will skip documents that are already rejected.
    If you want to apply the filter to all documents (e.g., postprocess), set this to `False`.
random_state : Optional[Union[int, np.random.Generator]]
    Seed for the random number generator. If `None` is specified, the random number
    generator managed by the Compose class will be used.
use_batch : bool
    If `True`, the filter will process documents in batches in the `apply_stream` method.
batch_size : int
    The size of the batch to process documents in the `apply_stream` method.
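A minimal pipeline sketch under these defaults. Both filters are hypothetical examples; the synchronous `StripFilter` is wrapped in `AsyncFilterAdapter` automatically by `set_filters`.

```python
import asyncio

from hojichar.core.async_composition import AsyncCompose
from hojichar.core.async_filter_interface import AsyncFilter
from hojichar.core.filter_interface import Filter
from hojichar.core.models import Document


class StripFilter(Filter):
    """Hypothetical synchronous filter: strip surrounding whitespace."""

    def apply(self, document: Document) -> Document:
        document.text = document.text.strip()
        return document


class RejectEmptyAsync(AsyncFilter):
    """Hypothetical asynchronous filter: reject documents that end up empty."""

    async def apply(self, document: Document) -> Document:
        if not document.text:
            document.is_rejected = True
        return document


async def main() -> None:
    # The async context manager calls shutdown() on exit (see __aexit__ in the source).
    async with AsyncCompose([StripFilter(), RejectEmptyAsync()], random_state=42) as pipeline:
        doc = await pipeline.apply(Document("   "))
        print(doc.is_rejected)  # True


asyncio.run(main())
```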
```python
def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
```

Set the pipeline's filters, flattening any nested Compose or AsyncCompose and wrapping synchronous filters in `AsyncFilterAdapter`.
```python
async def apply(self, document: Document) -> Document:
```
Definition of async filter behavior.

In this method, the filter will modify `document.text` or `document.extras` and set `document.is_rejected = True` to discard the document.

Parameters
----------
document : Document
    Input document

Returns
-------
Document
    Processed document
```python
async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
```
Apply the filter to a Sequence of documents.

By default, the processing implemented in `apply` is executed asynchronously and concurrently. If the filter processing can be optimized for batch processing, override this method.
```python
async def apply_stream(
    self,
    stream: AsyncIterable[Document] | Iterable[Document],
) -> AsyncGenerator[Document, None]:
```
Apply the filter to a stream of documents (Iterable or AsyncIterable).

If `use_batch` is set to `True` at initialization, the filter will process documents in batches. If the stream is not asynchronous, it is converted to an asynchronous stream internally with `handle_stream_as_async`.

Even if an exception occurs during processing, the stream continues, and the following actions are taken for the failing document:

- Set the `is_rejected` flag of the document to `True`
- Set the error details in `reject_reason`
- Increment the `errors` count in the statistics retrievable via `get_statistics`
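A streaming sketch with a plain (synchronous) iterable as input; `RejectLongAsync` is a hypothetical filter written for this example.

```python
import asyncio

from hojichar.core.async_composition import AsyncCompose
from hojichar.core.async_filter_interface import AsyncFilter
from hojichar.core.models import Document


class RejectLongAsync(AsyncFilter):
    """Hypothetical asynchronous filter: reject documents longer than 20 characters."""

    async def apply(self, document: Document) -> Document:
        if len(document.text) > 20:
            document.is_rejected = True
        return document


async def main() -> None:
    async with AsyncCompose([RejectLongAsync()]) as pipeline:
        # The plain generator is converted internally via handle_stream_as_async.
        docs = (Document(text) for text in ["short", "x" * 100, "also short"])
        async for doc in pipeline.apply_stream(docs):
            if not doc.is_rejected:
                print(doc.text)  # prints "short" and "also short"


asyncio.run(main())
```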
```python
def get_total_statistics(self) -> list[Statistics]:
```
Get the statistics of the Compose object and sub filters.

The statistics of the Compose class are stored in an object with the name "Total", and sub-filters' statistics are stored under names in the format `{filter_index}-{filter class name}`.
```python
def get_total_statistics_map(self) -> list[dict[str, Any]]:
```
Get the statistics of the Compose object and sub filters as a list of dictionaries.
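Continuing the streaming sketch above, the totals can be dumped after the stream is consumed. The exact keys of each dictionary depend on `Statistics.to_dict`; only the `name` entry is assumed here.

```python
# Inside the `async with` block, after the `async for` loop has finished:
for entry in pipeline.get_total_statistics_map():
    # The first entry is named "Total"; sub filters follow as "0-RejectLongAsync", ...
    print(entry["name"], entry)
```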
statistics: dict

Deprecated

Get the statistics of the Compose object and sub filters.

This property is retained for compatibility with previous versions. Please use `get_total_statistics` or `get_total_statistics_map` instead.
statistics_obj: inspection.StatsContainer

Deprecated

Get the statistics of the AsyncCompose object and sub filters. This property returns a StatsContainer object which contains the statistics of the AsyncCompose object and its sub filters.

This property is retained for compatibility with previous versions. Please use `get_total_statistics` or `get_total_statistics_map` instead.