hojichar.core.async_composition

  1from __future__ import annotations
  2
  3import asyncio
  4import logging
  5from concurrent.futures import Executor, ThreadPoolExecutor
  6from typing import Any, AsyncGenerator, AsyncIterable, Iterable, Sequence
  7
  8import numpy as np
  9
 10from hojichar.core import inspection
 11from hojichar.core.async_filter_interface import AsyncFilter
 12from hojichar.core.composition import Compose
 13from hojichar.core.filter_interface import Filter
 14from hojichar.core.models import Document, Statistics, get_doc_info
 15from hojichar.utils.async_handlers import handle_stream_as_async
 16
 17
 18class AsyncFilterAdapter(AsyncFilter):
 19    """
 20    Adapter class for executing hojichar.Filter asynchronously.
 21    """
 22
 23    def __init__(
 24        self,
 25        sync_filter: Filter,
 26        *args: Any,
 27        executor: Executor | None = None,
 28        use_batch: bool = True,
 29        **kwargs: Any,
 30    ):
 31        """
 32        Adapter class for executing hojichar.Filter asynchronously.
 33        Used to incorporate Filter into AsyncCompose.
 34
 35        To reduce the overhead of asynchronous context switching,
 36        use_batch is set to True by default to process in batches
 37        in apply_stream, regardless of the sync_filter's use_batch setting.
 38
 39        If performing CPU-bound and heavy processing, you can specify an executor
 40        to offload the processing to the executor. However, due to Python's GIL
 41        constraints, using ThreadPoolExecutor will not parallelize CPU-bound
 42        processing, and the entire process will be locked.
 43
 44        By using ProcessPoolExecutor as the executor, it may be possible to
 45        parallelize CPU-bound processing. However, for parallelizing CPU-bound
 46        processing, it is recommended to use the hojichar.Parallel class to
 47        parallelize synchronous Compose pipeline.
 48        """
 49        super().__init__(*args, use_batch=use_batch, **kwargs)
 50        self.sync_filter = sync_filter
 51        self._has_external_executor = executor is not None
 52        self._executor = executor or ThreadPoolExecutor()
 53        self.batch_size = sync_filter.batch_size
 54
 55    async def apply(self, document: Document) -> Document:
 56        loop = asyncio.get_running_loop()
 57        return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)
 58
 59    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
 60        loop = asyncio.get_running_loop()
 61        return await loop.run_in_executor(
 62            self._executor,
 63            lambda: self.sync_filter.apply_batch(batch),
 64        )
 65
 66    async def shutdown(self) -> None:
 67        self.sync_filter.shutdown()
 68        if not self._has_external_executor:
 69            self._executor.shutdown()
 70
 71
 72class AsyncCompose(AsyncFilter):
 73    def __init__(
 74        self,
 75        filters: list[AsyncFilter | Filter],
 76        random_state: int | np.random.Generator | None = None,
 77        executor: ThreadPoolExecutor | None = None,
 78        *args: Any,
 79        **kwargs: Any,
 80    ):
 81        super().__init__(random_state=random_state, *args, **kwargs)
 82        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
 83        self._statistics.name = "Total"
 84        self._has_external_executor = executor is not None
 85        self._executor = executor or ThreadPoolExecutor()
 86        self.set_filters(filters)
 87
 88    def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
 89        self.filters: list[AsyncFilter] = []
 90        filter_idx = 0
 91        for f in filters:
 92            if isinstance(f, (AsyncCompose, Compose)):
 93                for sub in f.filters:
 94                    name = f"{filter_idx}-{sub.__class__.__name__}"
 95                    if isinstance(sub, Filter):
 96                        name = f"{filter_idx}-{sub.__class__.__name__}"
 97                        sub = AsyncFilterAdapter(sub, executor=self._executor)
 98
 99                    sub._set_rng_if_not_initialized(self._rng)
100                    sub.name = name
101                    sub._statistics.name = name
102                    self.filters.append(sub)
103                    filter_idx += 1
104            else:
105                name = f"{filter_idx}-{f.__class__.__name__}"
106                if isinstance(f, Filter):
107                    name = f"{filter_idx}-{f.__class__.__name__}"
108                    f = AsyncFilterAdapter(f, executor=self._executor)
109                f._set_rng_if_not_initialized(self._rng)
110                f.name = name
111                f._statistics.name = name
112                self.filters.append(f)
113                filter_idx += 1
114
115    async def apply(self, document: Document) -> Document:
116        stat = get_doc_info(document)
117        for filter_idx, filt in enumerate(self.filters):
118            document = await filt._apply(document)
119        new_stat = get_doc_info(document)
120        async with self._stats_lock:
121            self._statistics.update_by_diff(stat, new_stat)
122        return document
123
124    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
125        stats = [get_doc_info(doc) for doc in batch]
126        for i, filt in enumerate(self.filters):
127            batch = await filt._apply_batch(batch)
128        batch = await self._finalize_batch(batch, stats)
129        return list(batch)
130
131    async def apply_stream(
132        self,
133        stream: AsyncIterable[Document] | Iterable[Document],
134    ) -> AsyncGenerator[Document, None]:
135        async_stream = handle_stream_as_async(stream, chunk_size=1000, executor=self._executor)
136        async_stream = self._count_input_stats(async_stream)
137
138        for i, filt in enumerate(self.filters):
139            async_stream = filt.apply_stream(async_stream)
140
141        async for doc in async_stream:
142            in_stat = doc.extras["__init_stats"]
143            out_stat = get_doc_info(doc)
144            async with self._stats_lock:
145                self._statistics.update_by_diff(in_stat, out_stat)
146            del doc.extras["__init_stats"]
147            yield doc
148
149    async def _count_input_stats(
150        self, async_stream: AsyncIterable[Document]
151    ) -> AsyncGenerator[Document, None]:
152        async for doc in async_stream:
153            doc.extras["__init_stats"] = get_doc_info(doc)
154            yield doc
155
156    def get_total_statistics(self) -> list[Statistics]:
157        """
158        Get the statistics of the Compose object and sub filters.
159
160        The statistics of the Compose class are stored in an object with the name "Total",
161        and sub-filters's are stored with names in the format {filter_index}-{filter class name}.
162        """
163        stats = []
164        stats.append(self.get_statistics())
165        for i, filt in enumerate(self.filters):
166            stats.append(filt.get_statistics())
167        return stats
168
169    def get_total_statistics_map(self) -> list[dict[str, Any]]:
170        """
171        Get the statistics of the Compose object and sub filters as a list of dictionaries.
172        """
173        stats = self.get_total_statistics()
174        return [stat.to_dict() for stat in stats]
175
176    @property
177    def statistics(self) -> dict:
178        """
179        Deprecated
180
181        Get the statistics of the Compose object and sub filters.
182
183        This property is retained for compatibility with previous versions.
184        Please use `get_total_statistics` or `get_total_statistics_map` instead.
185        """
186        return inspection.statistics_obj_adapter(  # type: ignore
187            self.get_total_statistics()
188        ).get_human_readable_values()
189
190    @property
191    def statistics_obj(self) -> inspection.StatsContainer:
192        """
193        Deprecated
194
195        Get the statistics of the AsyncCompose object and sub filters.
196        This method returns a StatsContainer object which contains the statistics
197        of the AsyncCompose object and sub filters.
198
199        This property is retained for compatibility with previous versions.
200        Please use `get_total_statistics` or `get_total_statistics_map` instead.
201        """
202        return inspection.statistics_obj_adapter(self.get_total_statistics())  # type: ignore
203
204    async def shutdown(self) -> None:
205        for filt in self.filters:
206            await filt.shutdown()
207        if not self._has_external_executor:
208            self._executor.shutdown()
209
210    async def __aenter__(self) -> "AsyncCompose":
211        return self
212
213    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
214        await self.shutdown()
215        if exc_type is not None:
216            raise exc_value
class AsyncFilterAdapter(hojichar.core.async_filter_interface.AsyncFilter):
class AsyncFilterAdapter(AsyncFilter):
    """
    Adapter class for executing hojichar.Filter asynchronously.

    Calls to the wrapped synchronous filter are submitted to an executor
    through ``loop.run_in_executor`` and awaited.
    """

    def __init__(
        self,
        sync_filter: Filter,
        *args: Any,
        executor: Executor | None = None,
        use_batch: bool = True,
        **kwargs: Any,
    ):
        """
        Adapter class for executing hojichar.Filter asynchronously.
        Used to incorporate Filter into AsyncCompose.

        To reduce the overhead of asynchronous context switching,
        use_batch is set to True by default to process in batches
        in apply_stream, regardless of the sync_filter's use_batch setting.

        If performing CPU-bound and heavy processing, you can specify an executor
        to offload the processing to the executor. However, due to Python's GIL
        constraints, using ThreadPoolExecutor will not parallelize CPU-bound
        processing.

        By using ProcessPoolExecutor as the executor, it may be possible to
        parallelize CPU-bound processing. However, for parallelizing CPU-bound
        processing, it is recommended to use the hojichar.Parallel class to
        parallelize synchronous Compose pipeline.

        Parameters
        ----------
        sync_filter : Filter
            The synchronous filter to wrap. Its batch_size is copied to the adapter.
        executor : Executor | None
            Executor used to run the filter. If None, the adapter creates its own
            ThreadPoolExecutor and shuts it down in ``shutdown``.
        use_batch : bool
            Forwarded to AsyncFilter.
        *args, **kwargs
            Forwarded to AsyncFilter.
        """
        super().__init__(*args, use_batch=use_batch, **kwargs)
        self.sync_filter = sync_filter
        # Ownership and executor selection use the same `is not None` test so a
        # caller-supplied executor is never silently replaced by a private one.
        self._has_external_executor = executor is not None
        self._executor = executor if executor is not None else ThreadPoolExecutor()
        self.batch_size = sync_filter.batch_size

    async def apply(self, document: Document) -> Document:
        """
        Run ``sync_filter.apply`` on the executor and return its result.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        """
        Run ``sync_filter.apply_batch`` on the executor and return its result.
        """
        loop = asyncio.get_running_loop()
        # Pass the bound method and its argument instead of a lambda: a lambda
        # cannot be pickled, which breaks process-based executors.
        return await loop.run_in_executor(
            self._executor,
            self.sync_filter.apply_batch,
            batch,
        )

    async def shutdown(self) -> None:
        """
        Shut down the wrapped filter, then the executor if the adapter created it.
        """
        try:
            self.sync_filter.shutdown()
        finally:
            # Release the owned executor even when the filter's shutdown raises.
            if not self._has_external_executor:
                self._executor.shutdown()

Adapter class for executing hojichar.Filter asynchronously.

AsyncFilterAdapter( sync_filter: hojichar.core.filter_interface.Filter, *args: Any, executor: concurrent.futures._base.Executor | None = None, use_batch: bool = True, **kwargs: Any)
24    def __init__(
25        self,
26        sync_filter: Filter,
27        *args: Any,
28        executor: Executor | None = None,
29        use_batch: bool = True,
30        **kwargs: Any,
31    ):
32        """
33        Adapter class for executing hojichar.Filter asynchronously.
34        Used to incorporate Filter into AsyncCompose.
35
36        To reduce the overhead of asynchronous context switching,
37        use_batch is set to True by default to process in batches
38        in apply_stream, regardless of the sync_filter's use_batch setting.
39
40        If performing CPU-bound and heavy processing, you can specify an executor
41        to offload the processing to the executor. However, due to Python's GIL
42        constraints, using ThreadPoolExecutor will not parallelize CPU-bound
43        processing, and the entire process will be locked.
44
45        By using ProcessPoolExecutor as the executor, it may be possible to
46        parallelize CPU-bound processing. However, for parallelizing CPU-bound
47        processing, it is recommended to use the hojichar.Parallel class to
48        parallelize synchronous Compose pipeline.
49        """
50        super().__init__(*args, use_batch=use_batch, **kwargs)
51        self.sync_filter = sync_filter
52        self._has_external_executor = executor is not None
53        self._executor = executor or ThreadPoolExecutor()
54        self.batch_size = sync_filter.batch_size

Adapter class for executing hojichar.Filter asynchronously. Used to incorporate Filter into AsyncCompose.

To reduce the overhead of asynchronous context switching, use_batch is set to True by default to process in batches in apply_stream, regardless of the sync_filter's use_batch setting.

If performing CPU-bound and heavy processing, you can specify an executor to offload the processing to the executor. However, due to Python's GIL constraints, using ThreadPoolExecutor will not parallelize CPU-bound processing, and the entire process will be locked.

By using ProcessPoolExecutor as the executor, it may be possible to parallelize CPU-bound processing. However, for parallelizing CPU-bound processing, it is recommended to use the hojichar.Parallel class to parallelize synchronous Compose pipeline.

async def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:
56    async def apply(self, document: Document) -> Document:
57        loop = asyncio.get_running_loop()
58        return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)

Definition of async filter behavior.

In this method, the filter will modify document.text or document.extras and set document.is_rejected = True to discard the document.

Parameters

document : Document Input document

Returns

Document Processed Document

async def apply_batch( self, batch: Sequence[hojichar.core.models.Document]) -> list[hojichar.core.models.Document]:
60    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
61        loop = asyncio.get_running_loop()
62        return await loop.run_in_executor(
63            self._executor,
64            lambda: self.sync_filter.apply_batch(batch),
65        )

Apply the filter to a Sequence of documents. By default, the processing implemented in apply is executed asynchronously and concurrently. If the filter processing can be optimized for batch processing, override this method.

async def shutdown(self) -> None:
67    async def shutdown(self) -> None:
68        self.sync_filter.shutdown()
69        if not self._has_external_executor:
70            self._executor.shutdown()

You can override this method to release resources or perform cleanup tasks.

class AsyncCompose(hojichar.core.async_filter_interface.AsyncFilter):
 73class AsyncCompose(AsyncFilter):
 74    def __init__(
 75        self,
 76        filters: list[AsyncFilter | Filter],
 77        random_state: int | np.random.Generator | None = None,
 78        executor: ThreadPoolExecutor | None = None,
 79        *args: Any,
 80        **kwargs: Any,
 81    ):
 82        super().__init__(random_state=random_state, *args, **kwargs)
 83        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
 84        self._statistics.name = "Total"
 85        self._has_external_executor = executor is not None
 86        self._executor = executor or ThreadPoolExecutor()
 87        self.set_filters(filters)
 88
 89    def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
 90        self.filters: list[AsyncFilter] = []
 91        filter_idx = 0
 92        for f in filters:
 93            if isinstance(f, (AsyncCompose, Compose)):
 94                for sub in f.filters:
 95                    name = f"{filter_idx}-{sub.__class__.__name__}"
 96                    if isinstance(sub, Filter):
 97                        name = f"{filter_idx}-{sub.__class__.__name__}"
 98                        sub = AsyncFilterAdapter(sub, executor=self._executor)
 99
100                    sub._set_rng_if_not_initialized(self._rng)
101                    sub.name = name
102                    sub._statistics.name = name
103                    self.filters.append(sub)
104                    filter_idx += 1
105            else:
106                name = f"{filter_idx}-{f.__class__.__name__}"
107                if isinstance(f, Filter):
108                    name = f"{filter_idx}-{f.__class__.__name__}"
109                    f = AsyncFilterAdapter(f, executor=self._executor)
110                f._set_rng_if_not_initialized(self._rng)
111                f.name = name
112                f._statistics.name = name
113                self.filters.append(f)
114                filter_idx += 1
115
116    async def apply(self, document: Document) -> Document:
117        stat = get_doc_info(document)
118        for filter_idx, filt in enumerate(self.filters):
119            document = await filt._apply(document)
120        new_stat = get_doc_info(document)
121        async with self._stats_lock:
122            self._statistics.update_by_diff(stat, new_stat)
123        return document
124
125    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
126        stats = [get_doc_info(doc) for doc in batch]
127        for i, filt in enumerate(self.filters):
128            batch = await filt._apply_batch(batch)
129        batch = await self._finalize_batch(batch, stats)
130        return list(batch)
131
132    async def apply_stream(
133        self,
134        stream: AsyncIterable[Document] | Iterable[Document],
135    ) -> AsyncGenerator[Document, None]:
136        async_stream = handle_stream_as_async(stream, chunk_size=1000, executor=self._executor)
137        async_stream = self._count_input_stats(async_stream)
138
139        for i, filt in enumerate(self.filters):
140            async_stream = filt.apply_stream(async_stream)
141
142        async for doc in async_stream:
143            in_stat = doc.extras["__init_stats"]
144            out_stat = get_doc_info(doc)
145            async with self._stats_lock:
146                self._statistics.update_by_diff(in_stat, out_stat)
147            del doc.extras["__init_stats"]
148            yield doc
149
150    async def _count_input_stats(
151        self, async_stream: AsyncIterable[Document]
152    ) -> AsyncGenerator[Document, None]:
153        async for doc in async_stream:
154            doc.extras["__init_stats"] = get_doc_info(doc)
155            yield doc
156
157    def get_total_statistics(self) -> list[Statistics]:
158        """
159        Get the statistics of the Compose object and sub filters.
160
161        The statistics of the Compose class are stored in an object with the name "Total",
162        and sub-filters's are stored with names in the format {filter_index}-{filter class name}.
163        """
164        stats = []
165        stats.append(self.get_statistics())
166        for i, filt in enumerate(self.filters):
167            stats.append(filt.get_statistics())
168        return stats
169
170    def get_total_statistics_map(self) -> list[dict[str, Any]]:
171        """
172        Get the statistics of the Compose object and sub filters as a list of dictionaries.
173        """
174        stats = self.get_total_statistics()
175        return [stat.to_dict() for stat in stats]
176
177    @property
178    def statistics(self) -> dict:
179        """
180        Deprecated
181
182        Get the statistics of the Compose object and sub filters.
183
184        This property is retained for compatibility with previous versions.
185        Please use `get_total_statistics` or `get_total_statistics_map` instead.
186        """
187        return inspection.statistics_obj_adapter(  # type: ignore
188            self.get_total_statistics()
189        ).get_human_readable_values()
190
191    @property
192    def statistics_obj(self) -> inspection.StatsContainer:
193        """
194        Deprecated
195
196        Get the statistics of the AsyncCompose object and sub filters.
197        This method returns a StatsContainer object which contains the statistics
198        of the AsyncCompose object and sub filters.
199
200        This property is retained for compatibility with previous versions.
201        Please use `get_total_statistics` or `get_total_statistics_map` instead.
202        """
203        return inspection.statistics_obj_adapter(self.get_total_statistics())  # type: ignore
204
205    async def shutdown(self) -> None:
206        for filt in self.filters:
207            await filt.shutdown()
208        if not self._has_external_executor:
209            self._executor.shutdown()
210
211    async def __aenter__(self) -> "AsyncCompose":
212        return self
213
214    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
215        await self.shutdown()
216        if exc_type is not None:
217            raise exc_value

Helper class that provides a standard way to create an ABC using inheritance.

AsyncCompose( filters: list[hojichar.core.async_filter_interface.AsyncFilter | hojichar.core.filter_interface.Filter], random_state: int | numpy.random._generator.Generator | None = None, executor: concurrent.futures.thread.ThreadPoolExecutor | None = None, *args: Any, **kwargs: Any)
74    def __init__(
75        self,
76        filters: list[AsyncFilter | Filter],
77        random_state: int | np.random.Generator | None = None,
78        executor: ThreadPoolExecutor | None = None,
79        *args: Any,
80        **kwargs: Any,
81    ):
82        super().__init__(random_state=random_state, *args, **kwargs)
83        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
84        self._statistics.name = "Total"
85        self._has_external_executor = executor is not None
86        self._executor = executor or ThreadPoolExecutor()
87        self.set_filters(filters)

Base class for asynchronous filters.

Parameters

p : float
    The probability of applying the filter. If p is 1, the filter will always be applied.
skip_rejected : bool
    If True, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to False.
random_state : Optional[Union[int, np.random.Generator]]
    Seed for the random number generator. If None is specified, the random number generator managed by the Compose class will be used.
use_batch : bool
    If True, the filter will process documents in batches in the apply_stream method.
batch_size : int
    The size of the batch to process documents in the apply_stream method.

def set_filters( self, filters: list[hojichar.core.async_filter_interface.AsyncFilter | hojichar.core.filter_interface.Filter]) -> None:
 89    def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
 90        self.filters: list[AsyncFilter] = []
 91        filter_idx = 0
 92        for f in filters:
 93            if isinstance(f, (AsyncCompose, Compose)):
 94                for sub in f.filters:
 95                    name = f"{filter_idx}-{sub.__class__.__name__}"
 96                    if isinstance(sub, Filter):
 97                        name = f"{filter_idx}-{sub.__class__.__name__}"
 98                        sub = AsyncFilterAdapter(sub, executor=self._executor)
 99
100                    sub._set_rng_if_not_initialized(self._rng)
101                    sub.name = name
102                    sub._statistics.name = name
103                    self.filters.append(sub)
104                    filter_idx += 1
105            else:
106                name = f"{filter_idx}-{f.__class__.__name__}"
107                if isinstance(f, Filter):
108                    name = f"{filter_idx}-{f.__class__.__name__}"
109                    f = AsyncFilterAdapter(f, executor=self._executor)
110                f._set_rng_if_not_initialized(self._rng)
111                f.name = name
112                f._statistics.name = name
113                self.filters.append(f)
114                filter_idx += 1
async def apply( self, document: hojichar.core.models.Document) -> hojichar.core.models.Document:
116    async def apply(self, document: Document) -> Document:
117        stat = get_doc_info(document)
118        for filter_idx, filt in enumerate(self.filters):
119            document = await filt._apply(document)
120        new_stat = get_doc_info(document)
121        async with self._stats_lock:
122            self._statistics.update_by_diff(stat, new_stat)
123        return document

Definition of async filter behavior.

In this method, the filter will modify document.text or document.extras and set document.is_rejected = True to discard the document.

Parameters

document : Document Input document

Returns

Document Processed Document

async def apply_batch( self, batch: Sequence[hojichar.core.models.Document]) -> list[hojichar.core.models.Document]:
125    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
126        stats = [get_doc_info(doc) for doc in batch]
127        for i, filt in enumerate(self.filters):
128            batch = await filt._apply_batch(batch)
129        batch = await self._finalize_batch(batch, stats)
130        return list(batch)

Apply the filter to a Sequence of documents. By default, the processing implemented in apply is executed asynchronously and concurrently. If the filter processing can be optimized for batch processing, override this method.

async def apply_stream( self, stream: Union[AsyncIterable[hojichar.core.models.Document], Iterable[hojichar.core.models.Document]]) -> AsyncGenerator[hojichar.core.models.Document, NoneType]:
132    async def apply_stream(
133        self,
134        stream: AsyncIterable[Document] | Iterable[Document],
135    ) -> AsyncGenerator[Document, None]:
136        async_stream = handle_stream_as_async(stream, chunk_size=1000, executor=self._executor)
137        async_stream = self._count_input_stats(async_stream)
138
139        for i, filt in enumerate(self.filters):
140            async_stream = filt.apply_stream(async_stream)
141
142        async for doc in async_stream:
143            in_stat = doc.extras["__init_stats"]
144            out_stat = get_doc_info(doc)
145            async with self._stats_lock:
146                self._statistics.update_by_diff(in_stat, out_stat)
147            del doc.extras["__init_stats"]
148            yield doc

Apply the filter to a stream of documents (Iterable or AsyncIterable). If use_batch is set to True at initialization, the filter will process documents in batches. If the stream is not asynchronous, use handle_stream_as_async to convert it to an asynchronous stream.

Even if an exception occurs during processing, the process will continue, and the following actions will be taken:

  • Set the is_rejected flag of the document to True
  • Set the error details in reject_reason
  • Increment the errors count in the statistics retrievable via get_statistics
def get_total_statistics(self) -> list[hojichar.core.models.Statistics]:
157    def get_total_statistics(self) -> list[Statistics]:
158        """
159        Get the statistics of the Compose object and sub filters.
160
161        The statistics of the Compose class are stored in an object with the name "Total",
162        and sub-filters's are stored with names in the format {filter_index}-{filter class name}.
163        """
164        stats = []
165        stats.append(self.get_statistics())
166        for i, filt in enumerate(self.filters):
167            stats.append(filt.get_statistics())
168        return stats

Get the statistics of the Compose object and sub filters.

The statistics of the Compose class are stored in an object with the name "Total", and the sub-filters' statistics are stored with names in the format {filter_index}-{filter class name}.

def get_total_statistics_map(self) -> list[dict[str, typing.Any]]:
170    def get_total_statistics_map(self) -> list[dict[str, Any]]:
171        """
172        Get the statistics of the Compose object and sub filters as a list of dictionaries.
173        """
174        stats = self.get_total_statistics()
175        return [stat.to_dict() for stat in stats]

Get the statistics of the Compose object and sub filters as a list of dictionaries.

statistics: dict

Deprecated

Get the statistics of the Compose object and sub filters.

This property is retained for compatibility with previous versions. Please use get_total_statistics or get_total_statistics_map instead.

Deprecated

Get the statistics of the AsyncCompose object and sub filters. This method returns a StatsContainer object which contains the statistics of the AsyncCompose object and sub filters.

This property is retained for compatibility with previous versions. Please use get_total_statistics or get_total_statistics_map instead.

async def shutdown(self) -> None:
205    async def shutdown(self) -> None:
206        for filt in self.filters:
207            await filt.shutdown()
208        if not self._has_external_executor:
209            self._executor.shutdown()

You can override this method to release resources or perform cleanup tasks.