hojichar.core.async_composition

  1from __future__ import annotations
  2
  3import asyncio
  4import logging
  5from concurrent.futures import Executor, ThreadPoolExecutor
  6from typing import Any, AsyncGenerator, AsyncIterable, Iterable, Sequence
  7
  8import numpy as np
  9
 10from hojichar.core.async_filter_interface import AsyncFilter
 11from hojichar.core.composition import Compose
 12from hojichar.core.filter_interface import Filter
 13from hojichar.core.models import Document, Statistics, get_doc_info
 14from hojichar.utils.async_handlers import handle_stream_as_async
 15
 16
 17class AsyncFilterAdapter(AsyncFilter):
 18    """
 19    Adapter class for executing hojichar.Filter asynchronously.
 20    """
 21
 22    def __init__(
 23        self,
 24        sync_filter: Filter,
 25        *args: Any,
 26        executor: Executor | None = None,
 27        use_batch: bool = True,
 28        **kwargs: Any,
 29    ):
 30        """
 31        Adapter class for executing hojichar.Filter asynchronously.
 32        Used to incorporate Filter into AsyncCompose.
 33
 34        To reduce the overhead of asynchronous context switching,
 35        use_batch is set to True by default to process in batches
 36        in apply_stream, regardless of the sync_filter's use_batch setting.
 37
 38        If performing CPU-bound and heavy processing, you can specify an executor
 39        to offload the processing to the executor. However, due to Python's GIL
 40        constraints, using ThreadPoolExecutor will not parallelize CPU-bound
 41        processing, and the entire process will be locked.
 42
 43        By using ProcessPoolExecutor as the executor, it may be possible to
 44        parallelize CPU-bound processing. However, for parallelizing CPU-bound
 45        processing, it is recommended to use the hojichar.Parallel class to
 46        parallelize synchronous Compose pipeline.
 47        """
 48        super().__init__(*args, use_batch=use_batch, **kwargs)
 49        self.sync_filter = sync_filter
 50        self._has_external_executor = executor is not None
 51        self._executor = executor or ThreadPoolExecutor()
 52        self.batch_size = sync_filter.batch_size
 53
 54    async def apply(self, document: Document) -> Document:
 55        loop = asyncio.get_running_loop()
 56        return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)
 57
 58    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
 59        loop = asyncio.get_running_loop()
 60        return await loop.run_in_executor(
 61            self._executor,
 62            lambda: self.sync_filter.apply_batch(batch),
 63        )
 64
 65    async def shutdown(self) -> None:
 66        self.sync_filter.shutdown()
 67        if not self._has_external_executor:
 68            self._executor.shutdown()
 69
 70
 71class AsyncCompose(AsyncFilter):
 72    def __init__(
 73        self,
 74        filters: list[AsyncFilter | Filter],
 75        random_state: int | np.random.Generator | None = None,
 76        executor: ThreadPoolExecutor | None = None,
 77        *args: Any,
 78        **kwargs: Any,
 79    ):
 80        super().__init__(random_state=random_state, *args, **kwargs)
 81        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
 82        self._statistics.name = "Total"
 83        self._has_external_executor = executor is not None
 84        self._executor = executor or ThreadPoolExecutor()
 85        self.set_filters(filters)
 86
 87    def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
 88        self.filters: list[AsyncFilter] = []
 89        filter_idx = 0
 90        for f in filters:
 91            if isinstance(f, (AsyncCompose, Compose)):
 92                for sub in f.filters:
 93                    name = f"{filter_idx}-{sub.__class__.__name__}"
 94                    if isinstance(sub, Filter):
 95                        name = f"{filter_idx}-{sub.__class__.__name__}"
 96                        sub = AsyncFilterAdapter(sub, executor=self._executor)
 97
 98                    sub._set_rng_if_not_initialized(self._rng)
 99                    sub.name = name
100                    sub._statistics.name = name
101                    self.filters.append(sub)
102                    filter_idx += 1
103            else:
104                name = f"{filter_idx}-{f.__class__.__name__}"
105                if isinstance(f, Filter):
106                    name = f"{filter_idx}-{f.__class__.__name__}"
107                    f = AsyncFilterAdapter(f, executor=self._executor)
108                f._set_rng_if_not_initialized(self._rng)
109                f.name = name
110                f._statistics.name = name
111                self.filters.append(f)
112                filter_idx += 1
113
114    async def apply(self, document: Document) -> Document:
115        stat = get_doc_info(document)
116        for filter_idx, filt in enumerate(self.filters):
117            document = await filt._apply(document)
118        new_stat = get_doc_info(document)
119        async with self._stats_lock:
120            self._statistics.update_by_diff(stat, new_stat)
121        return document
122
123    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
124        stats = [get_doc_info(doc) for doc in batch]
125        for i, filt in enumerate(self.filters):
126            batch = await filt._apply_batch(batch)
127        batch = await self._finalize_batch(batch, stats)
128        return list(batch)
129
130    async def apply_stream(
131        self,
132        stream: AsyncIterable[Document] | Iterable[Document],
133    ) -> AsyncGenerator[Document, None]:
134        async_stream = handle_stream_as_async(stream, chunk_size=1000, executor=self._executor)
135        async_stream = self._count_input_stats(async_stream)
136
137        for i, filt in enumerate(self.filters):
138            async_stream = filt.apply_stream(async_stream)
139
140        async for doc in async_stream:
141            in_stat = doc.extras["__init_stats"]
142            out_stat = get_doc_info(doc)
143            async with self._stats_lock:
144                self._statistics.update_by_diff(in_stat, out_stat)
145            del doc.extras["__init_stats"]
146            yield doc
147
148    async def _count_input_stats(
149        self, async_stream: AsyncIterable[Document]
150    ) -> AsyncGenerator[Document, None]:
151        async for doc in async_stream:
152            doc.extras["__init_stats"] = get_doc_info(doc)
153            yield doc
154
155    def get_total_statistics(self) -> list[Statistics]:
156        """
157        Get the statistics of the Compose object and sub filters.
158
159        The statistics of the Compose class are stored in an object with the name "Total",
160        and sub-filters's are stored with names in the format {filter_index}-{filter class name}.
161        """
162        stats = []
163        stats.append(self.get_statistics())
164        for i, filt in enumerate(self.filters):
165            stats.append(filt.get_statistics())
166        return stats
167
168    def get_total_statistics_map(self) -> list[dict[str, Any]]:
169        """
170        Get the statistics of the Compose object and sub filters as a list of dictionaries.
171        """
172        stats = self.get_total_statistics()
173        return [stat.to_dict() for stat in stats]
174
175    async def shutdown(self) -> None:
176        for filt in self.filters:
177            await filt.shutdown()
178        if not self._has_external_executor:
179            self._executor.shutdown()
180
181    async def __aenter__(self) -> "AsyncCompose":
182        return self
183
184    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
185        await self.shutdown()
186        if exc_type is not None:
187            raise exc_value
class AsyncFilterAdapter(hojichar.core.async_filter_interface.AsyncFilter):
class AsyncFilterAdapter(AsyncFilter):
    """
    Adapter class for executing hojichar.Filter asynchronously.

    Calls to the wrapped synchronous filter are dispatched to an executor
    through ``loop.run_in_executor``.
    """

    def __init__(
        self,
        sync_filter: Filter,
        *args: Any,
        executor: Executor | None = None,
        use_batch: bool = True,
        **kwargs: Any,
    ):
        """
        Wrap a synchronous hojichar.Filter so it can be used in AsyncCompose.

        To reduce the overhead of asynchronous context switching,
        use_batch is set to True by default to process in batches
        in apply_stream, regardless of the sync_filter's use_batch setting.

        If the filter performs heavy CPU-bound processing, you can specify an
        executor to offload the work to. However, due to Python's GIL
        constraints, using ThreadPoolExecutor will not parallelize CPU-bound
        processing, and the entire process will be locked.

        By using ProcessPoolExecutor as the executor, it may be possible to
        parallelize CPU-bound processing. However, for parallelizing CPU-bound
        processing, it is recommended to use the hojichar.Parallel class to
        parallelize a synchronous Compose pipeline.

        Parameters
        ----------
        sync_filter : Filter
            The synchronous filter to wrap.
        *args : Any
            Positional arguments forwarded to the AsyncFilter initializer.
        executor : Executor | None
            Executor on which the synchronous filter runs. When None, a new
            ThreadPoolExecutor is created and later shut down by ``shutdown``.
        use_batch : bool
            Forwarded to the AsyncFilter initializer.
        **kwargs : Any
            Keyword arguments forwarded to the AsyncFilter initializer.
        """
        super().__init__(*args, use_batch=use_batch, **kwargs)
        self.sync_filter = sync_filter
        # Only an executor created here is owned (and shut down) by this adapter.
        self._has_external_executor = executor is not None
        # Use the same `is not None` test as the ownership flag so both always agree.
        self._executor = executor if executor is not None else ThreadPoolExecutor()
        # Assigned after the base initializer, so the wrapped filter's batch size wins.
        self.batch_size = sync_filter.batch_size

    async def apply(self, document: Document) -> Document:
        """
        Run ``sync_filter.apply`` on the executor and return its result.

        Parameters
        ----------
        document : Document
            Input document

        Returns
        -------
        Document
            The document returned by the synchronous filter.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        """
        Run ``sync_filter.apply_batch`` on the executor and return its result.

        Parameters
        ----------
        batch : Sequence[Document]
            Documents to process in one executor call.

        Returns
        -------
        list[Document]
            The documents returned by the synchronous filter.
        """
        loop = asyncio.get_running_loop()
        # Pass the bound method and its argument instead of a lambda: a lambda
        # cannot be pickled, which would break process-based executors.
        return await loop.run_in_executor(
            self._executor,
            self.sync_filter.apply_batch,
            batch,
        )

    async def shutdown(self) -> None:
        """
        Shut down the wrapped filter and, if this adapter created it, the executor.
        """
        try:
            self.sync_filter.shutdown()
        finally:
            # Release the owned executor even when the filter's shutdown raises.
            if not self._has_external_executor:
                self._executor.shutdown()

Adapter class for executing hojichar.Filter asynchronously.

AsyncFilterAdapter( sync_filter: hojichar.core.filter_interface.Filter, *args: Any, executor: concurrent.futures._base.Executor | None = None, use_batch: bool = True, **kwargs: Any)
23    def __init__(
24        self,
25        sync_filter: Filter,
26        *args: Any,
27        executor: Executor | None = None,
28        use_batch: bool = True,
29        **kwargs: Any,
30    ):
31        """
32        Adapter class for executing hojichar.Filter asynchronously.
33        Used to incorporate Filter into AsyncCompose.
34
35        To reduce the overhead of asynchronous context switching,
36        use_batch is set to True by default to process in batches
37        in apply_stream, regardless of the sync_filter's use_batch setting.
38
39        If performing CPU-bound and heavy processing, you can specify an executor
40        to offload the processing to the executor. However, due to Python's GIL
41        constraints, using ThreadPoolExecutor will not parallelize CPU-bound
42        processing, and the entire process will be locked.
43
44        By using ProcessPoolExecutor as the executor, it may be possible to
45        parallelize CPU-bound processing. However, for parallelizing CPU-bound
46        processing, it is recommended to use the hojichar.Parallel class to
47        parallelize synchronous Compose pipeline.
48        """
49        super().__init__(*args, use_batch=use_batch, **kwargs)
50        self.sync_filter = sync_filter
51        self._has_external_executor = executor is not None
52        self._executor = executor or ThreadPoolExecutor()
53        self.batch_size = sync_filter.batch_size

Adapter class for executing hojichar.Filter asynchronously. Used to incorporate Filter into AsyncCompose.

To reduce the overhead of asynchronous context switching, use_batch is set to True by default to process in batches in apply_stream, regardless of the sync_filter's use_batch setting.

If the wrapped filter performs heavy CPU-bound processing, you can specify an executor to offload the processing to. However, due to Python's GIL constraints, using ThreadPoolExecutor will not parallelize CPU-bound processing, and the entire process will be locked while it runs.

By using ProcessPoolExecutor as the executor, it may be possible to parallelize CPU-bound processing. However, for parallelizing CPU-bound processing, it is recommended to use the hojichar.Parallel class to parallelize a synchronous Compose pipeline.

async def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:
55    async def apply(self, document: Document) -> Document:
56        loop = asyncio.get_running_loop()
57        return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)

Definition of async filter behavior.

In this method, the filter will modify document.text or document.extras and set document.is_rejected = True to discard the document.

Parameters

document : Document Input document

Returns

Document Processed Document

async def apply_batch( self, batch: Sequence[hojichar.core.models.Document]) -> list[hojichar.core.models.Document]:
59    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
60        loop = asyncio.get_running_loop()
61        return await loop.run_in_executor(
62            self._executor,
63            lambda: self.sync_filter.apply_batch(batch),
64        )

Apply the filter to a Sequence of documents. By default, the processing implemented in apply is executed asynchronously and concurrently. If the filter processing can be optimized for batch processing, override this method.

async def shutdown(self) -> None:
66    async def shutdown(self) -> None:
67        self.sync_filter.shutdown()
68        if not self._has_external_executor:
69            self._executor.shutdown()

You can override this method to release resources or perform cleanup tasks.

class AsyncCompose(hojichar.core.async_filter_interface.AsyncFilter):
 72class AsyncCompose(AsyncFilter):
 73    def __init__(
 74        self,
 75        filters: list[AsyncFilter | Filter],
 76        random_state: int | np.random.Generator | None = None,
 77        executor: ThreadPoolExecutor | None = None,
 78        *args: Any,
 79        **kwargs: Any,
 80    ):
 81        super().__init__(random_state=random_state, *args, **kwargs)
 82        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
 83        self._statistics.name = "Total"
 84        self._has_external_executor = executor is not None
 85        self._executor = executor or ThreadPoolExecutor()
 86        self.set_filters(filters)
 87
 88    def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
 89        self.filters: list[AsyncFilter] = []
 90        filter_idx = 0
 91        for f in filters:
 92            if isinstance(f, (AsyncCompose, Compose)):
 93                for sub in f.filters:
 94                    name = f"{filter_idx}-{sub.__class__.__name__}"
 95                    if isinstance(sub, Filter):
 96                        name = f"{filter_idx}-{sub.__class__.__name__}"
 97                        sub = AsyncFilterAdapter(sub, executor=self._executor)
 98
 99                    sub._set_rng_if_not_initialized(self._rng)
100                    sub.name = name
101                    sub._statistics.name = name
102                    self.filters.append(sub)
103                    filter_idx += 1
104            else:
105                name = f"{filter_idx}-{f.__class__.__name__}"
106                if isinstance(f, Filter):
107                    name = f"{filter_idx}-{f.__class__.__name__}"
108                    f = AsyncFilterAdapter(f, executor=self._executor)
109                f._set_rng_if_not_initialized(self._rng)
110                f.name = name
111                f._statistics.name = name
112                self.filters.append(f)
113                filter_idx += 1
114
115    async def apply(self, document: Document) -> Document:
116        stat = get_doc_info(document)
117        for filter_idx, filt in enumerate(self.filters):
118            document = await filt._apply(document)
119        new_stat = get_doc_info(document)
120        async with self._stats_lock:
121            self._statistics.update_by_diff(stat, new_stat)
122        return document
123
124    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
125        stats = [get_doc_info(doc) for doc in batch]
126        for i, filt in enumerate(self.filters):
127            batch = await filt._apply_batch(batch)
128        batch = await self._finalize_batch(batch, stats)
129        return list(batch)
130
131    async def apply_stream(
132        self,
133        stream: AsyncIterable[Document] | Iterable[Document],
134    ) -> AsyncGenerator[Document, None]:
135        async_stream = handle_stream_as_async(stream, chunk_size=1000, executor=self._executor)
136        async_stream = self._count_input_stats(async_stream)
137
138        for i, filt in enumerate(self.filters):
139            async_stream = filt.apply_stream(async_stream)
140
141        async for doc in async_stream:
142            in_stat = doc.extras["__init_stats"]
143            out_stat = get_doc_info(doc)
144            async with self._stats_lock:
145                self._statistics.update_by_diff(in_stat, out_stat)
146            del doc.extras["__init_stats"]
147            yield doc
148
149    async def _count_input_stats(
150        self, async_stream: AsyncIterable[Document]
151    ) -> AsyncGenerator[Document, None]:
152        async for doc in async_stream:
153            doc.extras["__init_stats"] = get_doc_info(doc)
154            yield doc
155
156    def get_total_statistics(self) -> list[Statistics]:
157        """
158        Get the statistics of the Compose object and sub filters.
159
160        The statistics of the Compose class are stored in an object with the name "Total",
161        and sub-filters's are stored with names in the format {filter_index}-{filter class name}.
162        """
163        stats = []
164        stats.append(self.get_statistics())
165        for i, filt in enumerate(self.filters):
166            stats.append(filt.get_statistics())
167        return stats
168
169    def get_total_statistics_map(self) -> list[dict[str, Any]]:
170        """
171        Get the statistics of the Compose object and sub filters as a list of dictionaries.
172        """
173        stats = self.get_total_statistics()
174        return [stat.to_dict() for stat in stats]
175
176    async def shutdown(self) -> None:
177        for filt in self.filters:
178            await filt.shutdown()
179        if not self._has_external_executor:
180            self._executor.shutdown()
181
182    async def __aenter__(self) -> "AsyncCompose":
183        return self
184
185    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
186        await self.shutdown()
187        if exc_type is not None:
188            raise exc_value

Helper class that provides a standard way to create an ABC using inheritance.

AsyncCompose( filters: list[hojichar.core.async_filter_interface.AsyncFilter | hojichar.core.filter_interface.Filter], random_state: int | numpy.random._generator.Generator | None = None, executor: concurrent.futures.thread.ThreadPoolExecutor | None = None, *args: Any, **kwargs: Any)
73    def __init__(
74        self,
75        filters: list[AsyncFilter | Filter],
76        random_state: int | np.random.Generator | None = None,
77        executor: ThreadPoolExecutor | None = None,
78        *args: Any,
79        **kwargs: Any,
80    ):
81        super().__init__(random_state=random_state, *args, **kwargs)
82        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
83        self._statistics.name = "Total"
84        self._has_external_executor = executor is not None
85        self._executor = executor or ThreadPoolExecutor()
86        self.set_filters(filters)

Base class for asynchronous filters.

Parameters

p : float — The probability of applying the filter. If p is 1, the filter will always be applied.
skip_rejected : bool — If True, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to False.
random_state : Optional[Union[int, np.random.Generator]] — Seed for the random number generator. If None is specified, the random number generator managed by the Compose class will be used.
use_batch : bool — If True, the filter will process documents in batches in the apply_stream method.
batch_size : int — The size of the batch to process documents in the apply_stream method.

def set_filters( self, filters: list[hojichar.core.async_filter_interface.AsyncFilter | hojichar.core.filter_interface.Filter]) -> None:
 88    def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
 89        self.filters: list[AsyncFilter] = []
 90        filter_idx = 0
 91        for f in filters:
 92            if isinstance(f, (AsyncCompose, Compose)):
 93                for sub in f.filters:
 94                    name = f"{filter_idx}-{sub.__class__.__name__}"
 95                    if isinstance(sub, Filter):
 96                        name = f"{filter_idx}-{sub.__class__.__name__}"
 97                        sub = AsyncFilterAdapter(sub, executor=self._executor)
 98
 99                    sub._set_rng_if_not_initialized(self._rng)
100                    sub.name = name
101                    sub._statistics.name = name
102                    self.filters.append(sub)
103                    filter_idx += 1
104            else:
105                name = f"{filter_idx}-{f.__class__.__name__}"
106                if isinstance(f, Filter):
107                    name = f"{filter_idx}-{f.__class__.__name__}"
108                    f = AsyncFilterAdapter(f, executor=self._executor)
109                f._set_rng_if_not_initialized(self._rng)
110                f.name = name
111                f._statistics.name = name
112                self.filters.append(f)
113                filter_idx += 1
async def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:
115    async def apply(self, document: Document) -> Document:
116        stat = get_doc_info(document)
117        for filter_idx, filt in enumerate(self.filters):
118            document = await filt._apply(document)
119        new_stat = get_doc_info(document)
120        async with self._stats_lock:
121            self._statistics.update_by_diff(stat, new_stat)
122        return document

Definition of async filter behavior.

In this method, the filter will modify document.text or document.extras and set document.is_rejected = True to discard the document.

Parameters

document : Document Input document

Returns

Document Processed Document

async def apply_batch( self, batch: Sequence[hojichar.core.models.Document]) -> list[hojichar.core.models.Document]:
124    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
125        stats = [get_doc_info(doc) for doc in batch]
126        for i, filt in enumerate(self.filters):
127            batch = await filt._apply_batch(batch)
128        batch = await self._finalize_batch(batch, stats)
129        return list(batch)

Apply the filter to a Sequence of documents. By default, the processing implemented in apply is executed asynchronously and concurrently. If the filter processing can be optimized for batch processing, override this method.

async def apply_stream( self, stream: Union[AsyncIterable[hojichar.core.models.Document], Iterable[hojichar.core.models.Document]]) -> AsyncGenerator[hojichar.core.models.Document, NoneType]:
131    async def apply_stream(
132        self,
133        stream: AsyncIterable[Document] | Iterable[Document],
134    ) -> AsyncGenerator[Document, None]:
135        async_stream = handle_stream_as_async(stream, chunk_size=1000, executor=self._executor)
136        async_stream = self._count_input_stats(async_stream)
137
138        for i, filt in enumerate(self.filters):
139            async_stream = filt.apply_stream(async_stream)
140
141        async for doc in async_stream:
142            in_stat = doc.extras["__init_stats"]
143            out_stat = get_doc_info(doc)
144            async with self._stats_lock:
145                self._statistics.update_by_diff(in_stat, out_stat)
146            del doc.extras["__init_stats"]
147            yield doc

Apply the filter to a stream of documents (Iterable or AsyncIterable). If use_batch is set to True at initialization, the filter will process documents in batches. If the stream is not asynchronous, use handle_stream_as_async to convert it to an asynchronous stream.

Even if an exception occurs during processing, the process will continue, and the following actions will be taken:

  • Set the is_rejected flag of the document to True
  • Set the error details in reject_reason
  • Increment the errors count in the statistics retrievable via get_statistics
def get_total_statistics(self) -> list[hojichar.core.models.Statistics]:
156    def get_total_statistics(self) -> list[Statistics]:
157        """
158        Get the statistics of the Compose object and sub filters.
159
160        The statistics of the Compose class are stored in an object with the name "Total",
161        and sub-filters's are stored with names in the format {filter_index}-{filter class name}.
162        """
163        stats = []
164        stats.append(self.get_statistics())
165        for i, filt in enumerate(self.filters):
166            stats.append(filt.get_statistics())
167        return stats

Get the statistics of the Compose object and sub filters.

The statistics of the Compose class are stored in an object with the name "Total", and those of the sub-filters are stored with names in the format {filter_index}-{filter class name}.

def get_total_statistics_map(self) -> list[dict[str, typing.Any]]:
169    def get_total_statistics_map(self) -> list[dict[str, Any]]:
170        """
171        Get the statistics of the Compose object and sub filters as a list of dictionaries.
172        """
173        stats = self.get_total_statistics()
174        return [stat.to_dict() for stat in stats]

Get the statistics of the Compose object and sub filters as a list of dictionaries.

async def shutdown(self) -> None:
176    async def shutdown(self) -> None:
177        for filt in self.filters:
178            await filt.shutdown()
179        if not self._has_external_executor:
180            self._executor.shutdown()

You can override this method to release resources or perform cleanup tasks.