hojichar.core.async_composition

  1from __future__ import annotations
  2
  3import asyncio
  4import logging
  5from concurrent.futures import Executor, ThreadPoolExecutor
  6from typing import Any, AsyncGenerator, AsyncIterable, Iterable, Sequence
  7
  8import numpy as np
  9
 10from hojichar.core import inspection
 11from hojichar.core.async_filter_interface import AsyncFilter
 12from hojichar.core.composition import Compose
 13from hojichar.core.filter_interface import Filter
 14from hojichar.core.models import Document, Statistics, get_doc_info
 15from hojichar.utils.async_handlers import handle_stream_as_async
 16
 17
 18class AsyncFilterAdapter(AsyncFilter):
 19    """
 20    Adapter class for executing hojichar.Filter asynchronously.
 21    """
 22
 23    def __init__(
 24        self,
 25        sync_filter: Filter,
 26        *args: Any,
 27        executor: Executor | None = None,
 28        use_batch: bool = True,
 29        **kwargs: Any,
 30    ):
 31        """
 32        Adapter class for executing hojichar.Filter asynchronously.
 33        Used to incorporate Filter into AsyncCompose.
 34
 35        To reduce the overhead of asynchronous context switching,
 36        use_batch is set to True by default to process in batches
 37        in apply_stream, regardless of the sync_filter's use_batch setting.
 38
 39        If performing CPU-bound and heavy processing, you can specify an executor
 40        to offload the processing to the executor. However, due to Python's GIL
 41        constraints, using ThreadPoolExecutor will not parallelize CPU-bound
 42        processing, and the entire process will be locked.
 43
 44        By using ProcessPoolExecutor as the executor, it may be possible to
 45        parallelize CPU-bound processing. However, for parallelizing CPU-bound
 46        processing, it is recommended to use the hojichar.Parallel class to
 47        parallelize synchronous Compose pipeline.
 48        """
 49        super().__init__(*args, use_batch=use_batch, **kwargs)
 50        self.sync_filter = sync_filter
 51        self._has_external_executor = executor is not None
 52        self._executor = executor or ThreadPoolExecutor()
 53        self.batch_size = sync_filter.batch_size
 54
 55    async def apply(self, document: Document) -> Document:
 56        loop = asyncio.get_running_loop()
 57        return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)
 58
 59    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
 60        loop = asyncio.get_running_loop()
 61        return await loop.run_in_executor(
 62            self._executor,
 63            lambda: self.sync_filter.apply_batch(batch),
 64        )
 65
 66    async def shutdown(self) -> None:
 67        self.sync_filter.shutdown()
 68        if not self._has_external_executor:
 69            self._executor.shutdown()
 70
 71
 72class AsyncCompose(AsyncFilter):
 73    def __init__(
 74        self,
 75        filters: list[AsyncFilter | Filter],
 76        random_state: int | np.random.Generator | None = None,
 77        executor: ThreadPoolExecutor | None = None,
 78        *args: Any,
 79        **kwargs: Any,
 80    ):
 81        super().__init__(random_state=random_state, *args, **kwargs)
 82        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
 83        self._statistics.name = "Total"
 84        self._has_external_executor = executor is not None
 85        self._executor = executor or ThreadPoolExecutor()
 86        self.set_filters(filters)
 87
 88    def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
 89        self.filters: list[AsyncFilter] = []
 90        filter_idx = 0
 91        for f in filters:
 92            if isinstance(f, (AsyncCompose, Compose)):
 93                for sub in f.filters:
 94                    name = f"{filter_idx}-{sub.__class__.__name__}"
 95                    if isinstance(sub, Filter):
 96                        name = f"{filter_idx}-{sub.__class__.__name__}"
 97                        sub = AsyncFilterAdapter(sub, executor=self._executor)
 98
 99                    sub._set_rng_if_not_initialized(self._rng)
100                    sub.name = name
101                    sub._statistics.name = name
102                    self.filters.append(sub)
103                    filter_idx += 1
104            else:
105                name = f"{filter_idx}-{f.__class__.__name__}"
106                if isinstance(f, Filter):
107                    name = f"{filter_idx}-{f.__class__.__name__}"
108                    f = AsyncFilterAdapter(f, executor=self._executor)
109                f._set_rng_if_not_initialized(self._rng)
110                f.name = name
111                f._statistics.name = name
112                self.filters.append(f)
113                filter_idx += 1
114
115    async def apply(self, document: Document) -> Document:
116        stat = get_doc_info(document)
117        for filter_idx, filt in enumerate(self.filters):
118            document = await filt._apply(document)
119        new_stat = get_doc_info(document)
120        async with self._stats_lock:
121            self._statistics.update_by_diff(stat, new_stat)
122        return document
123
124    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
125        stats = [get_doc_info(doc) for doc in batch]
126        for i, filt in enumerate(self.filters):
127            batch = await filt._apply_batch(batch)
128        batch = await self._finalize_batch(batch, stats)
129        return list(batch)
130
131    async def apply_stream(
132        self,
133        stream: AsyncIterable[Document] | Iterable[Document],
134    ) -> AsyncGenerator[Document, None]:
135        async_stream = handle_stream_as_async(stream, chunk_size=1000, executor=self._executor)
136        async_stream = self._count_input_stats(async_stream)
137
138        for i, filt in enumerate(self.filters):
139            async_stream = filt.apply_stream(async_stream)
140
141        async for doc in async_stream:
142            in_stat = doc._get_initial_stats()
143            if in_stat is None:
144                in_stat = get_doc_info(doc)
145                self.logger.debug(
146                    "Initial stats missing for document during async stream aggregation; "
147                    "using current stats as fallback"
148                )
149            out_stat = get_doc_info(doc)
150            async with self._stats_lock:
151                self._statistics.update_by_diff(in_stat, out_stat)
152            doc._clear_initial_stats()
153            yield doc
154
155    async def _count_input_stats(
156        self, async_stream: AsyncIterable[Document]
157    ) -> AsyncGenerator[Document, None]:
158        async for doc in async_stream:
159            doc._set_initial_stats(get_doc_info(doc))
160            yield doc
161
162    def get_total_statistics(self) -> list[Statistics]:
163        """
164        Get the statistics of the Compose object and sub filters.
165
166        The statistics of the Compose class are stored in an object with the name "Total",
167        and sub-filters's are stored with names in the format {filter_index}-{filter class name}.
168        """
169        stats = []
170        stats.append(self.get_statistics())
171        for i, filt in enumerate(self.filters):
172            stats.append(filt.get_statistics())
173        return stats
174
175    def get_total_statistics_map(self) -> list[dict[str, Any]]:
176        """
177        Get the statistics of the Compose object and sub filters as a list of dictionaries.
178        """
179        stats = self.get_total_statistics()
180        return [stat.to_dict() for stat in stats]
181
182    @property
183    def statistics(self) -> dict:
184        """
185        Deprecated
186
187        Get the statistics of the Compose object and sub filters.
188
189        This property is retained for compatibility with previous versions.
190        Please use `get_total_statistics` or `get_total_statistics_map` instead.
191        """
192        return inspection.statistics_obj_adapter(  # type: ignore
193            self.get_total_statistics()
194        ).get_human_readable_values()
195
196    @property
197    def statistics_obj(self) -> inspection.StatsContainer:
198        """
199        Deprecated
200
201        Get the statistics of the AsyncCompose object and sub filters.
202        This method returns a StatsContainer object which contains the statistics
203        of the AsyncCompose object and sub filters.
204
205        This property is retained for compatibility with previous versions.
206        Please use `get_total_statistics` or `get_total_statistics_map` instead.
207        """
208        return inspection.statistics_obj_adapter(self.get_total_statistics())  # type: ignore
209
210    async def shutdown(self) -> None:
211        for filt in self.filters:
212            await filt.shutdown()
213        if not self._has_external_executor:
214            self._executor.shutdown()
215
216    async def __aenter__(self) -> "AsyncCompose":
217        return self
218
219    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
220        await self.shutdown()
221        if exc_type is not None:
222            raise exc_value
class AsyncFilterAdapter(hojichar.core.async_filter_interface.AsyncFilter):
class AsyncFilterAdapter(AsyncFilter):
    """
    Adapter class for executing hojichar.Filter asynchronously.

    Calls to the wrapped synchronous filter are submitted to an executor via
    ``loop.run_in_executor`` and awaited.
    """

    def __init__(
        self,
        sync_filter: Filter,
        *args: Any,
        executor: Executor | None = None,
        use_batch: bool = True,
        **kwargs: Any,
    ):
        """
        Adapter class for executing hojichar.Filter asynchronously.
        Used to incorporate Filter into AsyncCompose.

        To reduce the overhead of asynchronous context switching,
        use_batch is set to True by default to process in batches
        in apply_stream, regardless of the sync_filter's use_batch setting.

        If performing CPU-bound and heavy processing, you can specify an executor
        to offload the processing to the executor. However, due to Python's GIL
        constraints, using ThreadPoolExecutor will not parallelize CPU-bound
        processing, and the entire process will be locked.

        By using ProcessPoolExecutor as the executor, it may be possible to
        parallelize CPU-bound processing. However, for parallelizing CPU-bound
        processing, it is recommended to use the hojichar.Parallel class to
        parallelize synchronous Compose pipeline.

        Parameters
        ----------
        sync_filter : Filter
            The synchronous filter to wrap.
        executor : Executor | None
            Executor that runs the wrapped filter. If None, a ThreadPoolExecutor
            is created here and shut down again in ``shutdown``.
        use_batch : bool
            Forwarded to ``AsyncFilter.__init__``.
        *args, **kwargs
            Forwarded to ``AsyncFilter.__init__``.
        """
        super().__init__(*args, use_batch=use_batch, **kwargs)
        self.sync_filter = sync_filter
        # Remember who owns the executor: only one created here is shut down by us.
        self._has_external_executor = executor is not None
        self._executor = executor or ThreadPoolExecutor()
        # The batch size always follows the wrapped filter.
        self.batch_size = sync_filter.batch_size

    async def apply(self, document: Document) -> Document:
        """Run ``sync_filter.apply`` on ``document`` in the executor and return its result."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        """Run ``sync_filter.apply_batch`` on ``batch`` in the executor and return its result."""
        loop = asyncio.get_running_loop()
        # Hand over the bound method and its argument directly, as ``apply`` does.
        # A lambda cannot be pickled, so it could never be submitted to a
        # ProcessPoolExecutor.
        return await loop.run_in_executor(
            self._executor,
            self.sync_filter.apply_batch,
            batch,
        )

    async def shutdown(self) -> None:
        """Shut down the wrapped filter, then the executor if this adapter created it."""
        try:
            self.sync_filter.shutdown()
        finally:
            # Release the internally created executor even when the wrapped
            # filter's shutdown raises; a caller-supplied executor is left alone.
            if not self._has_external_executor:
                self._executor.shutdown()

Adapter class for executing hojichar.Filter asynchronously.

AsyncFilterAdapter( sync_filter: hojichar.core.filter_interface.Filter, *args: Any, executor: concurrent.futures._base.Executor | None = None, use_batch: bool = True, **kwargs: Any)
24    def __init__(
25        self,
26        sync_filter: Filter,
27        *args: Any,
28        executor: Executor | None = None,
29        use_batch: bool = True,
30        **kwargs: Any,
31    ):
32        """
33        Adapter class for executing hojichar.Filter asynchronously.
34        Used to incorporate Filter into AsyncCompose.
35
36        To reduce the overhead of asynchronous context switching,
37        use_batch is set to True by default to process in batches
38        in apply_stream, regardless of the sync_filter's use_batch setting.
39
40        If performing CPU-bound and heavy processing, you can specify an executor
41        to offload the processing to the executor. However, due to Python's GIL
42        constraints, using ThreadPoolExecutor will not parallelize CPU-bound
43        processing, and the entire process will be locked.
44
45        By using ProcessPoolExecutor as the executor, it may be possible to
46        parallelize CPU-bound processing. However, for parallelizing CPU-bound
47        processing, it is recommended to use the hojichar.Parallel class to
48        parallelize synchronous Compose pipeline.
49        """
50        super().__init__(*args, use_batch=use_batch, **kwargs)
51        self.sync_filter = sync_filter
52        self._has_external_executor = executor is not None
53        self._executor = executor or ThreadPoolExecutor()
54        self.batch_size = sync_filter.batch_size

Adapter class for executing hojichar.Filter asynchronously. Used to incorporate Filter into AsyncCompose.

To reduce the overhead of asynchronous context switching, use_batch is set to True by default to process in batches in apply_stream, regardless of the sync_filter's use_batch setting.

If performing CPU-bound and heavy processing, you can specify an executor to offload the processing to the executor. However, due to Python's GIL constraints, using ThreadPoolExecutor will not parallelize CPU-bound processing, and the entire process will be locked.

By using ProcessPoolExecutor as the executor, it may be possible to parallelize CPU-bound processing. However, for parallelizing CPU-bound processing, it is recommended to use the hojichar.Parallel class to parallelize synchronous Compose pipeline.

async def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:
56    async def apply(self, document: Document) -> Document:
57        loop = asyncio.get_running_loop()
58        return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)

Definition of async filter behavior.

In this method, the filter will modify document.text or document.extras and set document.is_rejected = True to discard the document.

Parameters

document : Document Input document

Returns

Document Processed Document

async def apply_batch( self, batch: Sequence[hojichar.core.models.Document]) -> list[hojichar.core.models.Document]:
60    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
61        loop = asyncio.get_running_loop()
62        return await loop.run_in_executor(
63            self._executor,
64            lambda: self.sync_filter.apply_batch(batch),
65        )

Apply the filter to a Sequence of documents. By default, the processing implemented in apply is executed asynchronously and concurrently. If the filter processing can be optimized for batch processing, override this method.

async def shutdown(self) -> None:
67    async def shutdown(self) -> None:
68        self.sync_filter.shutdown()
69        if not self._has_external_executor:
70            self._executor.shutdown()

You can override this method to release resources or perform cleanup tasks.

class AsyncCompose(hojichar.core.async_filter_interface.AsyncFilter):
 73class AsyncCompose(AsyncFilter):
 74    def __init__(
 75        self,
 76        filters: list[AsyncFilter | Filter],
 77        random_state: int | np.random.Generator | None = None,
 78        executor: ThreadPoolExecutor | None = None,
 79        *args: Any,
 80        **kwargs: Any,
 81    ):
 82        super().__init__(random_state=random_state, *args, **kwargs)
 83        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
 84        self._statistics.name = "Total"
 85        self._has_external_executor = executor is not None
 86        self._executor = executor or ThreadPoolExecutor()
 87        self.set_filters(filters)
 88
 89    def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
 90        self.filters: list[AsyncFilter] = []
 91        filter_idx = 0
 92        for f in filters:
 93            if isinstance(f, (AsyncCompose, Compose)):
 94                for sub in f.filters:
 95                    name = f"{filter_idx}-{sub.__class__.__name__}"
 96                    if isinstance(sub, Filter):
 97                        name = f"{filter_idx}-{sub.__class__.__name__}"
 98                        sub = AsyncFilterAdapter(sub, executor=self._executor)
 99
100                    sub._set_rng_if_not_initialized(self._rng)
101                    sub.name = name
102                    sub._statistics.name = name
103                    self.filters.append(sub)
104                    filter_idx += 1
105            else:
106                name = f"{filter_idx}-{f.__class__.__name__}"
107                if isinstance(f, Filter):
108                    name = f"{filter_idx}-{f.__class__.__name__}"
109                    f = AsyncFilterAdapter(f, executor=self._executor)
110                f._set_rng_if_not_initialized(self._rng)
111                f.name = name
112                f._statistics.name = name
113                self.filters.append(f)
114                filter_idx += 1
115
116    async def apply(self, document: Document) -> Document:
117        stat = get_doc_info(document)
118        for filter_idx, filt in enumerate(self.filters):
119            document = await filt._apply(document)
120        new_stat = get_doc_info(document)
121        async with self._stats_lock:
122            self._statistics.update_by_diff(stat, new_stat)
123        return document
124
125    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
126        stats = [get_doc_info(doc) for doc in batch]
127        for i, filt in enumerate(self.filters):
128            batch = await filt._apply_batch(batch)
129        batch = await self._finalize_batch(batch, stats)
130        return list(batch)
131
132    async def apply_stream(
133        self,
134        stream: AsyncIterable[Document] | Iterable[Document],
135    ) -> AsyncGenerator[Document, None]:
136        async_stream = handle_stream_as_async(stream, chunk_size=1000, executor=self._executor)
137        async_stream = self._count_input_stats(async_stream)
138
139        for i, filt in enumerate(self.filters):
140            async_stream = filt.apply_stream(async_stream)
141
142        async for doc in async_stream:
143            in_stat = doc._get_initial_stats()
144            if in_stat is None:
145                in_stat = get_doc_info(doc)
146                self.logger.debug(
147                    "Initial stats missing for document during async stream aggregation; "
148                    "using current stats as fallback"
149                )
150            out_stat = get_doc_info(doc)
151            async with self._stats_lock:
152                self._statistics.update_by_diff(in_stat, out_stat)
153            doc._clear_initial_stats()
154            yield doc
155
156    async def _count_input_stats(
157        self, async_stream: AsyncIterable[Document]
158    ) -> AsyncGenerator[Document, None]:
159        async for doc in async_stream:
160            doc._set_initial_stats(get_doc_info(doc))
161            yield doc
162
163    def get_total_statistics(self) -> list[Statistics]:
164        """
165        Get the statistics of the Compose object and sub filters.
166
167        The statistics of the Compose class are stored in an object with the name "Total",
168        and sub-filters's are stored with names in the format {filter_index}-{filter class name}.
169        """
170        stats = []
171        stats.append(self.get_statistics())
172        for i, filt in enumerate(self.filters):
173            stats.append(filt.get_statistics())
174        return stats
175
176    def get_total_statistics_map(self) -> list[dict[str, Any]]:
177        """
178        Get the statistics of the Compose object and sub filters as a list of dictionaries.
179        """
180        stats = self.get_total_statistics()
181        return [stat.to_dict() for stat in stats]
182
183    @property
184    def statistics(self) -> dict:
185        """
186        Deprecated
187
188        Get the statistics of the Compose object and sub filters.
189
190        This property is retained for compatibility with previous versions.
191        Please use `get_total_statistics` or `get_total_statistics_map` instead.
192        """
193        return inspection.statistics_obj_adapter(  # type: ignore
194            self.get_total_statistics()
195        ).get_human_readable_values()
196
197    @property
198    def statistics_obj(self) -> inspection.StatsContainer:
199        """
200        Deprecated
201
202        Get the statistics of the AsyncCompose object and sub filters.
203        This method returns a StatsContainer object which contains the statistics
204        of the AsyncCompose object and sub filters.
205
206        This property is retained for compatibility with previous versions.
207        Please use `get_total_statistics` or `get_total_statistics_map` instead.
208        """
209        return inspection.statistics_obj_adapter(self.get_total_statistics())  # type: ignore
210
211    async def shutdown(self) -> None:
212        for filt in self.filters:
213            await filt.shutdown()
214        if not self._has_external_executor:
215            self._executor.shutdown()
216
217    async def __aenter__(self) -> "AsyncCompose":
218        return self
219
220    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
221        await self.shutdown()
222        if exc_type is not None:
223            raise exc_value

Helper class that provides a standard way to create an ABC using inheritance.

AsyncCompose( filters: list[hojichar.core.async_filter_interface.AsyncFilter | hojichar.core.filter_interface.Filter], random_state: int | numpy.random._generator.Generator | None = None, executor: concurrent.futures.thread.ThreadPoolExecutor | None = None, *args: Any, **kwargs: Any)
74    def __init__(
75        self,
76        filters: list[AsyncFilter | Filter],
77        random_state: int | np.random.Generator | None = None,
78        executor: ThreadPoolExecutor | None = None,
79        *args: Any,
80        **kwargs: Any,
81    ):
82        super().__init__(random_state=random_state, *args, **kwargs)
83        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
84        self._statistics.name = "Total"
85        self._has_external_executor = executor is not None
86        self._executor = executor or ThreadPoolExecutor()
87        self.set_filters(filters)

Base class for asynchronous filters.

Parameters

p : float
    The probability of applying the filter. If p is 1, the filter will always be applied.
skip_rejected : bool
    If True, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to False.
random_state : Optional[Union[int, np.random.Generator]]
    Seed for the random number generator. If None is specified, the random number generator managed by the Compose class will be used.
use_batch : bool
    If True, the filter will process documents in batches in the apply_stream method.
batch_size : int
    The size of the batch to process documents in the apply_stream method.

def set_filters( self, filters: list[hojichar.core.async_filter_interface.AsyncFilter | hojichar.core.filter_interface.Filter]) -> None:
 89    def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
 90        self.filters: list[AsyncFilter] = []
 91        filter_idx = 0
 92        for f in filters:
 93            if isinstance(f, (AsyncCompose, Compose)):
 94                for sub in f.filters:
 95                    name = f"{filter_idx}-{sub.__class__.__name__}"
 96                    if isinstance(sub, Filter):
 97                        name = f"{filter_idx}-{sub.__class__.__name__}"
 98                        sub = AsyncFilterAdapter(sub, executor=self._executor)
 99
100                    sub._set_rng_if_not_initialized(self._rng)
101                    sub.name = name
102                    sub._statistics.name = name
103                    self.filters.append(sub)
104                    filter_idx += 1
105            else:
106                name = f"{filter_idx}-{f.__class__.__name__}"
107                if isinstance(f, Filter):
108                    name = f"{filter_idx}-{f.__class__.__name__}"
109                    f = AsyncFilterAdapter(f, executor=self._executor)
110                f._set_rng_if_not_initialized(self._rng)
111                f.name = name
112                f._statistics.name = name
113                self.filters.append(f)
114                filter_idx += 1
async def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:
116    async def apply(self, document: Document) -> Document:
117        stat = get_doc_info(document)
118        for filter_idx, filt in enumerate(self.filters):
119            document = await filt._apply(document)
120        new_stat = get_doc_info(document)
121        async with self._stats_lock:
122            self._statistics.update_by_diff(stat, new_stat)
123        return document

Definition of async filter behavior.

In this method, the filter will modify document.text or document.extras and set document.is_rejected = True to discard the document.

Parameters

document : Document Input document

Returns

Document Processed Document

async def apply_batch( self, batch: Sequence[hojichar.core.models.Document]) -> list[hojichar.core.models.Document]:
125    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
126        stats = [get_doc_info(doc) for doc in batch]
127        for i, filt in enumerate(self.filters):
128            batch = await filt._apply_batch(batch)
129        batch = await self._finalize_batch(batch, stats)
130        return list(batch)

Apply the filter to a Sequence of documents. By default, the processing implemented in apply is executed asynchronously and concurrently. If the filter processing can be optimized for batch processing, override this method.

async def apply_stream( self, stream: Union[AsyncIterable[hojichar.core.models.Document], Iterable[hojichar.core.models.Document]]) -> AsyncGenerator[hojichar.core.models.Document, NoneType]:
132    async def apply_stream(
133        self,
134        stream: AsyncIterable[Document] | Iterable[Document],
135    ) -> AsyncGenerator[Document, None]:
136        async_stream = handle_stream_as_async(stream, chunk_size=1000, executor=self._executor)
137        async_stream = self._count_input_stats(async_stream)
138
139        for i, filt in enumerate(self.filters):
140            async_stream = filt.apply_stream(async_stream)
141
142        async for doc in async_stream:
143            in_stat = doc._get_initial_stats()
144            if in_stat is None:
145                in_stat = get_doc_info(doc)
146                self.logger.debug(
147                    "Initial stats missing for document during async stream aggregation; "
148                    "using current stats as fallback"
149                )
150            out_stat = get_doc_info(doc)
151            async with self._stats_lock:
152                self._statistics.update_by_diff(in_stat, out_stat)
153            doc._clear_initial_stats()
154            yield doc

Apply the filter to a stream of documents (Iterable or AsyncIterable). If use_batch is set to True at initialization, the filter will process documents in batches. If the stream is not asynchronous, use handle_stream_as_async to convert it to an asynchronous stream.

Even if an exception occurs during processing, the process will continue, and the following actions will be taken:

  • Set the is_rejected flag of the document to True
  • Set the error details in reject_reason
  • Increment the errors count in the statistics retrievable via get_statistics
def get_total_statistics(self) -> list[hojichar.core.models.Statistics]:
163    def get_total_statistics(self) -> list[Statistics]:
164        """
165        Get the statistics of the Compose object and sub filters.
166
167        The statistics of the Compose class are stored in an object with the name "Total",
168        and sub-filters's are stored with names in the format {filter_index}-{filter class name}.
169        """
170        stats = []
171        stats.append(self.get_statistics())
172        for i, filt in enumerate(self.filters):
173            stats.append(filt.get_statistics())
174        return stats

Get the statistics of the Compose object and sub filters.

The statistics of the Compose class are stored in an object with the name "Total", and the sub-filters' statistics are stored with names in the format {filter_index}-{filter class name}.

def get_total_statistics_map(self) -> list[dict[str, typing.Any]]:
176    def get_total_statistics_map(self) -> list[dict[str, Any]]:
177        """
178        Get the statistics of the Compose object and sub filters as a list of dictionaries.
179        """
180        stats = self.get_total_statistics()
181        return [stat.to_dict() for stat in stats]

Get the statistics of the Compose object and sub filters as a list of dictionaries.

statistics: dict

Deprecated

Get the statistics of the Compose object and sub filters.

This property is retained for compatibility with previous versions. Please use get_total_statistics or get_total_statistics_map instead.

Deprecated

Get the statistics of the AsyncCompose object and sub filters. This method returns a StatsContainer object which contains the statistics of the AsyncCompose object and sub filters.

This property is retained for compatibility with previous versions. Please use get_total_statistics or get_total_statistics_map instead.

async def shutdown(self) -> None:
211    async def shutdown(self) -> None:
212        for filt in self.filters:
213            await filt.shutdown()
214        if not self._has_external_executor:
215            self._executor.shutdown()

You can override this method to release resources or perform cleanup tasks.