hojichar

HojiChar: The Text Processing Pipeline

Official docs: https://hojichar.github.io/HojiChar/hojichar.html

Features

  • HojiChar provides a way to combine multiple arbitrary text processing tasks into a streamlined pipeline.
  • The sequence of operations can be described declaratively, ensuring portability.
  • HojiChar allows users to gather detailed statistical information from large amounts of text during processing.
  • It enables management of any Python text processing tasks, providing a Command Line Interface (CLI) capable of parallel processing.

Background and what HojiChar is for

Text preprocessing is far from a one-size-fits-all process. Depending on the data source and the specific task at hand, various steps including normalization, noise removal, and filtering may be necessary. Not all texts require the same level of preprocessing. For instance, relatively clean texts may only need minimal filtering, while "dirtier" sources like Common Crawl data often require more thorough processing. As a result, the preprocessing profile has to be tailored to each specific domain.

Many preprocessing operations can be viewed as filters: each takes a string as input, applies a transformation, and outputs the processed string. Even though these operations might seem straightforward individually, managing them in a multi-layered, efficient manner can be challenging.

Inspired by torchvision.transforms and iver56/audiomentations, HojiChar addresses these challenges. It enables users to define each text processing step as a class inheriting from hojichar.Filter and use hojichar.Compose to chain them together into a single filter. By writing out the Compose recipe as a profile, the preprocessing process for a specific domain's text can be made portable. Moreover, Compose automatically logs various metrics for each filter, such as byte changes, processing time, and number of rejected texts. This allows users to assess the validity of each operation and consider trade-offs between computation time and performance.

While there are other text normalization tools available, most are designed to perform a specific set of operations. Text preprocessing, despite its importance in the LLM era, is often considered a mundane task compared to machine learning or artificial intelligence tasks. As a result, many existing solutions can be ad hoc, poorly maintained, or inadequately tested. Recognizing these issues, we developed HojiChar as a robust tool for configuring text preprocessing.

Install

pip install hojichar

If you want to use the additional filters, install the package with the following command:

pip install 'hojichar[all]'

If you want to use the AsyncChatAPI filter, install the package with the following command:

pip install 'hojichar[openai]'

Defining a Compose Object

The Compose class in HojiChar allows you to create a sequence of text processing filters.

from hojichar import Compose, document_filters

cleaner = Compose([
    document_filters.JSONLoader(key="text"),
    document_filters.AcceptJapanese(),
    document_filters.DocumentLengthFilter(min_doc_len=0,max_doc_len=1000),
    document_filters.ExampleHojiChar(),
    document_filters.JSONDumper()
])

When a Compose object is called, it accepts a string and returns the processed string.

>>> cleaner('{"text": "こんにちは、"}')
{"text": "こんにちは、<hojichar>"}

The filter pipeline above accomplishes the following steps:

  1. Extracts the value from the 'text' key in the JSON object.
  2. Discards the string if it's not in Japanese.
  3. Rejects any text shorter than 0 characters or longer than 1000 characters.
  4. Appends <hojichar> to the string.
  5. Outputs the processed string as JSON with the key "text".
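
The five steps above can be approximated in plain Python to make the data flow concrete. This is only a sketch using the standard library: the function name run_steps is hypothetical, and the Japanese check is a simplified assumption (it looks for any hiragana or katakana character), not the logic that AcceptJapanese actually uses. Returning an empty string for rejected input mirrors what Compose does when it is called.

```python
import json
import re

# Simplified stand-in for the Japanese check: any hiragana or katakana character.
_KANA = re.compile(r"[\u3040-\u30ff]")


def run_steps(line: str, min_len: int = 0, max_len: int = 1000) -> str:
    """Plain-Python walk-through of the five steps; returns "" when rejected."""
    text = json.loads(line)["text"]            # 1. extract the "text" value
    if not _KANA.search(text):                 # 2. discard non-Japanese text
        return ""
    if not (min_len <= len(text) <= max_len):  # 3. enforce the length bounds
        return ""
    text += "<hojichar>"                       # 4. append the marker
    return json.dumps({"text": text}, ensure_ascii=False)  # 5. dump as JSON


print(run_steps('{"text": "こんにちは、"}'))
```

Writing each step as a separate Filter, as the Compose example does, keeps the same logic but lets every step be reused, reordered, and measured independently.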

The filters used in the pipeline are predefined filters found in hojichar.filters.

While HojiChar provides some fundamental text processing filters and plans to add more in the future, users can also define their own custom filters.

User-defined Filters

Each filter in a Compose object is a class that inherits from the Filter class and implements its text processing in the apply method.

from hojichar.core.filter_interface import Filter

class YourFilter(Filter):
    def apply(self, document):
        text = document.text
        # Write your text transformation here.
        document.text = text
        return document

The apply method accepts a hojichar.Document type as an argument and returns it after the transformations. The Document is a class that encapsulates a string.

The Document class can carry additional metadata through its extras attribute. This lets you attach values to a document for use in subsequent filters.

Reject documents

  • The hojichar.Document has an is_rejected attribute. If a filter sets this flag to True, Compose will discard the document during processing.
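
The interplay between extras and is_rejected can be sketched with a small stand-in. MiniDocument, count_chars, reject_short, and run below are hypothetical names written for this illustration; they only imitate the attributes described above and are not HojiChar's own classes.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class MiniDocument:  # hypothetical stand-in for hojichar.Document
    text: str
    is_rejected: bool = False
    extras: Dict[str, Any] = field(default_factory=dict)


def count_chars(doc: MiniDocument) -> MiniDocument:
    doc.extras["n_chars"] = len(doc.text)  # store metadata for later filters
    return doc


def reject_short(doc: MiniDocument) -> MiniDocument:
    if doc.extras["n_chars"] < 5:  # read the value stored upstream
        doc.is_rejected = True
    return doc


def run(texts: List[str], filters: List[Callable[[MiniDocument], MiniDocument]]) -> List[str]:
    kept = []
    for text in texts:
        doc = MiniDocument(text)
        for f in filters:
            doc = f(doc)
        if not doc.is_rejected:  # rejected documents are dropped from the output
            kept.append(doc.text)
    return kept


kept = run(["hi", "hello world"], [count_chars, reject_short])
print(kept)
```

The first filter only annotates the document, and the second decides on rejection from that annotation, which is the pattern the extras attribute is meant to support.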

Definition of __init__ for custom filter

When a user-defined filter defines its own constructor, make sure it initializes the parent class.

class YourFilter(Filter):
    def __init__(self, your_param, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.your_param = your_param

    def apply(self, document):
        text = document.text
        text = process(text, self.your_param)
        document.text = text
        return document

This is necessary because the Filter class implicitly accepts several arguments, one of which is p.

cleaner = Compose([
    document_filters.JSONLoader(key="text"),
    document_filters.AcceptJapanese(p=0.5),
    document_filters.JSONDumper()
])

The p argument passed to the document_filters.AcceptJapanese constructor determines the probability of applying the filter; with a probability of 1-p, it acts as an identity function. This behavior is defined in the parent class hojichar.Filter.
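
The semantics of p can be illustrated without HojiChar. The sketch below is an assumption-laden stand-in: with_probability is a hypothetical helper, and it uses the standard library's random.Random, whereas the Compose source takes an int or an np.random.Generator as random_state.

```python
import random
from typing import Callable


def with_probability(
    fn: Callable[[str], str], p: float, rng: random.Random
) -> Callable[[str], str]:
    """Apply fn with probability p; otherwise act as the identity function."""

    def wrapped(text: str) -> str:
        return fn(text) if rng.random() < p else text

    return wrapped


rng = random.Random(0)  # a fixed seed keeps the run reproducible
shout = with_probability(str.upper, p=0.5, rng=rng)
results = [shout("abc") for _ in range(1000)]
applied = results.count("ABC")
print(applied)  # roughly half of the 1000 calls
```

With p=1.0 the wrapped function always applies the transformation, and with p=0.0 it never does, which matches the identity behavior described above.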

Batch and Stream Processing with apply_batch and apply_stream

The Filter and Compose classes support efficient batch and stream processing through the apply_batch and apply_stream methods.

apply_batch

  • The apply_batch method processes a list of Document objects in one go. By default, it applies the apply method to each document individually.
  • Users can override apply_batch in custom filters for optimized batch operations.
    from typing import Sequence
    from hojichar import Document, Filter

    class YourBatchFilter(Filter):
        def apply_batch(self, documents: Sequence[Document]) -> list[Document]:
            # Implement your batch processing logic here
            return list(documents)
    

apply_stream

The apply_stream method processes an iterable (e.g., generator) of Document objects, ideal for large datasets or stream-based processing. If the use_batch flag is set to True in a Filter's constructor, its apply_batch implementation will be utilized during stream processing.

Example Usage:

from hojichar import Document

stream = (Document(f"text {i}") for i in range(10000))
processed_stream = cleaner.apply_stream(stream)

for doc in processed_stream:
    print(doc.text)

This allows HojiChar to efficiently process massive corpora while maintaining low memory consumption.
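
The low memory use comes from laziness: each stage wraps the previous stream in a generator, so an item is only produced when the consumer asks for it. The following sketch shows that idea with plain strings; stage, chain_stages, and source are hypothetical names and this is not HojiChar's implementation.

```python
from typing import Callable, Iterable, Iterator, List


def stage(fn: Callable[[str], str], stream: Iterable[str]) -> Iterator[str]:
    for item in stream:
        yield fn(item)  # one item at a time, nothing is buffered


def chain_stages(fns: List[Callable[[str], str]], stream: Iterable[str]) -> Iterable[str]:
    for fn in fns:
        stream = stage(fn, stream)  # wrap the stream once per stage
    return stream


seen: List[int] = []


def source() -> Iterator[str]:
    for i in range(10_000):
        seen.append(i)  # record which inputs have actually been read
        yield f"text {i}"


pipeline = chain_stages([str.strip, str.upper], source())
first = next(iter(pipeline))
print(first, len(seen))
```

After pulling a single result, only one input has been read from the source, so the remaining 9,999 items never exist in memory at the same time.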

Additional Notes on Compose

  • Although calling a Compose object behaves as a text-in, text-out function, Compose itself inherits from the Filter class. Calling the apply method of a Compose object therefore takes a hojichar.Document as input and returns a hojichar.Document.
  • Because Compose behaves like a Filter, a Compose object can be passed as one of the filters to another Compose constructor; its filters are then unfolded recursively.
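
The unfolding of a nested pipeline can be sketched as follows. MiniCompose and the three empty filter classes are hypothetical stand-ins; the "{index}-{class name}" naming follows the names that appear in the statistics output (for example 0-JSONLoader).

```python
class MiniCompose:  # hypothetical stand-in for the flattening behavior
    def __init__(self, filters: list) -> None:
        self.filters: list = []
        for f in filters:
            # A nested pipeline contributes its members rather than itself.
            members = f.filters if isinstance(f, MiniCompose) else [f]
            for member in members:
                member.name = f"{len(self.filters)}-{type(member).__name__}"
                self.filters.append(member)


class Normalize: ...
class Dedup: ...
class Dump: ...


inner = MiniCompose([Normalize(), Dedup()])
outer = MiniCompose([inner, Dump()])
names = [f.name for f in outer.filters]
print(names)
```

Because the inner pipeline is already flat when it is constructed, unfolding one level at each construction is enough to flatten arbitrarily deep nesting.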

HojiChar running asynchronously

  • HojiChar supports asynchronous processing of text data using the AsyncCompose class. This allows you to build pipelines that include work performed off the CPU, such as API calls.
  • You can define asynchronous versions of filters using the AsyncFilter class.

    from hojichar import AsyncFilter
    
    class YourAsyncFilter(AsyncFilter):
        async def apply(self, document):
            text = document.text
            # Perform asynchronous processing here
            document.text = text
            return document
    
  • The AsyncCompose class accepts both Filter and AsyncFilter objects, allowing you to mix synchronous and asynchronous filters in a single pipeline.
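
One way to picture a pipeline that mixes both kinds of steps is to await a step's result only when it is awaitable. The sketch below uses plain functions on strings; strip_text, fake_remote_upper, and run_mixed are hypothetical names, and this is an illustration of the idea rather than how AsyncCompose is implemented.

```python
import asyncio
import inspect
from typing import Any, Callable, List


def strip_text(text: str) -> str:  # synchronous step
    return text.strip()


async def fake_remote_upper(text: str) -> str:  # async step standing in for an API call
    await asyncio.sleep(0)
    return text.upper()


async def run_mixed(steps: List[Callable[[str], Any]], text: str) -> str:
    for step in steps:
        result = step(text)
        # Await only when the step returned a coroutine or another awaitable.
        text = await result if inspect.isawaitable(result) else result
    return text


async def main() -> List[str]:
    steps = [strip_text, fake_remote_upper]
    return await asyncio.gather(*(run_mixed(steps, t) for t in ["  a ", " b"]))


outputs = asyncio.run(main())
print(outputs)
```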

Example

Nowadays, text processing is enhanced by the intelligence of LLMs.

This example demonstrates how to use the AsyncChatAPI filter to process text data with OpenAI-compatible APIs. The filter makes it easy to build a high-throughput "chain of LLMs".

import os

from hojichar import AsyncCompose
from hojichar.filters.document_filters import JSONLoader, JSONDumper
from hojichar.utils.async_handlers import write_stream_to_file


async_pipeline = AsyncCompose(
    [
        JSONLoader(input_key="text"),
        AsyncChatAPI(
            model_id="gpt-4o",
            openai_endpoint_url="https://api.openai.com/v1", 
            openai_api_key=os.getenv("OPENAI_API_KEY"),
            max_concurrent_requests=128,
            output_key="llm_output",
            message_generator=lambda doc: [{"role": "user", "content": doc.text[:1000]}],
        ),
        JSONDumper(export_extras=True),
    ]
)

with open("input.jsonl") as f:
    with async_pipeline:
        async_output_stream = (str(doc) async for doc in async_pipeline.apply_stream(f))
        await write_stream_to_file(async_output_stream, "output.jsonl", chunk_size=128) # Write async-iterable to file efficiently
  • You can use this filter by installing 'hojichar[openai]'.
  • The filter works with OpenAI-compatible APIs, such as an endpoint hosted by vLLM. It is useful for text-augmentation tasks.
    • AsyncChatAPI can handle about 1K req/sec with an optimized vLLM server. (We recommend using uvloop for better throughput.)
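
A cap such as max_concurrent_requests is commonly enforced with a semaphore. The sketch below shows that general technique with asyncio.Semaphore and a fake request function; bounded_map, fake_request, and state are hypothetical, and nothing here is claimed about AsyncChatAPI's internals.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

state: Dict[str, int] = {"in_flight": 0, "peak": 0}


async def fake_request(prompt: str) -> str:
    # Stand-in for a network call; tracks how many calls overlap.
    state["in_flight"] += 1
    state["peak"] = max(state["peak"], state["in_flight"])
    await asyncio.sleep(0.001)
    state["in_flight"] -= 1
    return f"echo: {prompt}"


async def bounded_map(
    items: List[str],
    worker: Callable[[str], Awaitable[str]],
    max_concurrent: int,
) -> List[str]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(item: str) -> str:
        async with semaphore:  # at most max_concurrent workers run at once
            return await worker(item)

    # gather preserves the order of the inputs in its result list.
    return await asyncio.gather(*(guarded(i) for i in items))


prompts = [f"p{i}" for i in range(20)]
replies = asyncio.run(bounded_map(prompts, fake_request, max_concurrent=4))
print(len(replies), state["peak"])
```

All 20 tasks are started immediately, but the semaphore keeps the number of overlapping requests at or below the limit, which protects the server while still keeping it busy.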

Getting Processing Metrics

HojiChar tracks detailed statistics at both the filter and pipeline levels, helping you monitor and debug your processing pipeline.

Each Filter (including Compose) maintains a Statistics object containing information such as input size, output size, discarded document count, and processing time.

Example: Getting Statistics from a Compose Object

stats = cleaner.get_total_statistics_map()
print(stats)
[{'cumulative_time_ns': 337250,
  'diff_bytes': 10,
  'diff_chars': 10,
  'discard_num': 0,
  'input_bytes': 45,
  'input_chars': 23,
  'input_num': 1,
  'name': 'Total',
  'output_bytes': 55,
  'output_chars': 33,
  'output_num': 1},
 {'cumulative_time_ns': 80209,
  'diff_bytes': -12,
  'diff_chars': -12,
  'discard_num': 0,
  'input_bytes': 45,
  'input_chars': 23,
  'input_num': 1,
  'name': '0-JSONLoader',
  'output_bytes': 33,
  'output_chars': 11,
  'output_num': 1},
 {'cumulative_time_ns': 17500,
  'diff_bytes': 0,
  'diff_chars': 0,
  'discard_num': 0,
  'input_bytes': 33,
  'input_chars': 11,
  'input_num': 1,
  'name': '1-AcceptJapanese',
  'output_bytes': 33,
  'output_chars': 11,
  'output_num': 1},
 {'cumulative_time_ns': 8125,
  'diff_bytes': 0,
  'diff_chars': 0,
  'discard_num': 0,
  'input_bytes': 33,
  'input_chars': 11,
  'input_num': 1,
  'name': '2-DocumentLengthFilter',
  'output_bytes': 33,
  'output_chars': 11,
  'output_num': 1},
 {'cumulative_time_ns': 6042,
  'diff_bytes': 10,
  'diff_chars': 10,
  'discard_num': 0,
  'input_bytes': 33,
  'input_chars': 11,
  'input_num': 1,
  'name': '3-ExampleHojiChar',
  'output_bytes': 43,
  'output_chars': 21,
  'output_num': 1},
 {'cumulative_time_ns': 81125,
  'diff_bytes': 12,
  'diff_chars': 12,
  'discard_num': 0,
  'input_bytes': 43,
  'input_chars': 21,
  'input_num': 1,
  'name': '4-JSONDumper',
  'output_bytes': 55,
  'output_chars': 33,
  'output_num': 1}]
  • Use get_statistics() to get the raw Statistics object for any filter.
  • Use get_total_statistics() to get a list of statistics for all filters in a Compose pipeline.
  • Use get_total_statistics_map() to retrieve the statistics as a list of dicts.

These tools allow granular monitoring of how each filter contributes to data reduction, rejection, or transformation.
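
Since get_total_statistics_map() returns a list of dicts, ordinary Python is enough to summarize it. The sketch below computes each filter's share of the processing time; time_share is a hypothetical helper, and the sample reuses the name and cumulative_time_ns values from the output shown above.

```python
from typing import Any, Dict, List


def time_share(stats: List[Dict[str, Any]]) -> Dict[str, float]:
    """Return each filter's fraction of the summed per-filter time."""
    per_filter = [s for s in stats if s["name"] != "Total"]  # skip the pipeline row
    total = sum(s["cumulative_time_ns"] for s in per_filter)
    if total == 0:
        return {s["name"]: 0.0 for s in per_filter}
    return {s["name"]: s["cumulative_time_ns"] / total for s in per_filter}


sample = [
    {"name": "Total", "cumulative_time_ns": 337250},
    {"name": "0-JSONLoader", "cumulative_time_ns": 80209},
    {"name": "1-AcceptJapanese", "cumulative_time_ns": 17500},
    {"name": "2-DocumentLengthFilter", "cumulative_time_ns": 8125},
    {"name": "3-ExampleHojiChar", "cumulative_time_ns": 6042},
    {"name": "4-JSONDumper", "cumulative_time_ns": 81125},
]

shares = time_share(sample)
slowest = max(shares, key=shares.get)
print(slowest)  # 4-JSONDumper
```

In this sample the JSON loading and dumping steps dominate, which is the kind of trade-off the statistics are meant to reveal.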

Parallel application of Compose

The hojichar.Parallel class allows for the application of Compose to an iterable of Document concurrently. This class empowers users to process vast collections of documents by harnessing the power of multiple CPU cores.

Example usage of the Parallel class to process a very large JSON Lines file concurrently:

import hojichar

input_file = "your_text.jsonl"
input_doc_iter = (hojichar.Document(line) for line in open(input_file))

cleaner = hojichar.Compose([
    hojichar.document_filters.JSONLoader(),
    hojichar.document_filters.DocumentNormalizer(),
    # Insert your filters
    hojichar.document_filters.JSONDumper(),
])

with hojichar.Parallel(cleaner, num_jobs=10) as pfilter:
    out_doc_iter = pfilter.imap_apply(input_doc_iter)
    with open("your_processed_text.jsonl", "w") as fp:
        for doc in out_doc_iter:
            fp.write(doc.text + "\n")
  • Always use the Parallel class within a with statement.
  • Parallel.imap_apply(doc_iter) processes an iterator of Document and returns an iterator of the processed documents.
  • For additional options and details about the Parallel class, please refer to the official documentation.

CLI tool and preprocessing profile

  • HojiChar provides a CLI tool for running text preprocessing pipelines.
  • Users define a series of preprocessing steps in a Python file, called a profile.

  • Example:

    cat <your_text.jsonl> | hojichar -p your_preprocessing_profile.py -o your_text_preprocessed.jsonl
    
  • hojichar --help

    usage: hojichar [-h] --profile <profile.py> [--args ARGS [ARGS ...]] [--output OUTPUT] [--input INPUT] [--dump-stats <path to stats.json>] [--exit-on-error] [--all] [--jobs JOBS]
    
    options:
    -h, --help            show this help message and exit
    --profile <profile.py>, -p <profile.py>
                            Path to a Python file that implements your custom filter.
    --args ARGS [ARGS ...]
                            Pass additional arguments to the profile. Use it like `--args arg1 arg2` etc. The arguments should be space-separated.
    --output OUTPUT, -o OUTPUT
                            Specifies the path for the output file. Defaults to standard output.
    --input INPUT, -i INPUT
                            Specifies the path for the input file. Defaults to standard input. If set this path, the progress bar is enabled.
    --dump-stats <path to stats.json>
                            Dump statistics to file. If the file exists, it will be appended.
    --exit-on-error       Exit if an exception occurs during filtering. Useful for debugging custom filters.
    --all                 A flag that specifies whether to include discarded samples. This is useful when inspecting discarded samples.
    --jobs JOBS, -j JOBS  The number ob parallel jobs. By default, the nuber of the CPU core.
    

Definition of Profile

  • The HojiChar CLI receives a series of preprocessing steps as a profile.
  • The preprocessing profile is provided as a Python file. Two patterns of the file are allowed.
  • hojichar.utils.load_compose.load_compose() loads these profiles.

FILTER profile

  • A hojichar.Compose object must be assigned to a variable named FILTER.
  • Example:

    import json
    
    from hojichar import Compose, Filter
    from hojichar.filters.document_filters import ExampleHojiChar, JSONLoader
    
    
    class JSONDumper(Filter):
        def apply(self, document):
            text = document.text
            document.text = json.dumps({"text": text}, ensure_ascii=False)
            return document
    
    # FILTER must be a Compose object.
    FILTER = Compose(
        [
            JSONLoader(),
            ExampleHojiChar(),
            JSONDumper(),
        ]
    )
    
    • Pass the texts to the filter you have defined using a pipe as follows.
      cat <your_file> | hojichar -p example_profile.py
      
  • hojichar.utils.load_compose.load_filter_from_file() loads this type of profile.

FACTORY profile

  • A callable that returns a hojichar.Compose object must be assigned to a variable named FACTORY.
  • The callable can receive arguments, so parameters can be passed to the profile.
    • Some values should not be hard-coded, for example random seeds or flags that modify a filter's behavior.
    • FACTORY provides a mechanism to pass those values as arguments to the preprocessing.
  • Example:

    import json
    
    from hojichar import Compose, Filter
    from hojichar.filters.document_filters import JSONLoader
    
    
    class AddSomething(Filter):  # Append a value to the end of every document.
        def __init__(self, something: str, *args, **kwargs) -> None:
            super().__init__(*args, **kwargs)  # initialize the parent Filter
            self.something = something

        def apply(self, document):
            text = document.text + self.something
            document.text = text
            return document

    class JSONDumper(Filter):
        def apply(self, document):
            text = document.text
            document.text = json.dumps({"text": text}, ensure_ascii=False)
            return document


    def callback(something):
        return Compose(
            [
                JSONLoader(),
                AddSomething(something),
                JSONDumper(),
            ]
        )
    
    # FACTORY must be a callable that returns a Compose object.
    FACTORY = callback
    
  • Using FACTORY profile with arguments in CLI.

    cat <your_file> | hojichar -p example_profile.py --args arg1 arg2
    
  • hojichar.utils.load_compose.load_parametrized_filter_from_file() or load_factory_from_file() loads this type of profile.
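
The difference between the two profile patterns can be illustrated with a generic loader built on importlib. This is a hypothetical sketch, not the code in hojichar.utils.load_compose: load_profile is an invented name, and the example profile returns a plain callable instead of a Compose object so that the snippet runs with the standard library alone.

```python
import importlib.util
import tempfile
from pathlib import Path


def load_profile(path: str, *args: str):
    """Import a profile file and return its pipeline (illustrative only)."""
    spec = importlib.util.spec_from_file_location("profile", path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    if hasattr(module, "FILTER"):   # FILTER profile: the object is used as-is
        return module.FILTER
    if hasattr(module, "FACTORY"):  # FACTORY profile: called with the given args
        return module.FACTORY(*args)
    raise AttributeError("profile must define FILTER or FACTORY")


profile_source = (
    "def callback(suffix):\n"
    "    return lambda text: text + suffix\n"
    "\n"
    "FACTORY = callback\n"
)

with tempfile.TemporaryDirectory() as tmp:
    profile_path = Path(tmp) / "example_profile.py"
    profile_path.write_text(profile_source)
    pipeline = load_profile(str(profile_path), "!")

result = pipeline("hello")
print(result)
```

A FILTER profile fixes the pipeline when the file is imported, while a FACTORY profile defers construction until the arguments are known.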

For Developers

Installing from the Source Directory

To install the package, execute the following commands:

git clone https://github.com/HojiChar/HojiChar.git
cd HojiChar
uv sync --all-extras

Testing

Some filters incorporate doctests. You can run these tests with the command:

pytest --doctest-modules .

This command should be executed from the root of the project.

Code style

  • HojiChar requires type hints for all code. Type checking is performed in continuous integration (CI) in addition to the pytest tests.
  • HojiChar code is subject to inspection and formatting by the ruff Linter. For configuration details, please refer to pyproject.toml. You can perform linting and formatting from the root of the project using the following commands:

Linting

uvx ruff check .

Formatting

uvx ruff format .

Building the Documentation

We use Pdoc for building the documentation. You can build the documentation using the following command:

pdoc -o docs hojichar

Run this command from the project root.

In practice, the process of building the documentation is automated by CI. When a Pull Request is merged into the main branch, the documentation is built in the docs/ directory of the docs branch. This directory is then deployed to the official documentation site by GitHub Pages.

Creating a Source Tarball

To create a source tarball, for instance, for packaging or distribution, run the following command:

uv build

The tarball will be created in the dist directory. This command will compile the source code, and the resulting tarball can be installed with no additional dependencies other than the Python standard library.

Creating a Release and Uploading it to PyPI

Versions uploaded to PyPI are identified by git tags. The __version__ variable in __init__.py and the version entry in pyproject.toml are ignored. The uv-dynamic-versioning plugin implements this process.

The steps to push to PyPI are as follows, although in actuality, the process is automated by CI when a GitHub release is created from the tag.

git checkout v0.1.2
uv build
uv publish --index testpypi --token ${PYPI_TOKEN}

The actual task for the manager is to apply the appropriate tag to the commit to be released and to create the release from GitHub:

git tag -a v0.1.2 -m "Version 0.1.2"
git push origin v0.1.2

"""
.. include:: ../README.md
"""

from .core.async_composition import AsyncCompose, AsyncFilterAdapter
from .core.async_filter_interface import AsyncFilter
from .core.composition import Compose
from .core.filter_interface import Filter, TokenFilter
from .core.inspection import StatsContainer
from .core.models import Document, Token
from .core.parallel import Parallel
from .filters import (
    deduplication,
    document_filters,
    language_identification,
    token_filters,
    tokenization,
)

__version__ = "0.0.0"  # Replaced by uv-dynamic-versioning when deploying

__all__ = [
    "core",
    "filters",
    "utils",
    "Compose",
    "Filter",
    "TokenFilter",
    "Document",
    "Token",
    "Parallel",
    "StatsContainer",
    "deduplication",
    "document_filters",
    "language_identification",
    "token_filters",
    "tokenization",
    "AsyncCompose",
    "AsyncFilterAdapter",
    "AsyncFilter",
]

class Compose(hojichar.Filter):
class Compose(Filter):
    def __init__(
        self,
        filters: List[Union[Filter, TokenFilter]],
        random_state: Optional[Union[int, np.random.Generator]] = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        Compose a filter from pre-defined filter-objects.
        Filter which has `skip_rejected` flag ignores a document which has `is_rejected` flag.
        By doing so, Compose avoid applying filters that do not affect the output.

        Parameters
        ----------
        filters : List[Union[Filter, TokenFilter]]
            Filter instances which apply to the corpus.

        random_state : Union[None, int, np.random.Generator], optional
            Default = None
            Seed for applying filters randomly.
            `random_state` must be int or np.random.Generator instance.
        """
        super().__init__(random_state=random_state, *args, **kwargs)
        self.set_filters(filters)
        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")

        self._statistics.name = "Total"

    def set_filters(self, filters: List[Union[Filter, TokenFilter]]) -> None:
        """
        Set the filter to a Compose object. The filter is expanded if the
        list of filters in the argument contains a filter bound by Compose.

        Args:
            filters (List[Union[Filter, TokenFilter]]): Target filters
        """
        self.filters: List[Union[Filter, TokenFilter]] = []

        filter_idx = 0
        for f in filters:
            if isinstance(f, Compose):
                for sub in f.filters:
                    sub._set_rng_if_not_initialized(self._rng)
                    name = f"{filter_idx}-{sub.__class__.__name__}"
                    sub.name = name
                    sub._statistics.name = name
                    self.filters.append(sub)
                    filter_idx += 1
            else:
                f._set_rng_if_not_initialized(self._rng)
                name = f"{filter_idx}-{f.__class__.__name__}"
                f.name = name
                f._statistics.name = name
                self.filters.append(f)
                filter_idx += 1

    def __call__(self, text: str, **kwargs: Any) -> str:
        """
        Apply the composed filter to a text and return the processed text.
        If the document is rejected, return an empty string.
        """
        document = Document(text, **kwargs)
        document = self.apply(document)
        if document.is_rejected:
            return ""
        else:
            return document.text

    def apply(self, document: Document) -> Document:
        """
        Apply the composed filter to a document and return the processed document.
        """
        stat = get_doc_info(document)
        for i, filt in enumerate(self.filters):
            document = filt._apply(document)
        new_stat = get_doc_info(document)
        self._statistics.update_by_diff(stat, new_stat)
        return document

    def apply_batch(self, batch: Sequence[Document]) -> List[Document]:
        """
        Apply the composed filter to a batch of documents and return the processed documents.
        The `apply_batch` method implemented in sub-filters is called in order.
        """

        stats = [get_doc_info(doc) for doc in batch]
        for i, filt in enumerate(self.filters):
            batch = filt._apply_batch(batch)
        batch = self._finalize_batch(batch, stats)
        return list(batch)

    def apply_stream(self, stream: Iterable[Document]) -> Iterable[Document]:
        """
        Apply the composed filter to a stream of documents and return the processed documents.
        The `apply_stream` method implemented in sub-filters is called in order.


        In a sub-filter, if `apply_batch` is overridden and implemented, you need to set `use_batch`
        to True at that filter to utilize that implementation. Otherwise, the
        method implemented in `apply` will be applied to the stream.
        """
        stream = self._count_input_stats(stream)
        for i, filt in enumerate(self.filters):
            stream = filt.apply_stream(stream)

        for doc in stream:
            in_stat = doc.extras["__init_stats"]
            out_stat = get_doc_info(doc)

            self._statistics.update_by_diff(in_stat, out_stat)
            del doc.extras["__init_stats"]
            yield doc

    def _count_input_stats(self, stream: Iterable[Document]) -> Iterable[Document]:
        for doc in stream:
            doc.extras["__init_stats"] = get_doc_info(doc)
            yield doc

    def get_total_statistics(self) -> List[Statistics]:
        """
        Get the statistics of the Compose object and sub filters.

        The statistics of the Compose class are stored in an object with the name "Total",
        and sub-filters's are stored with names in the format {filter_index}-{filter class name}.
        """
        stats = []
        stats.append(self.get_statistics())
        for i, filt in enumerate(self.filters):
            stats.append(filt.get_statistics())
        return stats

    def get_total_statistics_map(self) -> List[Dict[str, Any]]:
        """
        Get the statistics of the Compose object and sub filters as a list of dictionaries.
        """
        stats = self.get_total_statistics()
        return [stat.to_dict() for stat in stats]

    def shutdown(self) -> None:
        for f in self.filters:
            f.shutdown()

        super().shutdown()

    @property
    def statistics(self) -> dict:
        """
        Deprecated

        Get the statistics of the Compose object and sub filters.

        This property is retained for compatibility with previous versions.
        Please use `get_total_statistics` or `get_total_statistics_map` instead.
        """
        return inspection.statistics_obj_adapter(  # type: ignore
            self.get_total_statistics()
        ).get_human_readable_values()

    @property
    def statistics_obj(self) -> inspection.StatsContainer:
        """
        Deprecated

        Get the statistics of the Compose object and sub filters.
        This method returns a StatsContainer object which contains the statistics
        of the Compose object and sub filters.

        This property is retained for compatibility with previous versions.
        Please use `get_total_statistics` or `get_total_statistics_map` instead.
        """
        return inspection.statistics_obj_adapter(self.get_total_statistics())  # type: ignore

    @deprecated_since("1.0.0", "get_total_statistics")
    def summary(self, format: str = "print") -> None:
        info = [
            {
                "layer": i,
                "name": filt.name,
                "doc": filt.__doc__,
            }
            for i, filt in enumerate(self.filters)
        ]

        def to_json(filter_info: dict) -> dict:
            filter_info["doc"] = "".join(d.strip() for d in filter_info["doc"].split("\n"))
            return filter_info

        if format == "json":
            print(json.dumps(list(map(to_json, info)), ensure_ascii=False, indent="\t"))
        if format == "print":
            for layer in info:
                print(f"[{layer['layer']}] {layer['name']}")
                pprint.pprint(layer["doc"])

Base class for all filters. Document-level filters must inherit from this class.

The text processing is defined in the apply method. To define a new filter, override this method.

Calling an instance applies the filter to a string and returns the resulting string.

As a context manager, the filter can be used as follows:

with YourFilter(p=0.5) as filt:
    text = filt("This is a sample text.")
Compose( filters: List[Union[hojichar.Filter, hojichar.TokenFilter]], random_state: Union[int, numpy.random._generator.Generator, NoneType] = None, *args: Any, **kwargs: Any)

Composes a filter from predefined filter objects. A filter whose skip_rejected flag is set ignores any document whose is_rejected flag is set, which lets Compose avoid applying filters that would not affect the output.

Parameters

  • filters : List[Union[Filter, TokenFilter]]. Filter instances to apply to the corpus.
  • random_state : Union[None, int, np.random.Generator], optional, default None. Seed for applying filters randomly; must be an int or an np.random.Generator instance.

def set_filters( self, filters: List[Union[hojichar.Filter, hojichar.TokenFilter]]) -> None:

Set the filters of a Compose object. If the list of filters contains a filter that is itself a Compose, that Compose is expanded into its sub-filters, which are added in order.

Args:
    filters (List[Union[Filter, TokenFilter]]): Target filters
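The expansion and renaming described above can be illustrated with a minimal, stdlib-only sketch. `Step`, `Pipeline`, `Normalize`, and `Dedupe` are hypothetical stand-ins rather than hojichar classes, and the sketch leaves out the random-state and statistics handling that the real `set_filters` performs.

```python
from typing import List


class Step:
    """Hypothetical stand-in for a filter; it only carries a name."""

    def __init__(self) -> None:
        self.name = self.__class__.__name__


class Pipeline(Step):
    """Hypothetical stand-in for Compose that flattens nested pipelines."""

    def __init__(self, steps: List[Step]) -> None:
        super().__init__()
        self.set_steps(steps)

    def set_steps(self, steps: List[Step]) -> None:
        self.steps: List[Step] = []
        for step in steps:
            # A nested pipeline contributes its (already flattened) sub-steps.
            expanded = step.steps if isinstance(step, Pipeline) else [step]
            for sub in expanded:
                # Rename to "{index}-{ClassName}" using the final position.
                sub.name = f"{len(self.steps)}-{sub.__class__.__name__}"
                self.steps.append(sub)


class Normalize(Step):
    pass


class Dedupe(Step):
    pass


inner = Pipeline([Normalize(), Dedupe()])
outer = Pipeline([Dedupe(), inner])
names = [step.name for step in outer.steps]
print(names)  # ['0-Dedupe', '1-Normalize', '2-Dedupe']
```

Note that the sub-steps of `inner` are renamed when they are adopted by `outer`, which mirrors how the listing above assigns `sub.name` on the shared filter objects.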

def apply( self, document: hojichar.Document) -> hojichar.Document:
    def apply(self, document: Document) -> Document:
        """
        Apply the composed filter to a document and return the processed document.
        """
        stat = get_doc_info(document)
        for i, filt in enumerate(self.filters):
            document = filt._apply(document)
        new_stat = get_doc_info(document)
        self._statistics.update_by_diff(stat, new_stat)
        return document

Apply the composed filter to a document and return the processed document.

def apply_batch( self, batch: Sequence[hojichar.Document]) -> List[hojichar.Document]:
    def apply_batch(self, batch: Sequence[Document]) -> List[Document]:
        """
        Apply the composed filter to a batch of documents and return the processed documents.
        The `apply_batch` method implemented in sub-filters is called in order.
        """

        stats = [get_doc_info(doc) for doc in batch]
        for i, filt in enumerate(self.filters):
            batch = filt._apply_batch(batch)
        batch = self._finalize_batch(batch, stats)
        return list(batch)

Apply the composed filter to a batch of documents and return the processed documents. The apply_batch method implemented in sub-filters is called in order.

def apply_stream( self, stream: Iterable[hojichar.Document]) -> Iterable[hojichar.Document]:
    def apply_stream(self, stream: Iterable[Document]) -> Iterable[Document]:
        """
        Apply the composed filter to a stream of documents and return the processed documents.
        The `apply_stream` method implemented in sub-filters is called in order.


        In a sub-filter, if `apply_batch` is overridden and implemented, you need to set `use_batch`
        to True at that filter to utilize that implementation. Otherwise, the
        method implemented in `apply` will be applied to the stream.
        """
        stream = self._count_input_stats(stream)
        for i, filt in enumerate(self.filters):
            stream = filt.apply_stream(stream)

        for doc in stream:
            in_stat = doc.extras["__init_stats"]
            out_stat = get_doc_info(doc)

            self._statistics.update_by_diff(in_stat, out_stat)
            del doc.extras["__init_stats"]
            yield doc

Apply the composed filter to a stream of documents and return the processed documents. The apply_stream method implemented in sub-filters is called in order.

In a sub-filter, if apply_batch is overridden and implemented, you need to set use_batch to True at that filter to utilize that implementation. Otherwise, the method implemented in apply will be applied to the stream.
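The way sub-filter streams are chained can be sketched with plain generators. This is an illustration of the general technique under simplifying assumptions, not hojichar's implementation: `make_stage` and `calls` are hypothetical names, and plain strings stand in for documents.

```python
from typing import Callable, Iterable, Iterator, List

calls: List[str] = []  # records which stage handled an item, in order


def make_stage(name: str, fn: Callable[[str], str]) -> Callable[[Iterable[str]], Iterator[str]]:
    """Build a hypothetical stream stage that applies `fn` to each item lazily."""

    def stage(stream: Iterable[str]) -> Iterator[str]:
        for text in stream:
            calls.append(name)
            yield fn(text)

    return stage


stages = [make_stage("strip", str.strip), make_stage("lower", str.lower)]

stream: Iterable[str] = iter(["  Hello ", " WORLD "])
for stage in stages:
    # Wrapping only builds the generator chain; no item is processed yet.
    stream = stage(stream)

calls_before = list(calls)  # still empty at this point
result = list(stream)       # consuming the stream drives every stage
print(result)               # ['hello', 'world']
print(calls)                # ['strip', 'lower', 'strip', 'lower']
```

Each item passes through every stage before the next item is read, which is why a stream can be processed without holding the whole corpus in memory.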

def get_total_statistics(self) -> List[hojichar.core.models.Statistics]:
    def get_total_statistics(self) -> List[Statistics]:
        """
        Get the statistics of the Compose object and sub filters.

        The statistics of the Compose class are stored in an object with the name "Total",
        and those of the sub-filters are stored with names in the format {filter_index}-{filter class name}.
        """
        stats = []
        stats.append(self.get_statistics())
        for i, filt in enumerate(self.filters):
            stats.append(filt.get_statistics())
        return stats

Get the statistics of the Compose object and sub filters.

The statistics of the Compose class are stored in an object with the name "Total", and those of the sub-filters are stored with names in the format {filter_index}-{filter class name}.

def get_total_statistics_map(self) -> List[Dict[str, Any]]:
    def get_total_statistics_map(self) -> List[Dict[str, Any]]:
        """
        Get the statistics of the Compose object and sub filters as a list of dictionaries.
        """
        stats = self.get_total_statistics()
        return [stat.to_dict() for stat in stats]

Get the statistics of the Compose object and sub filters as a list of dictionaries.

def shutdown(self) -> None:
    def shutdown(self) -> None:
        for f in self.filters:
            f.shutdown()

        super().shutdown()

This method is called when the filter is no longer needed. You can override this method to release resources or perform cleanup tasks.

statistics: dict

Deprecated

Get the statistics of the Compose object and sub filters.

This property is retained for compatibility with previous versions. Please use get_total_statistics or get_total_statistics_map instead.

statistics_obj: hojichar.StatsContainer

Deprecated

Get the statistics of the Compose object and sub filters. This method returns a StatsContainer object which contains the statistics of the Compose object and sub filters.

This property is retained for compatibility with previous versions. Please use get_total_statistics or get_total_statistics_map instead.

@deprecated_since('1.0.0', 'get_total_statistics')
def summary(self, format: str = 'print') -> None:
    @deprecated_since("1.0.0", "get_total_statistics")
    def summary(self, format: str = "print") -> None:
        info = [
            {
                "layer": i,
                "name": filt.name,
                "doc": filt.__doc__,
            }
            for i, filt in enumerate(self.filters)
        ]

        def to_json(filter_info: dict) -> dict:
            filter_info["doc"] = "".join(d.strip() for d in filter_info["doc"].split("\n"))
            return filter_info

        if format == "json":
            print(json.dumps(list(map(to_json, info)), ensure_ascii=False, indent="\t"))
        if format == "print":
            for layer in info:
                print(f"[{layer['layer']}] {layer['name']}")
                pprint.pprint(layer["doc"])
class Filter(abc.ABC):
class Filter(ABC):
    """
    Base class for all filters.
    Document-level filters must inherit from this class.

    The definition of text processing is in `apply` method.
    If you define a new filter, override the method.

    When this class is called, apply the filter from string to string.

    With context manager, you can use the filter as follows:
    ```python
    with YourFilter(p=0.5) as filt:
        text = filt("This is a sample text.")
    ```

    """

    def __init__(
        self,
        p: float = 1.0,
        skip_rejected: bool = True,
        *args: Any,
        random_state: Optional[Union[int, np.random.Generator]] = None,
        use_batch: bool = False,
        batch_size: int = 128,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the filter.
        Parameters
        ----------
        p : float
            The probability of applying the filter.
            If `p` is 1, the filter will always be applied.
        skip_rejected : bool
            If `True`, the filter will skip documents that are already rejected.
            If you want to apply the filter to all documents (e.g., postprocess), set this to `False`.
        random_state : Optional[Union[int, np.random.Generator]]
            Seed for the random number generator.
            If `None`, a new random number generator will be created.
            If `None` and the filter is used in the `Compose` class, the random state is shared with the `Compose` object.
        use_batch : bool
            If `True`, the filter will process documents in batches in the `apply_stream` method.
        batch_size : int
            The size of the batch to process documents in the `apply_stream` method.
        kwargs : Any
            Additional keyword arguments to pass to the filter.
        """
        self.name = self.__class__.__name__
        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
        assert 0 <= p <= 1
        self.p = p
        self.__init_rng(random_state)
        self.skip_rejected = skip_rejected
        self.use_batch = use_batch
        self.batch_size = batch_size

        self._statistics: Statistics = Statistics()

    @abstractmethod
    def apply(self, document: Document) -> Document:
        """
        Definition of filter behavior.

        The document must satisfy the `TextContent` protocol;
        in most cases it is a hojichar.Document instance.

        In this method, the filter will modify `document.text` or
        `document.extras` and set `document.is_rejected = True` to discard the document.

        Parameters
        ----------
        document : Document
            Input document

        Returns
        -------
        Document
            Processed Document
        """

    @deprecated_since(version="1.0.0", alternative="apply")
    def apply_filter(self, document: Document) -> Document:
        document = self.apply(document)
        return document

    def _check_skip(self, document: Document) -> bool:
        """
        Check if the document should be skipped by this filter.
        If `skip_rejected` is set to `True`, this method will return `True`
        if the document is already rejected.
        If `p` is less than 1, this method will return `True` with a probability of `1 - p`.
        """
        skip = self.skip_rejected and document.is_rejected
        if skip:
            return True
        if self.p < 1:
            if self._rng.random() > self.p:
                return True
        return False

    def _apply(self, document: Document) -> Document:
        """
        Apply the filter to a single document.
        This method
          - checks if the document should be skipped
          - counts and logs the statistics
          - logs the reason for rejection if the document is rejected

        This method may be used in `apply` method of `Compose` class.
        """

        stats = get_doc_info(document)

        if not self._check_skip(document):
            document = self.apply(document)

        new_stats = get_doc_info(document)
        self._statistics.update_by_diff(stats, new_stats)

        if not stats["is_rejected"] and new_stats["is_rejected"]:
            document.reject_reason = self.get_jsonable_vars()

        return document

    def apply_batch(self, batch: Sequence[Document]) -> List[Document]:
        """
        Apply the filter to a batch of documents.
        You can override this method if you want to
        apply the filter to a batch of documents at once.

        This method may be used in `apply_batch` method of `Compose` class.

        Parameters
        ----------
        batch : Sequence[Document]
            List-like object of input documents

        Returns
        -------
        list[Document]
            List of processed documents
        """
        return [self.apply(document) for document in batch]

    def _apply_batch(self, batch: Sequence[Document]) -> List[Document]:
        """
        Apply the filter to a batch of documents.
        This method
        - checks if the documents should be skipped
        - counts and logs the statistics
        - logs the reason for rejection if any document is rejected
        """
        skip = False
        if self.p < 1:
            skip = self._rng.random() > self.p

        stats = [get_doc_info(document=doc) for doc in batch]
        if not skip:
            batch = self.apply_batch(batch)
        batch = self._finalize_batch(batch, stats)
        return batch

    def apply_stream(self, stream: Iterable[Document]) -> Iterable[Document]:
        """
        Apply the filter to a stream of documents.
        This method is used when you want to process documents one by one.
        If `use_batch` is set to `True` in the constructor,
        this method will process documents in batches using the `apply_batch` method.

        Even if an exception occurs during processing, the process will continue, and the following actions will be taken:
        - Set the `is_rejected` flag of the document to `True`
        - Set the error details in `reject_reason`
        - Increment the `errors` count in the statistics retrievable via `get_statistics`

        Parameters
        ----------
        stream : Iterable[Document]
            Stream of input documents

        Returns
        -------
        Iterable[Document]
            Stream of processed documents
        """

        if not self.use_batch:
            for document in stream:
                yield self._try_process(document, self._apply)
        else:
            batch: list[Document] = []
            for document in stream:
                if self._check_skip(document):
                    yield document
                    continue

                batch.append(document)
                if len(batch) >= self.batch_size:
                    stats = [get_doc_info(doc) for doc in batch]
                    batch = self._try_process(batch, self.apply_batch)
                    batch = self._finalize_batch(batch, stats)
                    yield from batch
                    batch.clear()
            if batch:
                stats = [get_doc_info(doc) for doc in batch]
                batch = self._try_process(batch, self.apply_batch)
                batch = self._finalize_batch(batch, stats)
                yield from batch

    def _try_process(self, target: T, func: Callable[[T], T]) -> T:
        try:
            return func(target)
        except Exception as e:
            if isinstance(target, Document):
                msg = f"{e!r} occurs while processing {self.name} with {target!r}"
                target.is_rejected = True
                target.reject_reason = {"error": msg}
                self._statistics.errors += 1
                self.logger.error(msg, exc_info=True)
                return target  # type: ignore[return-value]
            if isinstance(target, list):
                msg = f"{e!r} occurs while batch processing {self.name}"
                self.logger.error(msg, exc_info=True)
                for doc in target:
                    doc.is_rejected = True
                    doc.reject_reason = {"error": msg}
                self._statistics.errors += len(target)
                return target  # type: ignore[return-value]
            else:
                raise e

    def __call__(self, text: str, **kwargs: Any) -> str:
        document = Document(text, **kwargs)
        document = self._apply(document)
        return document.text

    def get_statistics(self) -> Statistics:
        """
        Get the statistics of this filter.
        This method returns the statistics of the filter,
        which includes the number of processed documents, discarded documents, and other statistics.
        """
        return self._statistics

    def get_statistics_map(self) -> Dict[str, Any]:
        """
        Get the statistics of this filter as a dictionary.
        """
        return self._statistics.to_dict()

    def shutdown(self) -> None:
        """
        This method is called when the filter is no longer needed.
        You can override this method to release resources or perform cleanup tasks.
        """
        pass

    def __enter__(self) -> "Filter":
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        """
        This method is called when the filter is used in a context manager.
        It calls the `shutdown` method to release resources or perform cleanup tasks.
        """
        self.shutdown()

    def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> Dict[str, Any]:
        """
        Get the member variables of this filter.
        Eligible variables are those of primitive type (bool, int, float, str, None)
        whose names do not start with an underscore (`_`).
        """
        if exclude_keys is None:
            exclude_keys = set()
        return {
            k: v
            for k, v in vars(self).items()
            if (_is_jsonable(v) and (k not in exclude_keys) and (not k.startswith("_")))
        }

    def _finalize_batch(
        self: "Filter",
        batch: Sequence[Document],
        old_stats: List[Dict[str, Any]] = [],
    ) -> List[Document]:
        new_stats = [get_doc_info(doc) for doc in batch]
        for old, new, doc in zip(old_stats, new_stats, batch):
            self._statistics.update_by_diff(old, new)
            if not old["is_rejected"] and new["is_rejected"]:
                doc.reject_reason = self.get_jsonable_vars()
        return list(batch)

    def __init_rng(self, random_state: Optional[Union[int, np.random.Generator]]) -> None:
        self._owns_rng = True
        if random_state is None:
            self._rng = np.random.default_rng()
            self._owns_rng = False
        elif isinstance(random_state, int):
            self._rng = np.random.default_rng(random_state)
        elif isinstance(random_state, np.random.Generator):
            self._rng = random_state

    def _set_rng_if_not_initialized(self, rng: np.random.Generator) -> None:
        """
        Set the random number generator for this filter if it is not already initialized.
        This method is called by Compose class.
        """
        if not self._owns_rng:
            self._rng = rng

Base class for all filters. Document-level filters must inherit from this class.

The text processing logic is defined in the apply method. To define a new filter, override that method.

Calling an instance applies the filter to a string and returns the processed string.

With a context manager, you can use the filter as follows:

with YourFilter(p=0.5) as filt:
    text = filt("This is a sample text.")
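To make the context-manager behaviour concrete, here is a minimal stand-alone sketch of the same protocol. `StripFilter` and its `closed` attribute are hypothetical; the sketch assumes only what the listing above shows, namely that `__enter__` returns the filter and `__exit__` calls `shutdown`.

```python
from typing import Any


class StripFilter:
    """Hypothetical filter-like object following the enter/exit/shutdown pattern."""

    def __init__(self) -> None:
        self.closed = False  # stands in for a resource that must be released

    def __call__(self, text: str) -> str:
        return text.strip()

    def shutdown(self) -> None:
        # Release resources here (files, worker pools, loaded models, ...).
        self.closed = True

    def __enter__(self) -> "StripFilter":
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        # Runs even if the body of the `with` block raises.
        self.shutdown()


with StripFilter() as filt:
    text = filt("  This is a sample text.  ")

print(text)         # This is a sample text.
print(filt.closed)  # True
```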
Filter( p: float = 1.0, skip_rejected: bool = True, *args: Any, random_state: Union[int, numpy.random._generator.Generator, NoneType] = None, use_batch: bool = False, batch_size: int = 128, **kwargs: Any)

Initialize the filter.

Parameters

p : float
    The probability of applying the filter. If p is 1, the filter will always be applied.
skip_rejected : bool
    If True, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to False.
random_state : Optional[Union[int, np.random.Generator]]
    Seed for the random number generator. If None, a new random number generator will be created. If None and the filter is used in the Compose class, the random state is shared with the Compose object.
use_batch : bool
    If True, the filter will process documents in batches in the apply_stream method.
batch_size : int
    The size of the batch to process documents in the apply_stream method.
kwargs : Any
    Additional keyword arguments to pass to the filter.
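How p and skip_rejected interact can be sketched as a small skip decision. This is a simplified illustration, not the library's code: `Doc` and `should_skip` are hypothetical names, and the standard-library `random.Random` is swapped in for the `np.random.Generator` that the source uses.

```python
import random
from dataclasses import dataclass


@dataclass
class Doc:
    """Hypothetical stand-in for a document."""

    text: str
    is_rejected: bool = False


def should_skip(doc: Doc, p: float, skip_rejected: bool, rng: random.Random) -> bool:
    """Return True when the filter should leave `doc` untouched."""
    if skip_rejected and doc.is_rejected:
        return True  # already rejected: nothing left to decide
    if p < 1 and rng.random() > p:
        return True  # skipped with probability 1 - p
    return False


rng = random.Random(0)  # fixed seed so the run is repeatable
clean, rejected = Doc("ham"), Doc("spam", is_rejected=True)

always_applied = not should_skip(clean, p=1.0, skip_rejected=True, rng=rng)
rejected_skipped = should_skip(rejected, p=1.0, skip_rejected=True, rng=rng)
postprocess_runs = not should_skip(rejected, p=1.0, skip_rejected=False, rng=rng)

trials = 10_000
skipped = sum(should_skip(clean, p=0.3, skip_rejected=True, rng=rng) for _ in range(trials))
skip_rate = skipped / trials  # close to 1 - p = 0.7
```

Setting skip_rejected to False is what allows a post-processing step to see documents that an earlier filter rejected.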

@abstractmethod
def apply( self, document: hojichar.Document) -> hojichar.Document:

Definition of filter behavior.

The document must satisfy the TextContent protocol; in most cases it is a hojichar.Document instance.

In this method, the filter modifies document.text or document.extras, and sets document.is_rejected = True when the document should be discarded.

Parameters

document : Document
    Input document

Returns

Document
    Processed Document
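The contract described above (edit the text or extras, flag the document to discard it, return the document) can be sketched without the library. `Doc` and `MinLength` are hypothetical stand-ins; a real filter would instead inherit from hojichar.Filter and receive a hojichar.Document.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Doc:
    """Hypothetical stand-in exposing the attributes named in the text above."""

    text: str
    is_rejected: bool = False
    extras: Dict[str, Any] = field(default_factory=dict)


class MinLength:
    """Hypothetical filter: trims whitespace and rejects texts that are too short."""

    def __init__(self, min_chars: int) -> None:
        self.min_chars = min_chars

    def apply(self, document: Doc) -> Doc:
        document.text = document.text.strip()             # modify the text
        document.extras["n_chars"] = len(document.text)   # attach metadata
        if len(document.text) < self.min_chars:
            document.is_rejected = True                   # mark for discarding
        return document                                   # always return the document


filt = MinLength(min_chars=5)
kept = filt.apply(Doc("  long enough  "))
dropped = filt.apply(Doc(" hi "))
print(kept.text, kept.is_rejected)        # long enough False
print(dropped.text, dropped.is_rejected)  # hi True
```

A rejected document is still returned rather than dropped, so later stages (or the caller) decide what to do with it.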

@deprecated_since(version='1.0.0', alternative='apply')
def apply_filter( self, document: hojichar.Document) -> hojichar.Document:
def apply_batch( self, batch: Sequence[hojichar.Document]) -> List[hojichar.Document]:

Apply the filter to a batch of documents. Override this method if you want to process a whole batch of documents at once.

This method may be used in the apply_batch method of the Compose class.

Parameters

batch : Sequence[Document]
    List-like object of input documents

Returns

list[Document]
    List of processed documents
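As a rough illustration of why a batch-level override can be useful, the sketch below marks repeated texts within one batch, something a per-document apply cannot see. `Doc` and `MarkDuplicates` are hypothetical stand-ins, and the choice to return a list of the same length as the input is an assumption made here for simplicity.

```python
from dataclasses import dataclass
from typing import List, Sequence, Set


@dataclass
class Doc:
    """Hypothetical stand-in for a document."""

    text: str
    is_rejected: bool = False


class MarkDuplicates:
    """Hypothetical filter whose logic needs the whole batch."""

    def apply(self, document: Doc) -> Doc:
        # A single document has nothing to be compared against.
        return document

    def apply_batch(self, batch: Sequence[Doc]) -> List[Doc]:
        seen: Set[str] = set()
        for doc in batch:
            if doc.text in seen:
                doc.is_rejected = True  # second and later copies are rejected
            seen.add(doc.text)
        return list(batch)  # same length and order as the input


batch = [Doc("a"), Doc("b"), Doc("a"), Doc("a")]
out = MarkDuplicates().apply_batch(batch)
flags = [doc.is_rejected for doc in out]
print(flags)  # [False, False, True, True]
```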

def apply_stream( self, stream: Iterable[hojichar.Document]) -> Iterable[hojichar.Document]:

Apply the filter to a stream of documents. This method is used when you want to process documents one by one. If use_batch is set to True in the constructor, this method will process documents in batches using the apply_batch method.

Even if an exception occurs during processing, the process will continue, and the following actions will be taken:

  • Set the is_rejected flag of the document to True
  • Set the error details in reject_reason
  • Increment the errors count in the statistics retrievable via get_statistics

Parameters

stream : Iterable[Document]
    Stream of input documents

Returns

Iterable[Document]
    Stream of processed documents
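The batching and error-handling behaviour described above can be approximated in a few lines. This is a simplified sketch, not hojichar's implementation: `Doc`, `process_in_batches`, `upper_batch`, and the `stats` dictionary are hypothetical, and only the already-rejected check is modelled (the probability p is ignored).

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, Iterator, List, Optional


@dataclass
class Doc:
    """Hypothetical stand-in for a document."""

    text: str
    is_rejected: bool = False
    reject_reason: Optional[Dict[str, str]] = None


def process_in_batches(
    stream: Iterable[Doc],
    apply_batch: Callable[[List[Doc]], List[Doc]],
    batch_size: int,
    stats: Dict[str, int],
) -> Iterator[Doc]:
    def run(batch: List[Doc]) -> List[Doc]:
        try:
            return apply_batch(batch)
        except Exception as exc:
            # On failure, reject the whole batch instead of stopping the stream.
            for doc in batch:
                doc.is_rejected = True
                doc.reject_reason = {"error": repr(exc)}
            stats["errors"] += len(batch)
            return batch

    batch: List[Doc] = []
    for doc in stream:
        if doc.is_rejected:
            yield doc  # passes through immediately, ahead of buffered documents
            continue
        batch.append(doc)
        if len(batch) >= batch_size:
            yield from run(batch)
            batch = []
    if batch:  # flush the final, possibly smaller, batch
        yield from run(batch)


def upper_batch(batch: List[Doc]) -> List[Doc]:
    if any(not doc.text for doc in batch):
        raise ValueError("empty text")
    return [Doc(doc.text.upper()) for doc in batch]


stats = {"errors": 0}
docs = [Doc("a"), Doc("b"), Doc(""), Doc("d"), Doc("e")]
out = list(process_in_batches(docs, upper_batch, batch_size=2, stats=stats))
texts = [doc.text for doc in out]            # ['A', 'B', '', 'd', 'E']
rejected = [doc.is_rejected for doc in out]  # [False, False, True, True, False]
```

Because skipped documents are yielded immediately while others wait in the buffer, the output order of a batched stream may differ from the input order.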

def get_statistics(self) -> hojichar.core.models.Statistics:

Get the statistics of this filter. This method returns the statistics of the filter, which includes the number of processed documents, discarded documents, and other statistics.

def get_statistics_map(self) -> Dict[str, Any]:

Get the statistics of this filter as a dictionary.

def shutdown(self) -> None:

This method is called when the filter is no longer needed. You can override this method to release resources or perform cleanup tasks.

def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> Dict[str, Any]:

Get the member variables of this filter. Eligible variables are those of primitive type (bool, int, float, str, None) whose names do not start with an underscore (_).
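The selection rule can be sketched as follows. The helper `is_primitive` is a hypothetical stand-in for the library's `_is_jsonable`, whose definition is not shown here, and `LengthFilter` is likewise illustrative.

```python
from typing import Any, Dict, Optional, Set


def is_primitive(value: Any) -> bool:
    """Hypothetical stand-in for `_is_jsonable`: accept only simple scalar values."""
    return value is None or isinstance(value, (bool, int, float, str))


class LengthFilter:
    """Hypothetical filter with a mix of public, private, and non-primitive attributes."""

    def __init__(self, min_chars: int) -> None:
        self.name = "LengthFilter"
        self.min_chars = min_chars
        self.p = 1.0
        self.patterns = ["abc"]  # a list is not a primitive: excluded
        self._cache = "hidden"   # leading underscore: excluded

    def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> Dict[str, Any]:
        exclude = exclude_keys or set()
        return {
            key: value
            for key, value in vars(self).items()
            if is_primitive(value) and key not in exclude and not key.startswith("_")
        }


filt = LengthFilter(min_chars=10)
params = filt.get_jsonable_vars()
without_name = filt.get_jsonable_vars(exclude_keys={"name"})
print(params)        # {'name': 'LengthFilter', 'min_chars': 10, 'p': 1.0}
print(without_name)  # {'min_chars': 10, 'p': 1.0}
```

In the Filter source listed earlier, this dictionary is what gets stored in reject_reason when a filter rejects a document, so it records which filter and which settings caused the rejection.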

@deprecated_since(version='1.0.0', alternative='Filter')
class TokenFilter(hojichar.Filter, abc.ABC):
@deprecated_since(version="1.0.0", alternative="Filter")
class TokenFilter(Filter, ABC):
    """
    Base class for token-level filters.

    Token filters, which should be implemented in hojichar/filters/token_filters.py,
    must inherit from this class.
    """

    def __init__(
        self, p: float = 1, skip_rejected: bool = True, *args: Any, **kwargs: Any
    ) -> None:
        self.name = self.__class__.__name__
        self.logger = logging.getLogger("hojichar.token_filters." + self.name)
        assert 0 <= p <= 1
        self.p = p
        self.skip_rejected = skip_rejected

    def apply(self, token: Token) -> Token:  # type: ignore
        raise NotImplementedError(f"{self.__class__.__name__}.apply method is not defined")
        return token

    def apply_filter(self, document: Document) -> Document:
        document.tokens = [self.apply(token) for token in document.tokens if not token.is_rejected]
        return document

    def __call__(self, text: str) -> str:  # type: ignore
        token = Token(text)
        token = self.apply(token)
        return token.text

    def _apply(self, document: Document) -> Document:
        """
        Apply the token filter to a single document.
        This method checks if the document should be skipped.
        """
        if self.skip_rejected and document.is_rejected:
            return document
        return self.apply_filter(document)

    def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> dict:
        """
        Get the member variables of this filter.
        Eligible variables are those of primitive type (bool, int, float, str, None)
        whose names do not start with an underscore (`_`).
        """
        if exclude_keys is None:
            exclude_keys = set()
        return {
            k: v
            for k, v in vars(self).items()
            if (_is_jsonable(v) and (k not in exclude_keys) and (not k.startswith("_")))
        }

Base class for token-level filters.

Token filters, which should be implemented in hojichar/filters/token_filters.py, must inherit from this class.

TokenFilter(p: float = 1, skip_rejected: bool = True, *args: Any, **kwargs: Any)

Initialize the filter.

Parameters

p : float
    The probability of applying the filter. If p is 1, the filter will always be applied.
skip_rejected : bool
    If True, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., for postprocessing), set this to False.
random_state : Optional[Union[int, np.random.Generator]]
    Seed for the random number generator. If None, a new random number generator will be created. If None and the filter is used inside a Compose object, the random state is shared with that Compose object.
use_batch : bool
    If True, the filter will process documents in batches in the apply_stream method.
batch_size : int
    The size of the batch to process documents in the apply_stream method.
kwargs : Any
    Additional keyword arguments to pass to the filter.
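The interplay of p and skip_rejected can be pictured with the minimal sketch below. It does not use HojiChar itself: Doc, maybe_apply, the order of the two checks, and the use of the standard-library random.Random in place of np.random.Generator are all assumptions made for illustration.

```python
import random
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Doc:
    """Hypothetical stand-in for a document; not the hojichar.Document class."""

    text: str
    is_rejected: bool = False


def maybe_apply(
    doc: Doc,
    func: Callable[[Doc], Doc],
    p: float = 1.0,
    skip_rejected: bool = True,
    rng: Optional[random.Random] = None,
) -> Doc:
    """Apply func with probability p, optionally skipping rejected documents."""
    if rng is None:
        rng = random.Random()
    if skip_rejected and doc.is_rejected:
        return doc  # already rejected: leave untouched
    if rng.random() >= p:
        return doc  # the filter was not selected this time
    return func(doc)


def upper(doc: Doc) -> Doc:
    doc.text = doc.text.upper()
    return doc


rng = random.Random(0)
always = maybe_apply(Doc("hello"), upper, p=1.0, rng=rng)
never = maybe_apply(Doc("hello"), upper, p=0.0, rng=rng)
skipped = maybe_apply(Doc("hello", is_rejected=True), upper, rng=rng)
forced = maybe_apply(Doc("hello", is_rejected=True), upper, skip_rejected=False, rng=rng)
```

With p=1 the function is always applied and with p=0 it never is; a rejected document is only touched when skip_rejected is False.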

def apply(self, token: hojichar.Token) -> hojichar.Token:

Definition of filter behavior.

The document must implement the TextContent protocol; in most cases it is an instance of the hojichar.Document class.

In this method, the filter will modify document.text or document.extras and set document.is_rejected = True to discard the document.

Parameters

document : Document
    Input document

Returns

Document
    Processed Document

def apply_filter( self, document: hojichar.Document) -> hojichar.Document:
def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> dict:

Get the member variables of this filter. Eligible variables are those of primitive types (bool, int, float, str, None) whose names do not start with an underscore (_).
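The selection rule can be tried out in isolation with the sketch below. DemoFilter and is_jsonable are hypothetical stand-ins (the real private helper _is_jsonable is not shown on this page, so its exact behavior is an assumption); only the dictionary comprehension mirrors the listing above.

```python
from typing import Any, Optional, Set


def is_jsonable(value: Any) -> bool:
    """Hypothetical stand-in for the private _is_jsonable helper."""
    return value is None or isinstance(value, (bool, int, float, str))


class DemoFilter:
    """Hypothetical filter-like object; not a hojichar class."""

    def __init__(self) -> None:
        self.name = "DemoFilter"
        self.p = 1.0
        self.skip_rejected = True
        self.patterns = ["a", "b"]  # a list is not a primitive: excluded
        self._cache = 0  # leading underscore: excluded

    def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> dict:
        if exclude_keys is None:
            exclude_keys = set()
        return {
            k: v
            for k, v in vars(self).items()
            if is_jsonable(v) and k not in exclude_keys and not k.startswith("_")
        }


f = DemoFilter()
all_vars = f.get_jsonable_vars()
without_name = f.get_jsonable_vars(exclude_keys={"name"})
```

Only name, p, and skip_rejected survive the first call; passing exclude_keys drops name as well.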

class Document:
class Document:
    """
    Document class represents a text document with metadata.
    It contains the text of the document, a flag indicating whether it is rejected,
    and additional metadata stored in the `extras` dictionary.

    The `tokens` attribute will be deprecated in future versions,
    and users are encouraged to use the `extras` dictionary to store token-related information.

    Attributes:
        text (str): The text content of the document.
        is_rejected (bool): A flag indicating whether the document is rejected.
        extras (Dict[str, Any]): A dictionary to store additional metadata about the document.
        reject_reason (Dict[str, Any]): A dictionary to store the reason for rejection. The
          class of the rejecting filter and its member names and values are recorded here.

    The following attributes will be deprecated in future versions:
        dedup_lsh (List[str]): A list for deduplication using Locality Sensitive Hashing (LSH).
        tokens (List[Token]): A list of tokens extracted from the document.
    """

    def __init__(
        self,
        text: str,
        is_rejected: bool = False,
        tokens: Optional[List[Token]] = None,
        extras: Optional[Dict[str, Any]] = None,
    ) -> None:
        self.text = text
        self.__original = text
        self.is_rejected = is_rejected
        if tokens is None:
            self.tokens: List[Token] = []
        else:
            self.tokens = tokens

        if extras is None:
            self.extras: Dict[str, Any] = {}
        else:
            self.extras = extras

        self.dedup_lsh: List[str] = []
        self.reject_reason: Dict[str, Any] = {}

    @property
    def original(self) -> str:
        return self.__original

    @deprecated_since("1.0.0")
    def set_tokens(self, tokens: List[str]) -> None:
        self.tokens = [Token(token) for token in tokens]

    @deprecated_since("1.0.0")
    def get_tokens(self) -> List[str]:
        return [token.text for token in self.tokens]

    def __str__(self) -> str:
        return self.text

    def __repr__(self) -> str:
        return (
            f"Document(text={self.text!r}, is_rejected={self.is_rejected}, extras={self.extras})"  # noqa
        )

Document class represents a text document with metadata. It contains the text of the document, a flag indicating whether it is rejected, and additional metadata stored in the extras dictionary.

The tokens attribute will be deprecated in future versions, and users are encouraged to use the extras dictionary to store token-related information.

Attributes:
    text (str): The text content of the document.
    is_rejected (bool): A flag indicating whether the document is rejected.
    extras (Dict[str, Any]): A dictionary to store additional metadata about the document.
    reject_reason (Dict[str, Any]): A dictionary to store the reason for rejection. The class of the rejecting filter and its member names and values are recorded here.

The following attributes will be deprecated in future versions:
    dedup_lsh (List[str]): A list for deduplication using Locality Sensitive Hashing (LSH).
    tokens (List[Token]): A list of tokens extracted from the document.
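As a rough illustration of keeping token-related information in extras rather than in tokens, consider the sketch below. Doc is a simplified stand-in that mirrors only the fields described above so that the snippet runs without the library; the "tokens" key, the three-token threshold, and the contents of reject_reason are hypothetical.

```python
from typing import Any, Dict, Optional


class Doc:
    """Simplified stand-in mirroring the Document fields described above."""

    def __init__(
        self,
        text: str,
        is_rejected: bool = False,
        extras: Optional[Dict[str, Any]] = None,
    ) -> None:
        self.text = text
        self._original = text
        self.is_rejected = is_rejected
        self.extras: Dict[str, Any] = {} if extras is None else extras
        self.reject_reason: Dict[str, Any] = {}

    @property
    def original(self) -> str:
        # The text as it was at construction time, unaffected by later edits.
        return self._original


doc = Doc("Hello,  world")
doc.text = " ".join(doc.text.split())  # normalize whitespace
doc.extras["tokens"] = doc.text.split()  # hypothetical key for token data
if len(doc.extras["tokens"]) < 3:  # hypothetical rejection rule
    doc.is_rejected = True
    doc.reject_reason = {"filter": "MinTokens", "min_tokens": 3}
```

The original property keeps the unmodified input available for comparison even after text has been rewritten.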

Document( text: str, is_rejected: bool = False, tokens: Optional[List[hojichar.Token]] = None, extras: Optional[Dict[str, Any]] = None)
@deprecated_since('1.0.0')
def set_tokens(self, tokens: List[str]) -> None:
@deprecated_since('1.0.0')
def get_tokens(self) -> List[str]:
@deprecated_since('0.1.0', 'Document')
class Token:
@deprecated_since("0.1.0", "Document")
class Token:
    def __init__(self, text: str, is_rejected: bool = False) -> None:
        self.text = text
        self.__original = text
        self.is_rejected = is_rejected

    @property
    def original(self) -> str:
        return self.__original

    def __str__(self) -> str:
        return self.text
Token(text: str, is_rejected: bool = False)
class Parallel:
class Parallel:
    """
    The Parallel class provides a way to apply a hojichar.Compose filter
    to an iterator of documents in a parallel manner using a specified
    number of worker processes. This class should be used as a context
    manager with a 'with' statement.

    Example:

    doc_iter = (hojichar.Document(d) for d in open("my_text.txt"))
    with Parallel(my_filter, num_jobs=8) as pfilter:
        for doc in pfilter.imap_apply(doc_iter):
            pass  # Process the filtered document as needed.
    """

    def __init__(
        self, filter: hojichar.Compose, num_jobs: int | None = None, ignore_errors: bool = False
    ):
        """
        Initializes a new instance of the Parallel class.

        Args:
            filter (hojichar.Compose): A composed filter object that specifies the
                processing operations to apply to each document in parallel.
                A copy of the filter is made within a 'with' statement. When the 'with'
                block terminates, the statistical information obtained through
                `filter.statistics` or `filter.statistics_obj` is replaced with the total
                value of the statistical information processed within the 'with' block.

            num_jobs (int | None, optional): The number of worker processes to use.
                If None, then the number returned by os.cpu_count() is used. Defaults to None.
            ignore_errors (bool, optional): If set to True, any exceptions thrown during
                the processing of a document will be caught and logged, but will not
                stop the processing of further documents. If set to False, the first
                exception thrown will terminate the entire parallel processing operation.
                Defaults to False.
        """
        self.filter = filter
        self.num_jobs = num_jobs
        self.ignore_errors = ignore_errors

        self._pool: Pool | None = None
        self._pid_stats: dict[int, List[Statistics]] | None = None

    def __enter__(self) -> Parallel:
        self._pool = Pool(
            processes=self.num_jobs,
            initializer=_init_worker,
            initargs=(self.filter, self.ignore_errors),
        )
        self._pid_stats = dict()
        return self

    def imap_apply(self, docs: Iterator[hojichar.Document]) -> Iterator[hojichar.Document]:
        """
        Takes an iterator of Documents and applies the Compose filter to
        each Document in a parallel manner. This is a generator method
        that yields processed Documents.

        Args:
            docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.

        Raises:
            RuntimeError: If the Parallel instance is not properly initialized. This
                generally happens when the method is called outside of a 'with' statement.
            Exception: If any exceptions are raised within the worker processes.

        Yields:
            Iterator[hojichar.Document]: An iterator that yields processed Documents.
        """
        if self._pool is None or self._pid_stats is None:
            raise RuntimeError(
                "Parallel instance not properly initialized. Use within a 'with' statement."
            )
        try:
            for doc, pid, stat, err_msg in self._pool.imap_unordered(_worker, docs):
                self._pid_stats[pid] = stat
                if err_msg is not None:
                    logger.error(f"Error in worker {pid}: {err_msg}")
                yield doc
        except Exception:
            self.__exit__(None, None, None)
            raise

    def __exit__(self, exc_type, exc_value, traceback) -> None:  # type: ignore
        if self._pool:
            self._pool.terminate()
            self._pool.join()
        if self._pid_stats:
            total_stats = functools.reduce(
                lambda x, y: Statistics.add_list_of_stats(x, y), self._pid_stats.values()
            )
            self.filter._statistics.update(Statistics.get_filter("Total", total_stats))
            for stat in total_stats:
                for filt in self.filter.filters:
                    if stat.name == filt.name:
                        filt._statistics.update(stat)
                        break

    def get_total_statistics(self) -> List[Statistics]:
        """
        Returns the list of statistics objects holding the total
        statistical values processed within the Parallel block.

        Returns:
            List[Statistics]: Aggregated statistics; an empty list if
                no statistics have been collected.
        """
        if self._pid_stats:
            total_stats = functools.reduce(
                lambda x, y: Statistics.add_list_of_stats(x, y), self._pid_stats.values()
            )
            return total_stats
        else:
            return []

    def get_total_statistics_map(self) -> List[dict]:
        return [stat.to_dict() for stat in self.get_total_statistics()]

    @property
    def statistics_obj(self) -> inspection.StatsContainer:
        """
        Returns the statistics object of the Parallel instance.
        This is a StatsContainer object which contains the statistics
        of the Parallel instance and sub filters.

        Returns:
            StatsContainer: Statistics object
        """
        return inspection.statistics_obj_adapter(self.get_total_statistics())  # type: ignore

The Parallel class provides a way to apply a hojichar.Compose filter to an iterator of documents in a parallel manner using a specified number of worker processes. This class should be used as a context manager with a 'with' statement.

Example:

doc_iter = (hojichar.Document(d) for d in open("my_text.txt"))
with Parallel(my_filter, num_jobs=8) as pfilter:
    for doc in pfilter.imap_apply(doc_iter):
        pass  # Process the filtered document as needed.

Parallel( filter: hojichar.Compose, num_jobs: int | None = None, ignore_errors: bool = False)

Initializes a new instance of the Parallel class.

Args:
    filter (hojichar.Compose): A composed filter object that specifies the processing operations to apply to each document in parallel. A copy of the filter is made within a 'with' statement. When the 'with' block terminates, the statistical information obtained through filter.statistics or filter.statistics_obj is replaced with the total value of the statistical information processed within the 'with' block.
    num_jobs (int | None, optional): The number of worker processes to use. If None, then the number returned by os.cpu_count() is used. Defaults to None.
    ignore_errors (bool, optional): If set to True, any exceptions thrown during the processing of a document will be caught and logged, but will not stop the processing of further documents. If set to False, the first exception thrown will terminate the entire parallel processing operation. Defaults to False.
def imap_apply( self, docs: Iterator[hojichar.Document]) -> Iterator[hojichar.Document]:

Takes an iterator of Documents and applies the Compose filter to each Document in a parallel manner. This is a generator method that yields processed Documents.

Args:
    docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.

Raises:
    RuntimeError: If the Parallel instance is not properly initialized. This generally happens when the method is called outside of a 'with' statement.
    Exception: If any exceptions are raised within the worker processes.

Yields:
    Iterator[hojichar.Document]: An iterator that yields processed Documents.

def get_total_statistics(self) -> List[hojichar.core.models.Statistics]:

Returns the list of statistics objects holding the total statistical values processed within the Parallel block.

Returns:
    List[Statistics]: Aggregated statistics; an empty list if no statistics have been collected.
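The totals are obtained by folding the per-worker statistics lists together with functools.reduce. The sketch below imitates that fold without HojiChar: Stat and add_stats are hypothetical stand-ins for Statistics and Statistics.add_list_of_stats, and the field names input_num and discard_num are illustrative assumptions.

```python
import functools
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Stat:
    """Hypothetical stand-in for a statistics record; fields are illustrative."""

    name: str
    input_num: int = 0
    discard_num: int = 0


def add_stats(xs: List[Stat], ys: List[Stat]) -> List[Stat]:
    """Sum two statistics lists entry by entry, matching entries by name."""
    by_name = {y.name: y for y in ys}
    return [
        Stat(
            x.name,
            x.input_num + by_name[x.name].input_num,
            x.discard_num + by_name[x.name].discard_num,
        )
        for x in xs
    ]


# One statistics list per worker process ID, as collected during processing.
pid_stats: Dict[int, List[Stat]] = {
    101: [Stat("Total", 5, 1), Stat("0-Demo", 5, 1)],
    102: [Stat("Total", 7, 2), Stat("0-Demo", 7, 2)],
}

total = functools.reduce(add_stats, pid_stats.values())
```

Because the fold is keyed by process ID, each worker contributes its latest statistics list exactly once to the total.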

def get_total_statistics_map(self) -> List[dict]:
statistics_obj: hojichar.StatsContainer

Returns the statistics object of the Parallel instance. This is a StatsContainer object which contains the statistics of the Parallel instance and sub filters.

Returns: StatsContainer: Statistics object

@dataclasses.dataclass
class StatsContainer:
@dataclasses.dataclass
class StatsContainer:
    total_info: DocStatistics
    layers_info: Dict[str, FilterStatistics]  # Key of the dict is filter name.

    def __add__(self, other: StatsContainer) -> StatsContainer:
        assert self.layers_info.keys() == other.layers_info.keys(), "Layer names must match"
        return StatsContainer(
            self.total_info + other.total_info,
            {k: v + other.layers_info[k] for k, v in self.layers_info.items()},
        )

    def get_human_readable_values(self) -> dict:
        return {
            "total_info": self.total_info.get_human_readable_values(),
            "layers_info": [
                layer.get_human_readable_values() for layer in self.layers_info.values()
            ],
        }

    def reset(self) -> StatsContainer:
        # Call reset(); a bare attribute reference would leave total_info unchanged.
        self.total_info.reset()
        for layer in self.layers_info.values():
            layer.reset()
        return self
StatsContainer( total_info: hojichar.core.inspection.DocStatistics, layers_info: Dict[str, hojichar.core.inspection.FilterStatistics])
def get_human_readable_values(self) -> dict:
def reset(self) -> hojichar.StatsContainer:
class AsyncCompose(hojichar.AsyncFilter):
class AsyncCompose(AsyncFilter):
    def __init__(
        self,
        filters: list[AsyncFilter | Filter],
        random_state: int | np.random.Generator | None = None,
        executor: ThreadPoolExecutor | None = None,
        *args: Any,
        **kwargs: Any,
    ):
        super().__init__(random_state=random_state, *args, **kwargs)
        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
        self._statistics.name = "Total"
        self._has_external_executor = executor is not None
        self._executor = executor or ThreadPoolExecutor()
        self.set_filters(filters)

    def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
        self.filters: list[AsyncFilter] = []
        filter_idx = 0
        for f in filters:
            if isinstance(f, (AsyncCompose, Compose)):
                for sub in f.filters:
                    name = f"{filter_idx}-{sub.__class__.__name__}"
                    if isinstance(sub, Filter):
                        name = f"{filter_idx}-{sub.__class__.__name__}"
                        sub = AsyncFilterAdapter(sub, executor=self._executor)

                    sub._set_rng_if_not_initialized(self._rng)
                    sub.name = name
                    sub._statistics.name = name
                    self.filters.append(sub)
                    filter_idx += 1
            else:
                name = f"{filter_idx}-{f.__class__.__name__}"
                if isinstance(f, Filter):
                    name = f"{filter_idx}-{f.__class__.__name__}"
                    f = AsyncFilterAdapter(f, executor=self._executor)
                f._set_rng_if_not_initialized(self._rng)
                f.name = name
                f._statistics.name = name
                self.filters.append(f)
                filter_idx += 1

    async def apply(self, document: Document) -> Document:
        stat = get_doc_info(document)
        for filter_idx, filt in enumerate(self.filters):
            document = await filt._apply(document)
        new_stat = get_doc_info(document)
        async with self._stats_lock:
            self._statistics.update_by_diff(stat, new_stat)
        return document

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        stats = [get_doc_info(doc) for doc in batch]
        for i, filt in enumerate(self.filters):
            batch = await filt._apply_batch(batch)
        batch = await self._finalize_batch(batch, stats)
        return list(batch)

    async def apply_stream(
        self,
        stream: AsyncIterable[Document] | Iterable[Document],
    ) -> AsyncGenerator[Document, None]:
        async_stream = handle_stream_as_async(stream, chunk_size=1000, executor=self._executor)
        async_stream = self._count_input_stats(async_stream)

        for i, filt in enumerate(self.filters):
            async_stream = filt.apply_stream(async_stream)

        async for doc in async_stream:
            in_stat = doc.extras["__init_stats"]
            out_stat = get_doc_info(doc)
            async with self._stats_lock:
                self._statistics.update_by_diff(in_stat, out_stat)
            del doc.extras["__init_stats"]
            yield doc

    async def _count_input_stats(
        self, async_stream: AsyncIterable[Document]
    ) -> AsyncGenerator[Document, None]:
        async for doc in async_stream:
            doc.extras["__init_stats"] = get_doc_info(doc)
            yield doc

    def get_total_statistics(self) -> list[Statistics]:
        """
        Get the statistics of the Compose object and sub filters.

        The statistics of the Compose class are stored in an object with the name "Total",
        and sub-filters's are stored with names in the format {filter_index}-{filter class name}.
        """
        stats = []
        stats.append(self.get_statistics())
        for i, filt in enumerate(self.filters):
            stats.append(filt.get_statistics())
        return stats

    def get_total_statistics_map(self) -> list[dict[str, Any]]:
        """
        Get the statistics of the Compose object and sub filters as a list of dictionaries.
        """
        stats = self.get_total_statistics()
        return [stat.to_dict() for stat in stats]

    async def shutdown(self) -> None:
        for filt in self.filters:
            await filt.shutdown()
        if not self._has_external_executor:
            self._executor.shutdown()

    async def __aenter__(self) -> "AsyncCompose":
        return self

    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        await self.shutdown()
        if exc_type is not None:
            raise exc_value

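Chains a list of AsyncFilter and Filter objects into a single asynchronous filter. Synchronous Filter instances are wrapped in AsyncFilterAdapter so that they run on the executor.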

AsyncCompose( filters: list[hojichar.AsyncFilter | hojichar.Filter], random_state: int | numpy.random._generator.Generator | None = None, executor: concurrent.futures.thread.ThreadPoolExecutor | None = None, *args: Any, **kwargs: Any)

Base class for asynchronous filters.

Parameters

p : float
    The probability of applying the filter. If p is 1, the filter will always be applied.
skip_rejected : bool
    If True, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to False.
random_state : Optional[Union[int, np.random.Generator]]
    Seed for the random number generator. If None is specified, the random number generator managed by the Compose class will be used.
use_batch : bool
    If True, the filter will process documents in batches in the apply_stream method.
batch_size : int
    The size of the batch to process documents in the apply_stream method.

def set_filters( self, filters: list[hojichar.AsyncFilter | hojichar.Filter]) -> None:
async def apply( self, document: hojichar.Document) -> hojichar.Document:

Definition of async filter behavior.

In this method, the filter will modify document.text or document.extras and set document.is_rejected = True to discard the document.

Parameters

document : Document
    Input document

Returns

Document
    Processed Document

async def apply_batch( self, batch: Sequence[hojichar.Document]) -> list[hojichar.Document]:

Apply the filter to a Sequence of documents. By default, the processing implemented in apply is executed asynchronously and concurrently. If the filter processing can be optimized for batch processing, override this method.

async def apply_stream( self, stream: Union[AsyncIterable[hojichar.Document], Iterable[hojichar.Document]]) -> AsyncGenerator[hojichar.Document, NoneType]:

Apply the pipeline to a stream of documents (Iterable or AsyncIterable). A synchronous stream is converted to an asynchronous one with handle_stream_as_async, and each sub-filter's apply_stream is then chained over it. A sub-filter initialized with use_batch set to True processes documents in batches.
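As a rough illustration of accepting either kind of stream, the sketch below wraps a plain iterable so it can be consumed with `async for`. The `as_async` helper is hypothetical and uses only the standard library; it is not HojiChar's handle_stream_as_async, which according to the listing above also accepts a chunk_size and an executor.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Iterable, TypeVar, Union

T = TypeVar("T")


async def as_async(stream: Union[Iterable[T], AsyncIterable[T]]) -> AsyncIterator[T]:
    """Hypothetical helper: yield items from a sync or async stream."""
    if hasattr(stream, "__aiter__"):
        # Already asynchronous: pass items through unchanged.
        async for item in stream:
            yield item
    else:
        for item in stream:
            yield item
            # Hand control back to the event loop between items.
            await asyncio.sleep(0)


async def collect(stream):
    # Consume the unified stream with `async for`.
    return [item async for item in as_async(stream)]


result = asyncio.run(collect(["a", "b", "c"]))
print(result)
```

The same consumer code then works whether the caller passes a list, a generator, or an async generator.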

Even if an exception occurs during processing, the process will continue, and the following actions will be taken:

  • Set the is_rejected flag of the document to True
  • Set the error details in reject_reason
  • Increment the errors count in the statistics retrievable via get_statistics

def get_total_statistics(self) -> list[hojichar.core.models.Statistics]:
def get_total_statistics(self) -> list[Statistics]:
    """
    Get the statistics of the Compose object and sub filters.

    The statistics of the Compose class are stored in an object with the name "Total",
    and those of the sub-filters are stored with names in the format {filter_index}-{filter class name}.
    """
    stats = []
    stats.append(self.get_statistics())
    for i, filt in enumerate(self.filters):
        stats.append(filt.get_statistics())
    return stats

Get the statistics of the Compose object and sub filters.

The statistics of the Compose class are stored in an object with the name "Total", and those of the sub-filters are stored with names in the format {filter_index}-{filter class name}.

def get_total_statistics_map(self) -> list[dict[str, typing.Any]]:
def get_total_statistics_map(self) -> list[dict[str, Any]]:
    """
    Get the statistics of the Compose object and sub filters as a list of dictionaries.
    """
    stats = self.get_total_statistics()
    return [stat.to_dict() for stat in stats]

Get the statistics of the Compose object and sub filters as a list of dictionaries.

async def shutdown(self) -> None:
async def shutdown(self) -> None:
    for filt in self.filters:
        await filt.shutdown()
    if not self._has_external_executor:
        self._executor.shutdown()

Shuts down every sub-filter in order, then shuts down the executor unless it was supplied externally.

class AsyncFilterAdapter(hojichar.AsyncFilter):
class AsyncFilterAdapter(AsyncFilter):
    """
    Adapter class for executing hojichar.Filter asynchronously.
    """

    def __init__(
        self,
        sync_filter: Filter,
        *args: Any,
        executor: Executor | None = None,
        use_batch: bool = True,
        **kwargs: Any,
    ):
        """
        Adapter class for executing hojichar.Filter asynchronously.
        Used to incorporate Filter into AsyncCompose.

        To reduce the overhead of asynchronous context switching,
        use_batch is set to True by default to process in batches
        in apply_stream, regardless of the sync_filter's use_batch setting.

        If performing CPU-bound and heavy processing, you can specify an executor
        to offload the processing to the executor. However, due to Python's GIL
        constraints, using ThreadPoolExecutor will not parallelize CPU-bound
        processing, and the entire process will be locked.

        By using ProcessPoolExecutor as the executor, it may be possible to
        parallelize CPU-bound processing. However, for parallelizing CPU-bound
        processing, it is recommended to use the hojichar.Parallel class to
        parallelize synchronous Compose pipeline.
        """
        super().__init__(*args, use_batch=use_batch, **kwargs)
        self.sync_filter = sync_filter
        self._has_external_executor = executor is not None
        self._executor = executor or ThreadPoolExecutor()
        self.batch_size = sync_filter.batch_size

    async def apply(self, document: Document) -> Document:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self.sync_filter.apply_batch(batch),
        )

    async def shutdown(self) -> None:
        self.sync_filter.shutdown()
        if not self._has_external_executor:
            self._executor.shutdown()

Adapter class for executing hojichar.Filter asynchronously.

AsyncFilterAdapter( sync_filter: hojichar.Filter, *args: Any, executor: concurrent.futures._base.Executor | None = None, use_batch: bool = True, **kwargs: Any)
def __init__(
    self,
    sync_filter: Filter,
    *args: Any,
    executor: Executor | None = None,
    use_batch: bool = True,
    **kwargs: Any,
):
    """
    Adapter class for executing hojichar.Filter asynchronously.
    Used to incorporate Filter into AsyncCompose.

    To reduce the overhead of asynchronous context switching,
    use_batch is set to True by default to process in batches
    in apply_stream, regardless of the sync_filter's use_batch setting.

    If performing CPU-bound and heavy processing, you can specify an executor
    to offload the processing to the executor. However, due to Python's GIL
    constraints, using ThreadPoolExecutor will not parallelize CPU-bound
    processing, and the entire process will be locked.

    By using ProcessPoolExecutor as the executor, it may be possible to
    parallelize CPU-bound processing. However, for parallelizing CPU-bound
    processing, it is recommended to use the hojichar.Parallel class to
    parallelize synchronous Compose pipeline.
    """
    super().__init__(*args, use_batch=use_batch, **kwargs)
    self.sync_filter = sync_filter
    self._has_external_executor = executor is not None
    self._executor = executor or ThreadPoolExecutor()
    self.batch_size = sync_filter.batch_size

Adapter class for executing hojichar.Filter asynchronously. Used to incorporate Filter into AsyncCompose.

To reduce the overhead of asynchronous context switching, use_batch is set to True by default to process in batches in apply_stream, regardless of the sync_filter's use_batch setting.

If the processing is CPU-bound and heavy, you can specify an executor to offload it to. However, because of Python's GIL, a ThreadPoolExecutor will not parallelize CPU-bound processing, and the entire process will still be blocked while the work runs.

With a ProcessPoolExecutor as the executor, it may be possible to parallelize CPU-bound processing. For that purpose, however, it is recommended to use the hojichar.Parallel class to parallelize a synchronous Compose pipeline instead.
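The offloading pattern described above can be sketched with the standard library alone. Here `sync_upper` is a hypothetical stand-in for a synchronous filter's apply method, and the pool size is an arbitrary choice; this shows the general `run_in_executor` technique rather than the adapter's exact behavior.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def sync_upper(text: str) -> str:
    # Hypothetical stand-in for a synchronous filter's apply().
    return text.upper()


async def run_in_pool(texts):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Each call runs in a worker thread; the event loop stays free.
        futures = [loop.run_in_executor(pool, sync_upper, t) for t in texts]
        # gather() returns results in the order the futures were passed.
        return await asyncio.gather(*futures)


offloaded = asyncio.run(run_in_pool(["hoji", "char"]))
print(offloaded)
```

Threads help most when the wrapped function waits on I/O or releases the GIL; pure-Python CPU-bound work still runs one thread at a time.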

async def apply( self, document: hojichar.Document) -> hojichar.Document:
async def apply(self, document: Document) -> Document:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)

Runs the wrapped synchronous filter's apply on the document in the executor.

The wrapped filter may modify document.text or document.extras, or set document.is_rejected = True to discard the document.

Parameters

  • document (Document): Input document.

Returns

  • Document: Processed document.

async def apply_batch( self, batch: Sequence[hojichar.Document]) -> list[hojichar.Document]:
async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        lambda: self.sync_filter.apply_batch(batch),
    )

Apply the wrapped synchronous filter to a Sequence of documents by running sync_filter.apply_batch on the whole batch in the executor.

async def shutdown(self) -> None:
async def shutdown(self) -> None:
    self.sync_filter.shutdown()
    if not self._has_external_executor:
        self._executor.shutdown()

Calls shutdown on the wrapped synchronous filter, then shuts down the executor unless it was supplied externally.

class AsyncFilter(abc.ABC):
class AsyncFilter(ABC):
    def __init__(
        self,
        *args: Any,
        p: float = 1.0,
        skip_rejected: bool = True,
        random_state: int | np.random.Generator | None = None,
        use_batch: bool = True,
        batch_size: int = 128,
        **kwargs: Any,
    ):
        """
        Base class for asynchronous filters.

        Parameters
        ----------
        p : float
            The probability of applying the filter.
            If `p` is 1, the filter will always be applied.
        skip_rejected : bool
            If `True`, the filter will skip documents that are already rejected.
            If you want to apply the filter to all documents (e.g., postprocess), set this to `False`.
        random_state : Optional[Union[int, np.random.Generator]]
            Seed for the random number generator.
            If `None` is specified, the random number generator managed by the Compose class will be used.
        use_batch : bool
            If `True`, the filter will process documents in batches in the `apply_stream` method.
        batch_size : int
            The size of the batch to process documents in the `apply_stream` method.
        """
        self.name = self.__class__.__name__
        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
        assert 0 <= p <= 1
        self.p = p
        self.__init_rng(random_state)
        self.skip_rejected = skip_rejected
        self.use_batch = use_batch
        self.batch_size = batch_size

        self._statistics = Statistics(name=self.name)
        self._stats_lock = asyncio.Lock()

    @abstractmethod
    async def apply(self, document: Document) -> Document:
        """
        Definition of async filter behavior.

        In this method, the filter will modify `document.text` or
        `document.extras` and set `document.is_rejected = True` to discard the document.

        Parameters
        ----------
        document : Document
            Input document

        Returns
        -------
        Document
            Processed Document
        """
        pass

    def _check_skip(self, document: Document) -> bool:
        """
        Check if the document should be skipped by this filter.
        If `skip_rejected` is set to `True`, this method will return `True`
        if the document is already rejected.
        If `p` is less than 1, this method will return `True` with a probability of `1 - p`.
        """
        skip = self.skip_rejected and document.is_rejected
        if skip:
            return True
        if self.p < 1:
            if self._rng.random() > self.p:
                return True
        return False

    async def _apply(self, document: Document) -> Document:
        stats = get_doc_info(document)
        if not self._check_skip(document):
            document = await self.apply(document)
        new_stats = get_doc_info(document)
        async with self._stats_lock:
            self._statistics.update_by_diff(stats, new_stats)

        if not stats["is_rejected"] and new_stats["is_rejected"]:
            document.reject_reason = self.get_jsonable_vars()
        return document

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        """
        Apply the filter to a Sequence of documents.
        By default, the processing implemented in `apply` is executed asynchronously and concurrently.
        If the filter processing can be optimized for batch processing, override this method.
        """
        tasks = [self.apply(doc) for doc in batch]
        return await asyncio.gather(*tasks)

    async def _apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        skip = False
        if self.p < 1:
            skip = self._rng.random() > self.p

        stats = [get_doc_info(doc) for doc in batch]
        if not skip:
            batch = await self.apply_batch(batch)
        batch = await self._finalize_batch(batch, stats)
        return list(batch)

    async def apply_stream(
        self,
        stream: Iterable[Document] | AsyncIterable[Document],
    ) -> AsyncGenerator[Document, None]:
        """
        Apply the filter to a stream of documents (Iterable or AsyncIterable).
        If use_batch is set to `True` at initialization, the filter will process documents in batches.
        If the stream is not asynchronous, use handle_stream_as_async to convert it to an asynchronous stream.

        Even if an exception occurs during processing, the process will continue, and the following actions will be taken:
        - Set the `is_rejected` flag of the document to `True`
        - Set the error details in `reject_reason`
        - Increment the `errors` count in the statistics retrievable via `get_statistics`
        """
        async_stream: AsyncIterable[Document] = handle_stream_as_async(stream)

        if not self.use_batch:
            async for doc in async_stream:
                yield await self._try_process(doc, self._apply)
        else:
            batch: list[Document] = []
            async for doc in async_stream:
                if self._check_skip(doc):
                    yield doc
                    continue

                batch.append(doc)
                # Batch size reached, apply batch
                if len(batch) >= self.batch_size:
                    stats = [get_doc_info(doc) for doc in batch]
                    batch = await self._try_process(batch, self.apply_batch)
                    batch = await self._finalize_batch(batch, stats)
                    for out in batch:
                        yield out
                    batch.clear()

            # Flush remaining documents in the batch
            if batch:
                stats = [get_doc_info(doc) for doc in batch]
                batch = await self._try_process(batch, self.apply_batch)
                batch = await self._finalize_batch(batch, stats)
                for out in batch:
                    yield out

    async def _try_process(self, target: T, func: Callable[[T], Awaitable[T]]) -> T:
        try:
            return await func(target)
        except Exception as e:
            if isinstance(target, Document):
                msg = f"{e!r} occurs while processing {self.name} with {target!r}"
                self.logger.error(msg, exc_info=True)
                target.is_rejected = True
                target.reject_reason = {"error": msg}
                async with self._stats_lock:
                    self._statistics.errors += 1
                return target  # type: ignore[return-value]
            if isinstance(target, list):
                msg = f"{e!r} occurs while batch processing {self.name}"
                self.logger.error(msg, exc_info=True)
                for doc in target:
                    doc.is_rejected = True
                    doc.reject_reason = {"error": msg}
                async with self._stats_lock:
                    self._statistics.errors += len(target)
                return target  # type: ignore[return-value]
            raise e

    async def __call__(self, text: str) -> str:
        document = Document(text=text)
        return (await self._apply(document)).text

    def get_statistics(self) -> Statistics:
        """
        Get the statistics of this filter.
        Returns:
            Statistics: The statistics of this filter.
        """
        return self._statistics

    def get_statistics_map(self) -> dict[str, Statistics]:
        """
        Get the statistics of this filter as a dictionary.
        """
        return self._statistics.to_dict()

    async def shutdown(self) -> None:
        """
        You can override this method to release resources or perform cleanup tasks.
        """
        pass

    async def __aenter__(self) -> "AsyncFilter":
        return self

    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        await self.shutdown()

    def get_jsonable_vars(self, exclude_keys: set[str] | None = None) -> dict[str, Any]:
        """
        Get the member variable of this filter.
        Eligible variables are primitive types; [bool, int, float, str, None],
        and the name of the variable not starts with the underscore; `_`.
        """
        if exclude_keys is None:
            exclude_keys = set()
        return {
            k: v
            for k, v in vars(self).items()
            if (_is_jsonable(v) and (k not in exclude_keys) and (not k.startswith("_")))
        }

    async def _finalize_batch(
        self,
        batch: Sequence[Document],
        old_stats: list[dict[str, Any]],
    ) -> list[Document]:
        new_stats = [get_doc_info(doc) for doc in batch]
        for old, new, doc in zip(old_stats, new_stats, batch):
            async with self._stats_lock:
                self._statistics.update_by_diff(old, new)
            if not old["is_rejected"] and new["is_rejected"]:
                doc.reject_reason = self.get_jsonable_vars()
        return list(batch)

    def __init_rng(self, random_state: int | np.random.Generator | None) -> None:
        self._owns_rng = True
        if random_state is None:
            self._rng = np.random.default_rng()
            self._owns_rng = False
        elif isinstance(random_state, int):
            self._rng = np.random.default_rng(random_state)
        elif isinstance(random_state, np.random.Generator):
            self._rng = random_state
        else:
            raise TypeError(
                f"random_state must be int or np.random.Generator, not {type(random_state)}"
            )

    def _set_rng_if_not_initialized(self, rng: np.random.Generator) -> None:
        """
        Set the random number generator for this filter if it is not already initialized.
        This method is called by Compose class.
        """
        if not self._owns_rng:
            self._rng = rng

Base class for asynchronous filters. Subclasses implement the abstract apply coroutine.

AsyncFilter( *args: Any, p: float = 1.0, skip_rejected: bool = True, random_state: int | numpy.random._generator.Generator | None = None, use_batch: bool = True, batch_size: int = 128, **kwargs: Any)
def __init__(
    self,
    *args: Any,
    p: float = 1.0,
    skip_rejected: bool = True,
    random_state: int | np.random.Generator | None = None,
    use_batch: bool = True,
    batch_size: int = 128,
    **kwargs: Any,
):
    """
    Base class for asynchronous filters.

    Parameters
    ----------
    p : float
        The probability of applying the filter.
        If `p` is 1, the filter will always be applied.
    skip_rejected : bool
        If `True`, the filter will skip documents that are already rejected.
        If you want to apply the filter to all documents (e.g., postprocess), set this to `False`.
    random_state : Optional[Union[int, np.random.Generator]]
        Seed for the random number generator.
        If `None` is specified, the random number generator managed by the Compose class will be used.
    use_batch : bool
        If `True`, the filter will process documents in batches in the `apply_stream` method.
    batch_size : int
        The size of the batch to process documents in the `apply_stream` method.
    """
    self.name = self.__class__.__name__
    self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
    assert 0 <= p <= 1
    self.p = p
    self.__init_rng(random_state)
    self.skip_rejected = skip_rejected
    self.use_batch = use_batch
    self.batch_size = batch_size

    self._statistics = Statistics(name=self.name)
    self._stats_lock = asyncio.Lock()

Base class for asynchronous filters.

Parameters

  • p (float): The probability of applying the filter. If p is 1, the filter will always be applied.
  • skip_rejected (bool): If True, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to False.
  • random_state (Optional[Union[int, np.random.Generator]]): Seed for the random number generator. If None is specified, the random number generator managed by the Compose class will be used.
  • use_batch (bool): If True, the filter will process documents in batches in the apply_stream method.
  • batch_size (int): The size of the batch to process documents in the apply_stream method.
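How p and skip_rejected interact can be sketched as a small decision function. This is a simplified stand-in, assuming a hypothetical `Doc` dataclass and the standard library's `random.Random` in place of the NumPy generator used by the library.

```python
import random
from dataclasses import dataclass


@dataclass
class Doc:
    # Hypothetical stand-in for a document with a rejection flag.
    text: str
    is_rejected: bool = False


def should_skip(doc: Doc, p: float, skip_rejected: bool, rng: random.Random) -> bool:
    # Already-rejected documents are skipped when skip_rejected is True.
    if skip_rejected and doc.is_rejected:
        return True
    # With p < 1, the filter is skipped with probability 1 - p.
    if p < 1 and rng.random() > p:
        return True
    return False


rng = random.Random(0)
always = should_skip(Doc("a"), p=1.0, skip_rejected=True, rng=rng)
rejected = should_skip(Doc("b", is_rejected=True), p=1.0, skip_rejected=True, rng=rng)
postprocess = should_skip(Doc("c", is_rejected=True), p=1.0, skip_rejected=False, rng=rng)
print(always, rejected, postprocess)
```

Setting skip_rejected to False is what lets a post-processing step see documents that an earlier filter has already rejected.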

@abstractmethod
async def apply( self, document: hojichar.Document) -> hojichar.Document:
@abstractmethod
async def apply(self, document: Document) -> Document:
    """
    Definition of async filter behavior.

    In this method, the filter will modify `document.text` or
    `document.extras` and set `document.is_rejected = True` to discard the document.

    Parameters
    ----------
    document : Document
        Input document

    Returns
    -------
    Document
        Processed Document
    """
    pass

Definition of async filter behavior.

In this method, the filter modifies document.text or document.extras, or sets document.is_rejected = True to discard the document.

Parameters

  • document (Document): Input document.

Returns

  • Document: Processed document.
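A minimal sketch of what an apply coroutine typically does is shown below. `Doc` and `DiscardShort` are hypothetical stand-ins so the example runs without the library; in real code the class would subclass AsyncFilter and receive a hojichar.Document.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Doc:
    # Hypothetical stand-in exposing the three attributes named above.
    text: str
    is_rejected: bool = False
    extras: Dict[str, Any] = field(default_factory=dict)


class DiscardShort:
    """Hypothetical filter: trims whitespace and rejects short texts."""

    def __init__(self, min_chars: int = 5) -> None:
        self.min_chars = min_chars

    async def apply(self, document: Doc) -> Doc:
        document.text = document.text.strip()            # modify the text
        document.extras["n_chars"] = len(document.text)  # attach metadata
        if len(document.text) < self.min_chars:
            document.is_rejected = True                  # mark for discarding
        return document


kept = asyncio.run(DiscardShort().apply(Doc("  hello world  ")))
dropped = asyncio.run(DiscardShort().apply(Doc(" hi ")))
print(kept, dropped)
```

Note that rejection only sets a flag: the document is still returned, so later stages decide what to do with it.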

async def apply_batch( self, batch: Sequence[hojichar.Document]) -> list[hojichar.Document]:
async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
    """
    Apply the filter to a Sequence of documents.
    By default, the processing implemented in `apply` is executed asynchronously and concurrently.
    If the filter processing can be optimized for batch processing, override this method.
    """
    tasks = [self.apply(doc) for doc in batch]
    return await asyncio.gather(*tasks)

Apply the filter to a Sequence of documents. By default, the processing implemented in apply is executed asynchronously and concurrently. If the filter processing can be optimized for batch processing, override this method.
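The default behavior relies on asyncio.gather, which runs the per-document coroutines concurrently and returns results in input order regardless of which finishes first. A small standard-library sketch, with hypothetical module-level `apply` and `apply_batch` functions standing in for the methods:

```python
import asyncio


async def apply(text: str) -> str:
    # Shorter texts sleep longer, so completion order differs from input order.
    await asyncio.sleep(0.01 * (3 - len(text)))
    return text + "!"


async def apply_batch(batch):
    # One coroutine per item, awaited together.
    return await asyncio.gather(*(apply(t) for t in batch))


batch_out = asyncio.run(apply_batch(["a", "bb", "ccc"]))
print(batch_out)
```

An override is worthwhile when the underlying work can be done in one call for the whole batch, for example a single request carrying many documents.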

async def apply_stream( self, stream: Union[Iterable[hojichar.Document], AsyncIterable[hojichar.Document]]) -> AsyncGenerator[hojichar.Document, NoneType]:
async def apply_stream(
    self,
    stream: Iterable[Document] | AsyncIterable[Document],
) -> AsyncGenerator[Document, None]:
    """
    Apply the filter to a stream of documents (Iterable or AsyncIterable).
    If use_batch is set to `True` at initialization, the filter will process documents in batches.
    If the stream is not asynchronous, use handle_stream_as_async to convert it to an asynchronous stream.

    Even if an exception occurs during processing, the process will continue, and the following actions will be taken:
    - Set the `is_rejected` flag of the document to `True`
    - Set the error details in `reject_reason`
    - Increment the `errors` count in the statistics retrievable via `get_statistics`
    """
    async_stream: AsyncIterable[Document] = handle_stream_as_async(stream)

    if not self.use_batch:
        async for doc in async_stream:
            yield await self._try_process(doc, self._apply)
    else:
        batch: list[Document] = []
        async for doc in async_stream:
            if self._check_skip(doc):
                yield doc
                continue

            batch.append(doc)
            # Batch size reached, apply batch
            if len(batch) >= self.batch_size:
                stats = [get_doc_info(doc) for doc in batch]
                batch = await self._try_process(batch, self.apply_batch)
                batch = await self._finalize_batch(batch, stats)
                for out in batch:
                    yield out
                batch.clear()

        # Flush remaining documents in the batch
        if batch:
            stats = [get_doc_info(doc) for doc in batch]
            batch = await self._try_process(batch, self.apply_batch)
            batch = await self._finalize_batch(batch, stats)
            for out in batch:
                yield out

Apply the filter to a stream of documents (Iterable or AsyncIterable). If use_batch was set to True at initialization, the filter processes documents in batches of batch_size. A synchronous stream is converted to an asynchronous one internally with handle_stream_as_async.

Even if an exception occurs during processing, the process will continue, and the following actions will be taken:

  • Set the is_rejected flag of the document to True
  • Set the error details in reject_reason
  • Increment the errors count in the statistics retrievable via get_statistics
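
The error handling listed above can be sketched as follows. `Doc`, `try_process`, `parse_int`, and the `stats` dictionary are hypothetical stand-ins; the sketch covers only the single-document case and omits logging and locking.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, Optional


@dataclass
class Doc:
    # Hypothetical stand-in for a document.
    text: str
    is_rejected: bool = False
    reject_reason: Optional[Dict[str, Any]] = None


stats = {"errors": 0}  # stand-in for the filter's statistics object


async def try_process(doc: Doc, func: Callable[[Doc], Awaitable[Doc]]) -> Doc:
    try:
        return await func(doc)
    except Exception as e:
        # Reject the document instead of stopping the stream.
        doc.is_rejected = True
        doc.reject_reason = {"error": repr(e)}
        stats["errors"] += 1
        return doc


async def parse_int(doc: Doc) -> Doc:
    int(doc.text)  # raises ValueError for non-numeric text
    return doc


async def run_all():
    return [await try_process(Doc(t), parse_int) for t in ["1", "x", "3"]]


docs = asyncio.run(run_all())
print([d.is_rejected for d in docs], stats)
```

The failing document is passed downstream with its rejection recorded, so one bad input does not abort the rest of the stream.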
def get_statistics(self) -> hojichar.core.models.Statistics:
def get_statistics(self) -> Statistics:
    """
    Get the statistics of this filter.
    Returns:
        Statistics: The statistics of this filter.
    """
    return self._statistics

Get the statistics of this filter.

Returns

  • Statistics: The statistics of this filter.

def get_statistics_map(self) -> dict[str, hojichar.core.models.Statistics]:
def get_statistics_map(self) -> dict[str, Statistics]:
    """
    Get the statistics of this filter as a dictionary.
    """
    return self._statistics.to_dict()

Get the statistics of this filter as a dictionary.

async def shutdown(self) -> None:
async def shutdown(self) -> None:
    """
    You can override this method to release resources or perform cleanup tasks.
    """
    pass

You can override this method to release resources or perform cleanup tasks.
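Because the class listing defines `__aenter__` and `__aexit__` so that leaving an `async with` block awaits shutdown, cleanup can be tied to a scope. The sketch below reproduces that pattern with a hypothetical `Resourceful` class rather than a real AsyncFilter subclass.

```python
import asyncio


class Resourceful:
    """Hypothetical class mirroring the shutdown-on-exit pattern."""

    def __init__(self) -> None:
        self.closed = False

    async def shutdown(self) -> None:
        # Release connections, sessions, or other resources here.
        self.closed = True

    async def __aenter__(self) -> "Resourceful":
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        await self.shutdown()


async def use_filter():
    async with Resourceful() as filt:
        inside = filt.closed   # still open inside the block
    return inside, filt.closed  # closed once the block exits


state = asyncio.run(use_filter())
print(state)
```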

def get_jsonable_vars(self, exclude_keys: set[str] | None = None) -> dict[str, typing.Any]:
def get_jsonable_vars(self, exclude_keys: set[str] | None = None) -> dict[str, Any]:
    """
    Get the member variable of this filter.
    Eligible variables are primitive types; [bool, int, float, str, None],
    and the name of the variable not starts with the underscore; `_`.
    """
    if exclude_keys is None:
        exclude_keys = set()
    return {
        k: v
        for k, v in vars(self).items()
        if (_is_jsonable(v) and (k not in exclude_keys) and (not k.startswith("_")))
    }

Get the member variables of this filter. Only variables whose values are primitive types (bool, int, float, str, or None) and whose names do not start with an underscore (_) are included.
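The selection rule can be illustrated with a small self-contained sketch. The `is_jsonable` helper here is an assumption based on the docstring's list of primitive types; the library's own `_is_jsonable` is not shown in this section and may differ.

```python
from typing import Any, Dict, Optional, Set


def is_jsonable(value: Any) -> bool:
    # Assumed check: None or one of the primitive types named in the docstring.
    return value is None or isinstance(value, (bool, int, float, str))


class Example:
    """Hypothetical object with a mix of attribute kinds."""

    def __init__(self) -> None:
        self.name = "Example"
        self.p = 0.5
        self.tags = ["a"]        # not a primitive: excluded
        self._secret = "hidden"  # leading underscore: excluded

    def get_jsonable_vars(self, exclude_keys: Optional[Set[str]] = None) -> Dict[str, Any]:
        exclude_keys = exclude_keys or set()
        return {
            k: v
            for k, v in vars(self).items()
            if is_jsonable(v) and k not in exclude_keys and not k.startswith("_")
        }


public_vars = Example().get_jsonable_vars()
without_p = Example().get_jsonable_vars(exclude_keys={"p"})
print(public_vars, without_p)
```

In the class listing, this dictionary is what gets stored in reject_reason when a filter rejects a document, which is why only simple, serializable settings are kept.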