hojichar.core.async_composition
```python
from __future__ import annotations

import asyncio
import logging
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Any, AsyncGenerator, AsyncIterable, Iterable, Sequence

import numpy as np

from hojichar.core.async_filter_interface import AsyncFilter
from hojichar.core.composition import Compose
from hojichar.core.filter_interface import Filter
from hojichar.core.models import Document, Statistics, get_doc_info
from hojichar.utils.async_handlers import handle_stream_as_async


class AsyncFilterAdapter(AsyncFilter):
    """
    Adapter class for executing hojichar.Filter asynchronously.
    """

    def __init__(
        self,
        sync_filter: Filter,
        *args: Any,
        executor: Executor | None = None,
        use_batch: bool = True,
        **kwargs: Any,
    ):
        """
        Adapter class for executing hojichar.Filter asynchronously.
        Used to incorporate Filter into AsyncCompose.

        To reduce the overhead of asynchronous context switching,
        use_batch is set to True by default to process in batches
        in apply_stream, regardless of the sync_filter's use_batch setting.

        If performing CPU-bound and heavy processing, you can specify an executor
        to offload the processing to the executor. However, due to Python's GIL
        constraints, using ThreadPoolExecutor will not parallelize CPU-bound
        processing, and the entire process will be locked.

        By using ProcessPoolExecutor as the executor, it may be possible to
        parallelize CPU-bound processing. However, for parallelizing CPU-bound
        processing, it is recommended to use the hojichar.Parallel class to
        parallelize synchronous Compose pipeline.
        """
        super().__init__(*args, use_batch=use_batch, **kwargs)
        self.sync_filter = sync_filter
        self._has_external_executor = executor is not None
        self._executor = executor or ThreadPoolExecutor()
        self.batch_size = sync_filter.batch_size

    async def apply(self, document: Document) -> Document:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self.sync_filter.apply_batch(batch),
        )

    async def shutdown(self) -> None:
        self.sync_filter.shutdown()
        if not self._has_external_executor:
            self._executor.shutdown()


class AsyncCompose(AsyncFilter):
    def __init__(
        self,
        filters: list[AsyncFilter | Filter],
        random_state: int | np.random.Generator | None = None,
        executor: ThreadPoolExecutor | None = None,
        *args: Any,
        **kwargs: Any,
    ):
        super().__init__(random_state=random_state, *args, **kwargs)
        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
        self._statistics.name = "Total"
        self._has_external_executor = executor is not None
        self._executor = executor or ThreadPoolExecutor()
        self.set_filters(filters)

    def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
        self.filters: list[AsyncFilter] = []
        filter_idx = 0
        for f in filters:
            if isinstance(f, (AsyncCompose, Compose)):
                for sub in f.filters:
                    name = f"{filter_idx}-{sub.__class__.__name__}"
                    if isinstance(sub, Filter):
                        name = f"{filter_idx}-{sub.__class__.__name__}"
                        sub = AsyncFilterAdapter(sub, executor=self._executor)

                    sub._set_rng_if_not_initialized(self._rng)
                    sub.name = name
                    sub._statistics.name = name
                    self.filters.append(sub)
                    filter_idx += 1
            else:
                name = f"{filter_idx}-{f.__class__.__name__}"
                if isinstance(f, Filter):
                    name = f"{filter_idx}-{f.__class__.__name__}"
                    f = AsyncFilterAdapter(f, executor=self._executor)
                f._set_rng_if_not_initialized(self._rng)
                f.name = name
                f._statistics.name = name
                self.filters.append(f)
                filter_idx += 1

    async def apply(self, document: Document) -> Document:
        stat = get_doc_info(document)
        for filter_idx, filt in enumerate(self.filters):
            document = await filt._apply(document)
        new_stat = get_doc_info(document)
        async with self._stats_lock:
            self._statistics.update_by_diff(stat, new_stat)
        return document

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        stats = [get_doc_info(doc) for doc in batch]
        for i, filt in enumerate(self.filters):
            batch = await filt._apply_batch(batch)
        batch = await self._finalize_batch(batch, stats)
        return list(batch)

    async def apply_stream(
        self,
        stream: AsyncIterable[Document] | Iterable[Document],
    ) -> AsyncGenerator[Document, None]:
        async_stream = handle_stream_as_async(stream, chunk_size=1000, executor=self._executor)
        async_stream = self._count_input_stats(async_stream)

        for i, filt in enumerate(self.filters):
            async_stream = filt.apply_stream(async_stream)

        async for doc in async_stream:
            in_stat = doc.extras["__init_stats"]
            out_stat = get_doc_info(doc)
            async with self._stats_lock:
                self._statistics.update_by_diff(in_stat, out_stat)
            del doc.extras["__init_stats"]
            yield doc

    async def _count_input_stats(
        self, async_stream: AsyncIterable[Document]
    ) -> AsyncGenerator[Document, None]:
        async for doc in async_stream:
            doc.extras["__init_stats"] = get_doc_info(doc)
            yield doc

    def get_total_statistics(self) -> list[Statistics]:
        """
        Get the statistics of the Compose object and sub filters.

        The statistics of the Compose class are stored in an object with the name "Total",
        and sub-filters's are stored with names in the format {filter_index}-{filter class name}.
        """
        stats = []
        stats.append(self.get_statistics())
        for i, filt in enumerate(self.filters):
            stats.append(filt.get_statistics())
        return stats

    def get_total_statistics_map(self) -> list[dict[str, Any]]:
        """
        Get the statistics of the Compose object and sub filters as a list of dictionaries.
        """
        stats = self.get_total_statistics()
        return [stat.to_dict() for stat in stats]

    async def shutdown(self) -> None:
        for filt in self.filters:
            await filt.shutdown()
        if not self._has_external_executor:
            self._executor.shutdown()

    async def __aenter__(self) -> "AsyncCompose":
        return self

    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        await self.shutdown()
        if exc_type is not None:
            raise exc_value
```
```python
class AsyncFilterAdapter(AsyncFilter):
    """
    Adapter class for executing hojichar.Filter asynchronously.
    """

    def __init__(
        self,
        sync_filter: Filter,
        *args: Any,
        executor: Executor | None = None,
        use_batch: bool = True,
        **kwargs: Any,
    ):
        """
        Adapter class for executing hojichar.Filter asynchronously.
        Used to incorporate Filter into AsyncCompose.

        To reduce the overhead of asynchronous context switching,
        use_batch is set to True by default to process in batches
        in apply_stream, regardless of the sync_filter's use_batch setting.

        If performing CPU-bound and heavy processing, you can specify an executor
        to offload the processing to the executor. However, due to Python's GIL
        constraints, using ThreadPoolExecutor will not parallelize CPU-bound
        processing, and the entire process will be locked.

        By using ProcessPoolExecutor as the executor, it may be possible to
        parallelize CPU-bound processing. However, for parallelizing CPU-bound
        processing, it is recommended to use the hojichar.Parallel class to
        parallelize synchronous Compose pipeline.
        """
        super().__init__(*args, use_batch=use_batch, **kwargs)
        self.sync_filter = sync_filter
        self._has_external_executor = executor is not None
        self._executor = executor or ThreadPoolExecutor()
        self.batch_size = sync_filter.batch_size

    async def apply(self, document: Document) -> Document:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self.sync_filter.apply_batch(batch),
        )

    async def shutdown(self) -> None:
        self.sync_filter.shutdown()
        if not self._has_external_executor:
            self._executor.shutdown()
```
Adapter class for executing hojichar.Filter asynchronously.
```python
def __init__(
    self,
    sync_filter: Filter,
    *args: Any,
    executor: Executor | None = None,
    use_batch: bool = True,
    **kwargs: Any,
):
    """
    Adapter class for executing hojichar.Filter asynchronously.
    Used to incorporate Filter into AsyncCompose.

    To reduce the overhead of asynchronous context switching,
    use_batch is set to True by default to process in batches
    in apply_stream, regardless of the sync_filter's use_batch setting.

    If performing CPU-bound and heavy processing, you can specify an executor
    to offload the processing to the executor. However, due to Python's GIL
    constraints, using ThreadPoolExecutor will not parallelize CPU-bound
    processing, and the entire process will be locked.

    By using ProcessPoolExecutor as the executor, it may be possible to
    parallelize CPU-bound processing. However, for parallelizing CPU-bound
    processing, it is recommended to use the hojichar.Parallel class to
    parallelize synchronous Compose pipeline.
    """
    super().__init__(*args, use_batch=use_batch, **kwargs)
    self.sync_filter = sync_filter
    self._has_external_executor = executor is not None
    self._executor = executor or ThreadPoolExecutor()
    self.batch_size = sync_filter.batch_size
```
Adapter class for executing hojichar.Filter asynchronously. Used to incorporate a `Filter` into `AsyncCompose`.

To reduce the overhead of asynchronous context switching, `use_batch` is set to `True` by default so that `apply_stream` processes documents in batches, regardless of the sync_filter's `use_batch` setting.

For heavy, CPU-bound processing, you can specify an executor to offload the work to it. However, due to Python's GIL constraints, a `ThreadPoolExecutor` will not parallelize CPU-bound processing, and the whole process may be blocked.

Using a `ProcessPoolExecutor` as the executor may make it possible to parallelize CPU-bound processing. For CPU-bound workloads, however, it is recommended to use the hojichar.Parallel class to parallelize a synchronous Compose pipeline instead.
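As a usage illustration, the following minimal sketch wraps a synchronous filter so it can be awaited. The `Uppercase` filter is a hypothetical stand-in; `AsyncFilterAdapter` and its `executor` parameter are the API documented here, and the sketch assumes the base classes' default constructor arguments.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

from hojichar.core.async_composition import AsyncFilterAdapter
from hojichar.core.filter_interface import Filter
from hojichar.core.models import Document


class Uppercase(Filter):  # hypothetical synchronous filter
    def apply(self, document: Document) -> Document:
        document.text = document.text.upper()
        return document


async def main() -> None:
    # An external executor is left open by the adapter's shutdown();
    # without one, the adapter creates and owns a ThreadPoolExecutor.
    with ThreadPoolExecutor(max_workers=4) as pool:
        adapter = AsyncFilterAdapter(Uppercase(), executor=pool)
        doc = await adapter.apply(Document("hello"))
        print(doc.text)  # -> "HELLO"
        await adapter.shutdown()


asyncio.run(main())
```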
```python
async def apply(self, document: Document) -> Document:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)
```
Definition of async filter behavior.

In this method, the filter will modify `document.text` or `document.extras` and set `document.is_rejected = True` to discard the document.

Parameters
----------
document : Document
    Input document

Returns
-------
Document
    Processed Document
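For contrast with the adapter, a filter can also be written natively against this interface. A minimal sketch, assuming an `AsyncFilter` subclass only needs to implement `apply` (the `ShortDocFilter` name and its threshold are illustrative):

```python
import asyncio

from hojichar.core.async_filter_interface import AsyncFilter
from hojichar.core.models import Document


class ShortDocFilter(AsyncFilter):  # hypothetical example filter
    """Reject documents shorter than 10 characters."""

    async def apply(self, document: Document) -> Document:
        # Mutate text/extras here, or flag rejection to discard the document.
        if len(document.text) < 10:
            document.is_rejected = True
        return document


async def main() -> None:
    filt = ShortDocFilter()
    doc = await filt.apply(Document("too short"))
    print(doc.is_rejected)  # -> True


asyncio.run(main())
```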
```python
async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        lambda: self.sync_filter.apply_batch(batch),
    )
```
Apply the filter to a Sequence of documents.

By default, the processing implemented in `apply` is executed asynchronously and concurrently. If the filter processing can be optimized for batch processing, override this method.
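As a sketch of that override pattern, under the assumption that subclasses may replace `apply_batch` wholesale (the `BulkAnnotate` name and its bookkeeping are illustrative):

```python
from typing import Sequence

from hojichar.core.async_filter_interface import AsyncFilter
from hojichar.core.models import Document


class BulkAnnotate(AsyncFilter):  # hypothetical batch-optimized filter
    async def apply(self, document: Document) -> Document:
        document.extras["length"] = len(document.text)
        return document

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        # One pass over the whole batch, e.g. where a single vectorized
        # call (model inference, regex scan) beats per-document awaits.
        for doc in batch:
            doc.extras["length"] = len(doc.text)
        return list(batch)
```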
```python
class AsyncCompose(AsyncFilter):
    def __init__(
        self,
        filters: list[AsyncFilter | Filter],
        random_state: int | np.random.Generator | None = None,
        executor: ThreadPoolExecutor | None = None,
        *args: Any,
        **kwargs: Any,
    ):
        super().__init__(random_state=random_state, *args, **kwargs)
        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
        self._statistics.name = "Total"
        self._has_external_executor = executor is not None
        self._executor = executor or ThreadPoolExecutor()
        self.set_filters(filters)

    def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
        self.filters: list[AsyncFilter] = []
        filter_idx = 0
        for f in filters:
            if isinstance(f, (AsyncCompose, Compose)):
                for sub in f.filters:
                    name = f"{filter_idx}-{sub.__class__.__name__}"
                    if isinstance(sub, Filter):
                        name = f"{filter_idx}-{sub.__class__.__name__}"
                        sub = AsyncFilterAdapter(sub, executor=self._executor)

                    sub._set_rng_if_not_initialized(self._rng)
                    sub.name = name
                    sub._statistics.name = name
                    self.filters.append(sub)
                    filter_idx += 1
            else:
                name = f"{filter_idx}-{f.__class__.__name__}"
                if isinstance(f, Filter):
                    name = f"{filter_idx}-{f.__class__.__name__}"
                    f = AsyncFilterAdapter(f, executor=self._executor)
                f._set_rng_if_not_initialized(self._rng)
                f.name = name
                f._statistics.name = name
                self.filters.append(f)
                filter_idx += 1

    async def apply(self, document: Document) -> Document:
        stat = get_doc_info(document)
        for filter_idx, filt in enumerate(self.filters):
            document = await filt._apply(document)
        new_stat = get_doc_info(document)
        async with self._stats_lock:
            self._statistics.update_by_diff(stat, new_stat)
        return document

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        stats = [get_doc_info(doc) for doc in batch]
        for i, filt in enumerate(self.filters):
            batch = await filt._apply_batch(batch)
        batch = await self._finalize_batch(batch, stats)
        return list(batch)

    async def apply_stream(
        self,
        stream: AsyncIterable[Document] | Iterable[Document],
    ) -> AsyncGenerator[Document, None]:
        async_stream = handle_stream_as_async(stream, chunk_size=1000, executor=self._executor)
        async_stream = self._count_input_stats(async_stream)

        for i, filt in enumerate(self.filters):
            async_stream = filt.apply_stream(async_stream)

        async for doc in async_stream:
            in_stat = doc.extras["__init_stats"]
            out_stat = get_doc_info(doc)
            async with self._stats_lock:
                self._statistics.update_by_diff(in_stat, out_stat)
            del doc.extras["__init_stats"]
            yield doc

    async def _count_input_stats(
        self, async_stream: AsyncIterable[Document]
    ) -> AsyncGenerator[Document, None]:
        async for doc in async_stream:
            doc.extras["__init_stats"] = get_doc_info(doc)
            yield doc

    def get_total_statistics(self) -> list[Statistics]:
        """
        Get the statistics of the Compose object and sub filters.

        The statistics of the Compose class are stored in an object with the name "Total",
        and sub-filters's are stored with names in the format {filter_index}-{filter class name}.
        """
        stats = []
        stats.append(self.get_statistics())
        for i, filt in enumerate(self.filters):
            stats.append(filt.get_statistics())
        return stats

    def get_total_statistics_map(self) -> list[dict[str, Any]]:
        """
        Get the statistics of the Compose object and sub filters as a list of dictionaries.
        """
        stats = self.get_total_statistics()
        return [stat.to_dict() for stat in stats]

    async def shutdown(self) -> None:
        for filt in self.filters:
            await filt.shutdown()
        if not self._has_external_executor:
            self._executor.shutdown()

    async def __aenter__(self) -> "AsyncCompose":
        return self

    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        await self.shutdown()
        if exc_type is not None:
            raise exc_value
```
Asynchronous pipeline that composes `AsyncFilter` and synchronous `Filter` instances; synchronous filters are wrapped in `AsyncFilterAdapter` so the whole pipeline can be awaited.
```python
def __init__(
    self,
    filters: list[AsyncFilter | Filter],
    random_state: int | np.random.Generator | None = None,
    executor: ThreadPoolExecutor | None = None,
    *args: Any,
    **kwargs: Any,
):
    super().__init__(random_state=random_state, *args, **kwargs)
    self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
    self._statistics.name = "Total"
    self._has_external_executor = executor is not None
    self._executor = executor or ThreadPoolExecutor()
    self.set_filters(filters)
```
Base class for asynchronous filters.

Parameters
----------
p : float
    The probability of applying the filter. If `p` is 1, the filter will always be applied.
skip_rejected : bool
    If `True`, the filter will skip documents that are already rejected.
    If you want to apply the filter to all documents (e.g., postprocess), set this to `False`.
random_state : Optional[Union[int, np.random.Generator]]
    Seed for the random number generator. If `None` is specified, the random number
    generator managed by the Compose class will be used.
use_batch : bool
    If `True`, the filter will process documents in batches in the `apply_stream` method.
batch_size : int
    The size of the batch in which documents are processed in the `apply_stream` method.
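A minimal end-to-end sketch of constructing and running a pipeline. The two filters are hypothetical; the automatic wrapping of synchronous `Filter` objects is what `set_filters` below performs.

```python
import asyncio

from hojichar.core.async_composition import AsyncCompose
from hojichar.core.filter_interface import Filter
from hojichar.core.models import Document


class Lowercase(Filter):  # hypothetical sync filter
    def apply(self, document: Document) -> Document:
        document.text = document.text.lower()
        return document


class DropEmpty(Filter):  # hypothetical sync filter
    def apply(self, document: Document) -> Document:
        if not document.text.strip():
            document.is_rejected = True
        return document


async def main() -> None:
    # Sync Filters are wrapped in AsyncFilterAdapter by set_filters,
    # so they can be mixed freely with native AsyncFilters.
    async with AsyncCompose([Lowercase(), DropEmpty()]) as pipeline:
        doc = await pipeline.apply(Document("HELLO"))
        print(doc.text)  # -> "hello"


asyncio.run(main())
```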
```python
def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
    self.filters: list[AsyncFilter] = []
    filter_idx = 0
    for f in filters:
        if isinstance(f, (AsyncCompose, Compose)):
            for sub in f.filters:
                name = f"{filter_idx}-{sub.__class__.__name__}"
                if isinstance(sub, Filter):
                    name = f"{filter_idx}-{sub.__class__.__name__}"
                    sub = AsyncFilterAdapter(sub, executor=self._executor)

                sub._set_rng_if_not_initialized(self._rng)
                sub.name = name
                sub._statistics.name = name
                self.filters.append(sub)
                filter_idx += 1
        else:
            name = f"{filter_idx}-{f.__class__.__name__}"
            if isinstance(f, Filter):
                name = f"{filter_idx}-{f.__class__.__name__}"
                f = AsyncFilterAdapter(f, executor=self._executor)
            f._set_rng_if_not_initialized(self._rng)
            f.name = name
            f._statistics.name = name
            self.filters.append(f)
            filter_idx += 1
```
```python
async def apply(self, document: Document) -> Document:
    stat = get_doc_info(document)
    for filter_idx, filt in enumerate(self.filters):
        document = await filt._apply(document)
    new_stat = get_doc_info(document)
    async with self._stats_lock:
        self._statistics.update_by_diff(stat, new_stat)
    return document
```
Definition of async filter behavior.

In this method, the filter will modify `document.text` or `document.extras` and set `document.is_rejected = True` to discard the document.

Parameters
----------
document : Document
    Input document

Returns
-------
Document
    Processed Document
```python
async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
    stats = [get_doc_info(doc) for doc in batch]
    for i, filt in enumerate(self.filters):
        batch = await filt._apply_batch(batch)
    batch = await self._finalize_batch(batch, stats)
    return list(batch)
```
Apply the filter to a Sequence of documents.

By default, the processing implemented in `apply` is executed asynchronously and concurrently. If the filter processing can be optimized for batch processing, override this method.
```python
async def apply_stream(
    self,
    stream: AsyncIterable[Document] | Iterable[Document],
) -> AsyncGenerator[Document, None]:
    async_stream = handle_stream_as_async(stream, chunk_size=1000, executor=self._executor)
    async_stream = self._count_input_stats(async_stream)

    for i, filt in enumerate(self.filters):
        async_stream = filt.apply_stream(async_stream)

    async for doc in async_stream:
        in_stat = doc.extras["__init_stats"]
        out_stat = get_doc_info(doc)
        async with self._stats_lock:
            self._statistics.update_by_diff(in_stat, out_stat)
        del doc.extras["__init_stats"]
        yield doc
```
Apply the filter to a stream of documents (Iterable or AsyncIterable). If `use_batch` was set to `True` at initialization, the filter processes documents in batches. If the stream is not asynchronous, it is converted to an asynchronous stream with `handle_stream_as_async`.

Even if an exception occurs during processing, the stream continues, and the following actions are taken for the failing document:

- Set the `is_rejected` flag of the document to `True`
- Record the error details in `reject_reason`
- Increment the `errors` count in the statistics retrievable via `get_statistics`
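A short sketch of streaming a plain iterable through the pipeline (the `Strip` filter is hypothetical):

```python
import asyncio

from hojichar.core.async_composition import AsyncCompose
from hojichar.core.filter_interface import Filter
from hojichar.core.models import Document


class Strip(Filter):  # hypothetical sync filter
    def apply(self, document: Document) -> Document:
        document.text = document.text.strip()
        return document


async def main() -> None:
    docs = (Document(t) for t in ["  a  ", "b", "  c"])  # a plain Iterable is accepted
    async with AsyncCompose([Strip()]) as pipeline:
        async for doc in pipeline.apply_stream(docs):
            print(repr(doc.text))  # -> 'a', 'b', 'c'


asyncio.run(main())
```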
```python
def get_total_statistics(self) -> list[Statistics]:
    """
    Get the statistics of the Compose object and sub filters.

    The statistics of the Compose class are stored in an object with the name "Total",
    and sub-filters's are stored with names in the format {filter_index}-{filter class name}.
    """
    stats = []
    stats.append(self.get_statistics())
    for i, filt in enumerate(self.filters):
        stats.append(filt.get_statistics())
    return stats
```
Get the statistics of the Compose object and sub filters.

The statistics of the Compose class are stored in an object with the name "Total", and the sub-filters' statistics are stored under names in the format {filter_index}-{filter class name}.
```python
def get_total_statistics_map(self) -> list[dict[str, Any]]:
    """
    Get the statistics of the Compose object and sub filters as a list of dictionaries.
    """
    stats = self.get_total_statistics()
    return [stat.to_dict() for stat in stats]
```
Get the statistics of the Compose object and sub filters as a list of dictionaries.
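To illustrate, a sketch that runs a few documents and dumps the statistics. The exact keys of each dictionary come from `Statistics.to_dict()` and are not spelled out here; the `DropShort` filter is hypothetical.

```python
import asyncio

from hojichar.core.async_composition import AsyncCompose
from hojichar.core.filter_interface import Filter
from hojichar.core.models import Document


class DropShort(Filter):  # hypothetical sync filter
    def apply(self, document: Document) -> Document:
        if len(document.text) < 3:
            document.is_rejected = True
        return document


async def main() -> None:
    async with AsyncCompose([DropShort()]) as pipeline:
        for text in ["ab", "abcd", "abcdef"]:
            await pipeline.apply(Document(text))
        # Expect one "Total" entry plus one "0-DropShort" entry,
        # following the {filter_index}-{filter class name} convention.
        for stat in pipeline.get_total_statistics_map():
            print(stat)


asyncio.run(main())
```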