
HojiChar: The Text Processing Pipeline


Official docs: https://hojichar.github.io/HojiChar/hojichar.html

Features

  • HojiChar provides a way to combine multiple arbitrary text processing tasks into a streamlined pipeline.
  • The sequence of operations can be described declaratively, ensuring portability.
  • HojiChar allows users to gather detailed statistical information from large amounts of text during processing.
  • It enables management of any Python text processing tasks, providing a Command Line Interface (CLI) capable of parallel processing.

Background: what HojiChar is for

Text preprocessing is far from a one-size-fits-all process. Depending on the data source and the specific task at hand, various steps including normalization, noise removal, and filtering may be necessary. Not all texts require the same level of preprocessing. For instance, relatively clean texts may only need minimal filtering, while "dirtier" sources like Common Crawl data often require more thorough processing. As a result, the preprocessing profile has to be tailored to each specific domain.

Many preprocessing operations can be viewed as filters: taking a string as input, applying a transformation, and outputting the processed string. Even though these operations might seem straightforward individually, managing them in a multi-layered, efficient manner can be challenging.
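Since each step is a string-to-string function, chaining them is ordinary function composition. A minimal sketch in plain Python, using toy filters rather than anything from hojichar:

```python
from functools import reduce

def compose(*filters):
    """Chain text-to-text filters, applying them left to right."""
    def pipeline(text):
        return reduce(lambda t, f: f(t), filters, text)
    return pipeline

# Toy filters: trim surrounding whitespace, then lowercase.
clean = compose(str.strip, str.lower)
print(clean("  Hello World  "))  # -> "hello world"
```

hojichar.Compose builds on this basic idea and adds rejection handling and per-filter statistics on top.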

Inspired by torchvision.transforms and iver56/audiomentations, HojiChar addresses these challenges. It enables users to define each text processing step as a class inheriting from hojichar.Filter and use hojichar.Compose to chain them together into a single filter. By writing out the Compose recipe as a profile, the preprocessing process for a specific domain's text can be made portable. Moreover, Compose automatically logs various metrics for each filter, such as byte changes, processing time, and number of rejected texts. This allows users to assess the validity of each operation and consider trade-offs between computation time and performance.

While there are other text normalization tools available, most are designed to perform a specific set of operations. Text preprocessing, despite its importance in the LLM era, is often considered a mundane task compared to machine learning or artificial intelligence tasks. As a result, many existing solutions can be ad hoc, poorly maintained, or inadequately tested. Recognizing these issues, we developed HojiChar as a robust tool for configuring text preprocessing.

Install

pip install hojichar

Defining a Compose Object

The Compose class in HojiChar allows you to create a sequence of text processing filters.

from hojichar import Compose, document_filters

cleaner = Compose([
    document_filters.JSONLoader(key="text"),
    document_filters.AcceptJapanese(),
    document_filters.DocumentLengthFilter(min_doc_len=0, max_doc_len=1000),
    document_filters.ExampleHojiChar(),
    document_filters.JSONDumper()
])

When a Compose object is called, it accepts a string and returns the processed string.

>>> cleaner('{"text": "こんにちは、"}')
{"text": "こんにちは、<hojichar>"}

The filter pipeline above accomplishes the following steps:

  1. Extracts the value from the 'text' key in the JSON object.
  2. Discards the string if it's not in Japanese.
  3. Rejects any text shorter than 0 characters or longer than 1000 characters.
  4. Appends <hojichar> to the string.
  5. Outputs the processed string as JSON with the key "text".
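The five steps above can be mimicked with the standard json module (a toy stand-in for the real filters, which also track rejection flags and statistics; the language check in step 2 is omitted here):

```python
import json

def toy_pipeline(line: str, max_len: int = 1000) -> str:
    text = json.loads(line)["text"]         # 1. extract the "text" value
    # 2. (AcceptJapanese language check omitted in this sketch)
    if not (0 <= len(text) <= max_len):     # 3. reject out-of-range lengths
        return ""
    text += "<hojichar>"                    # 4. append the marker
    return json.dumps({"text": text}, ensure_ascii=False)  # 5. dump back to JSON
```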

The filters used in the pipeline are predefined filters found in hojichar.filters.

While HojiChar provides some fundamental text processing filters and plans to add more in the future, users can also define their custom filters.

User-defined Filters

A filter used in a Compose object is a class that inherits from the Filter class and implements its text processing in the apply method.

from hojichar.core.filter_interface import Filter

class YourFilter(Filter):
    def apply(self, document):
        text = document.text
        """
        Write your text transformation...
        """
        document.text = text
        return document

The apply method accepts a hojichar.Document type as an argument and returns it after the transformations. The Document is a class that encapsulates a string.

Reject documents

  • The hojichar.Document has an is_rejected attribute. If a filter sets this flag to True, Compose will discard the document during processing.
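The rejection flow can be illustrated with a minimal stand-in (plain Python, not hojichar's actual classes):

```python
class Doc:
    """Toy stand-in for hojichar.Document."""
    def __init__(self, text: str):
        self.text = text
        self.is_rejected = False

class DiscardShort:
    """Toy filter: reject documents shorter than min_len characters."""
    def __init__(self, min_len: int):
        self.min_len = min_len
    def apply(self, doc: Doc) -> Doc:
        if len(doc.text) < self.min_len:
            doc.is_rejected = True
        return doc

def run(filters, doc):
    for f in filters:
        if doc.is_rejected:
            break  # mimics the default skip_rejected=True behavior
        doc = f.apply(doc)
    return doc
```

Once a filter sets the flag, later filters in the pipeline never see the document.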

Definition of __init__ for custom filter

When creating a user-defined class and applying a custom constructor, make sure to initialize the parent class.

class YourFilter(Filter):
    def __init__(self, your_param, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.your_param = your_param

    def apply(self, document):
        text = document.text
        text = process(text, self.your_param)
        document.text = text
        return document

This is because the Filter class implicitly accepts several arguments, one of which is p.

cleaner = Compose([
    document_filters.JSONLoader(key="text"),
    document_filters.AcceptJapanese(p=0.5),
    document_filters.JSONDumper()
])

The p argument passed to the document_filters.AcceptJapanese constructor determines the probability of applying the filter; with a probability of 1-p, it acts as an identity function. This behavior is defined in the parent class hojichar.Filter.
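The probabilistic gating described above can be sketched as follows (a toy filter using the standard random module; hojichar itself draws from a NumPy generator internally):

```python
import random

class GatedUpper:
    """Toy filter applied with probability p; identity otherwise."""
    def __init__(self, p=1.0, rng=None):
        assert 0 <= p <= 1
        self.p = p
        self.rng = rng or random.Random()

    def __call__(self, text: str) -> str:
        if self.rng.random() < self.p:
            return text.upper()  # the filter's actual transformation
        return text              # with probability 1 - p: identity
```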

Additional Notes on Compose

  • Even though a Compose object behaves as a text-in, text-out function when called, Compose itself also inherits from the Filter class. Applying the apply method to a Compose object therefore uses the hojichar.Document class as both input and output.
  • A Compose object behaves like a Filter. If you pass a Compose object as one of the filters in the constructor of another Compose, its filters are unfolded recursively.
  • You can access various statistics regarding the processing performed by Compose through Compose.statistics or Compose.statistics_obj.

    • Compose.statistics is a dictionary like the following:
      {
      "total_info": {
          "processed_num": 10928,
          "discard_num": 5513,
          "input_MB": 104.514584,
          "output_MB": 25.33024,
          "cumulative_time": 114.071047143,
          "total_token_num": 0
      },
      "layers_info": [
          {
          "name": "0-JSONLoader",
          "discard_num": 0,
          "diff_MB": -1.9647932052612305,
          "cumulative_time": 0.420034328,
          "params": {
              "name": "JSONLoader",
              "p": 1,
              "skip_rejected": true,
              "key": "text",
              "ignore": true
          }
          },
          {
          "name": "1-DocumentNormalizer",
          "discard_num": 0,
          "diff_MB": -1.5221118927001953,
          "cumulative_time": 8.286988707,
          "params": {
              "name": "DocumentNormalizer",
              "p": 1,
              "skip_rejected": true
          }
          },
          {
          "name": "2-DocumentLengthFilter",
          "discard_num": 344,
          "diff_MB": -0.05566596984863281,
          "cumulative_time": 0.093768306,
          "params": {
              "name": "DocumentLengthFilter",
              "p": 1,
              "skip_rejected": true,
              "min_doc_len": 100,
              "max_doc_len": null
          }
          }
      ]
      }
      
  • Compose.statistics_obj is a hojichar.StatsContainer instance. The StatsContainer stores the raw values behind the statistics dictionary, and addition is defined on it so that statistics from documents processed with the same filters can be totaled easily. You can get the statistics dictionary by calling Compose.statistics_obj.get_human_readable_values().
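The value of defining addition on a stats container can be seen in a toy sketch (hypothetical fields chosen for illustration; hojichar's StatsContainer has its own schema):

```python
from dataclasses import dataclass

@dataclass
class ToyStats:
    processed_num: int = 0
    discard_num: int = 0
    input_MB: float = 0.0

    def __add__(self, other: "ToyStats") -> "ToyStats":
        # Field-wise sums let statistics from separate runs of the
        # same pipeline be totaled directly with `+`.
        return ToyStats(
            self.processed_num + other.processed_num,
            self.discard_num + other.discard_num,
            self.input_MB + other.input_MB,
        )

# e.g. stats from two shards processed with the same filters:
total = ToyStats(100, 5, 1.5) + ToyStats(200, 10, 3.0)
```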

Parallel application of Compose

The hojichar.Parallel class allows for the application of Compose to an iterable of Document concurrently. This class empowers users to process vast collections of documents by harnessing the power of multiple CPU cores.

Example usage of the Parallel class to process a very large JSON Lines file concurrently:

import hojichar

input_file = "your_text.jsonl"
input_doc_iter = (hojichar.Document(line) for line in open(input_file))

cleaner = hojichar.Compose([
    hojichar.document_filters.JSONLoader(),
    hojichar.document_filters.DocumentNormalizer(),
    # Insert your filters
    hojichar.document_filters.JSONDumper(),
])

with hojichar.Parallel(cleaner, num_jobs=10) as pfilter:
    out_doc_iter = pfilter.imap_apply(input_doc_iter)
    with open("your_processed_text.jsonl", "w") as fp:
        for doc in out_doc_iter:
            fp.write(doc.text + "\n")
  • Always use the Parallel class within a with statement.
  • Parallel.imap_apply(doc_iter) processes an iterator of Document and returns an iterator of the processed documents.
  • For additional options and details about the Parallel class, please refer to the official documentation.

CLI tool and preprocessing profile

  • HojiChar provides a CLI tool for running a text preprocessing pipeline.
  • Users define a series of preprocessing steps in a Python file called a profile.

  • Example:

    cat <your_text.jsonl> | hojichar -p your_preprocessing_profile.py -o your_text_preprocessed.jsonl
    
  • hojichar --help

    usage: hojichar [-h] --profile <profile.py> [--args ARGS [ARGS ...]] [--output OUTPUT] [--input INPUT] [--dump-stats <path to stats.json>] [--exit-on-error] [--all] [--jobs JOBS]
    
    options:
    -h, --help            show this help message and exit
    --profile <profile.py>, -p <profile.py>
                            Path to a Python file that implements your custom filter.
    --args ARGS [ARGS ...]
                            Pass additional arguments to the profile. Use it like `--args arg1 arg2` etc. The arguments should be space-separated.
    --output OUTPUT, -o OUTPUT
                            Specifies the path for the output file. Defaults to standard output.
    --input INPUT, -i INPUT
                            Specifies the path for the input file. Defaults to standard input. If this path is set, a progress bar is enabled.
    --dump-stats <path to stats.json>
                            Dump statistics to file. If the file exists, it will be appended.
    --exit-on-error       Exit if an exception occurs during filtering. Useful for debugging custom filters.
    --all                 A flag that specifies whether to include discarded samples. This is useful when inspecting discarded samples.
    --jobs JOBS, -j JOBS  The number of parallel jobs. Defaults to the number of CPU cores.
    

Definition of Profile

  • The HojiChar CLI receives a series of preprocessing steps as a profile.
  • The preprocessing profile is provided as a Python file. Two patterns are allowed for this file.
  • hojichar.utils.load_compose.load_compose() loads these profiles.

FILTER profile

  • A hojichar.Compose object must be defined as the FILTER variable.
  • Example:

    import json
    
    from hojichar import Compose, Filter
    from hojichar.filters.document_filters import ExampleHojiChar, JSONLoader
    
    
    class JSONDumper(Filter):
        def apply(self, document):
            text = document.text
            document.text = json.dumps({"text": text}, ensure_ascii=False)
            return document
    
    # FILTER must be a Compose object.
    FILTER = Compose(
        [
            JSONLoader(),
            ExampleHojiChar(),
            JSONDumper(),
        ]
    )
    
    • Pass the texts to the filter you have defined using a pipe as follows.
      cat <your_file> | hojichar -p example_profile.py
      
  • hojichar.utils.load_compose.load_filter_from_file() loads this type of profile.

FACTORY profile

  • A callable that returns a hojichar.Compose object must be defined as the FACTORY variable.
  • The callable can receive arguments, so parameters can be passed to the profile.
    • Some values should not be hard-coded statically: for example, random seeds or flags that modify a filter's behavior.
    • FACTORY provides a mechanism to pass those values to the preprocessing as arguments.
  • Example.

    import json
    
    from hojichar import Compose, Filter
    from hojichar.filters.document_filters import JSONLoader
    
    
    class AddSomething(Filter):  # Concat some value after every document.
      def __init__(self, something: str, *args, **kwargs) -> None:
          super().__init__(*args, **kwargs)  # Remember to initialize the parent class.
          self.something = something

      def apply(self, document):
          text = document.text + self.something
          document.text = text
          return document
    
    class JSONDumper(Filter):
      def apply(self, document):
          text = document.text
          document.text = json.dumps({"text": text}, ensure_ascii=False)
          return document
    
    
    def callback(something):
      return Compose(
          [
              JSONLoader(),
              AddSomething(something),
              JSONDumper(),
          ]
      )
    
    # FACTORY must be a callable that returns a Compose object.
    FACTORY = callback
    
  • Using FACTORY profile with arguments in CLI.

    cat <your_file> | hojichar -p example_profile.py --args arg1 arg2
    
  • hojichar.utils.load_compose.load_parametrized_filter_from_file() or load_factory_from_file() loads this type of profile.

For Developers

Installing from the Source Directory

To install the package, execute the following commands:

git clone https://github.com/HojiChar/HojiChar.git
cd HojiChar
poetry install

To install packages related to development, use:

poetry install --extras "dev lint test doc"

Testing

Some filters incorporate doctests. You can run these tests with the command:

pytest --doctest-modules .

This command should be executed from the root of the project.

Code style

  • HojiChar requires type hints for all code. Type checking is performed in continuous integration (CI) in addition to the pytest tests.
  • HojiChar code is subject to inspection by the Flake8 Linter and is formatted using Black and isort. For configuration details, please refer to pyproject.toml. You can perform linting and formatting from the root of the project using the following commands:

Linting

poetry run task lint

Formatting

poetry run task format

Building the Documentation

We use Pdoc for building the documentation. You can build the documentation using the following command:

pdoc -o docs hojichar

Run this command from the project root.

In practice, the process of building the documentation is automated by CI. When a Pull Request is merged into the main branch, the documentation is built in the docs/ directory of the docs branch. This directory is then deployed to the official documentation site by GitHub Pages.

Creating a Source Tarball

To create a source tarball, for instance, for packaging or distribution, run the following command:

poetry build

The tarball will be created in the dist directory. This command will compile the source code, and the resulting tarball can be installed with no additional dependencies other than the Python standard library.

Creating a Release and Uploading it to PyPI

This command is primarily used by the project manager to create a release and upload it to PyPI.

Versions uploaded to PyPI are identified by git tags. The __version__ variable in __init__.py or the version entry in pyproject.toml are ignored. The poetry-dynamic-versioning Poetry plugin is used to implement this process.

To add the plugin, use:

poetry self add "poetry-dynamic-versioning[plugin]"

The steps to push to PyPI are as follows, although in actuality, the process is automated by CI when a GitHub release is created from the tag.

git checkout v0.1.2
poetry config pypi-token.pypi <API TOKEN>
poetry build 
poetry publish

The actual task for the manager is to apply the appropriate tag to the commit to be released and to create the release from GitHub:

git tag -a v0.1.2 -m "Version 0.1.2"
git push origin v0.1.2
"""
.. include:: ../README.md
"""
from .core.composition import Compose
from .core.filter_interface import Filter, TokenFilter
from .core.inspection import StatsContainer
from .core.models import Document, Token
from .core.parallel import Parallel
from .filters import deduplication, document_filters, token_filters, tokenization

__version__ = "0.0.0"  # Replaced by poetry-dynamic-versioning when deploying

__all__ = [
    "core",
    "filters",
    "utils",
    "Compose",
    "Filter",
    "TokenFilter",
    "Document",
    "Token",
    "Parallel",
    "StatsContainer",
    "deduplication",
    "document_filters",
    "token_filters",
    "tokenization",
]
class Compose(hojichar.Filter):
class Compose(Filter):
    def __init__(
        self,
        filters: List[Union[Filter, TokenFilter]],
        random_state: Optional[Union[int, np.random.Generator]] = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        Compose a filter from pre-defined filter-objects.
        Filter which has `skip_rejected` flag ignores a document which has `is_rejected` flag.
        By doing so, Compose avoid applying filters that do not affect the output.

        Parameters
        ----------
        filters : List[Union[Filter, TokenFilter]]
            Filter instances which apply to the corpus.

        random_state : Union[None, int, np.random.Generator], optional
            Default = None
            Seed for applying filters randomly.
            `random_state` must be int or np.random.Generator instance.
        """
        super().__init__(*args, **kwargs)
        self.set_filters(filters)
        self.logger = logging.getLogger("hojichar.Compose")
        self.before_process_inspector = Inspector(
            target_filter=BeforeProcessFilter(), filter_idx=-1
        )
        self.inspectors = [
            Inspector(target_filter=filter, filter_idx=idx)
            for idx, filter in enumerate(self.filters)
        ]
        self._statistics = StatisticsCounter(self.inspectors)

        # Turn random_state into a `np.random.Generator` instance.
        if random_state is None:
            self.rng = np.random.default_rng()
        elif isinstance(random_state, int):
            self.rng = np.random.default_rng(random_state)
        elif isinstance(random_state, np.random.Generator):
            self.rng = random_state
        else:
            raise ValueError(f"{random_state} cannot be used to seed.")

    def set_filters(self, filters: List[Union[Filter, TokenFilter]]) -> None:
        """
        Set the filter to a Compose object. The filter is expanded if the
        list of filters in the argument contains a filter bound by Compose.

        Args:
            filters (List[Union[Filter, TokenFilter]]): Target filters
        """
        self.filters: List[Union[Filter, TokenFilter]] = []
        for filter in filters:
            if isinstance(filter, Compose):
                self.filters.extend(filter.filters)
            else:
                self.filters.append(filter)

    def __call__(self, text: str) -> str:
        document = Document(text)
        document = self.apply(document)
        if document.is_rejected:
            return ""
        else:
            return document.text

    def _apply_filter(self, filt: Union[Filter, TokenFilter], document: Document) -> Document:
        if document.is_rejected and filt.skip_rejected:
            pass
        else:
            if filt.p == 1:
                document = filt.apply_filter(document)
            else:
                if self.rng.random() < filt.p:
                    document = filt.apply_filter(document)
        return document

    def apply(self, document: Document) -> Document:
        document = self.before_process_inspector.apply(document)
        previous_inspector = self.before_process_inspector
        for i, filt in enumerate(self.filters):
            inspector = self.inspectors[i]
            document = self._apply_filter(filt=filt, document=document)
            document = inspector.apply(document)
            if (not previous_inspector.is_rejected) and inspector.is_rejected:
                document.reject_reason = filt.get_jsonalbe_vars(exclude_keys={"skip_rejected"})
            previous_inspector = inspector

        self._statistics.update_changes(document, self.before_process_inspector, self.inspectors)
        return document

    @property
    def statistics(self) -> dict:
        return self._statistics.get_statistics()

    @property
    def statistics_obj(self) -> StatsContainer:
        return self._statistics.stats

    def summary(self, format: str = "print") -> None:
        info = [
            {
                "layer": i,
                "name": filt.name,
                "doc": filt.__doc__,
            }
            for i, filt in enumerate(self.filters)
        ]

        def to_json(filter_info: dict) -> dict:
            filter_info["doc"] = "".join(d.strip() for d in filter_info["doc"].split("\n"))
            return filter_info

        if format == "json":
            print(json.dumps(list(map(to_json, info)), ensure_ascii=False, indent="\t"))
        if format == "print":
            for layer in info:
                print(f"[{layer['layer']}] {layer['name']}")
                pprint.pprint(layer["doc"])

Base class for all filters. Document-level filters must inherit from this class.

The filter's behavior is defined in the apply method; if you define a new filter, you must implement this method. When an instance of this class is called, the filter is applied string to string.

If the filter creates Document.tokens from Document.text, you must implement the tokenize method. If the filter updates Document.text by merging Document.tokens, you must implement the merge method. Do not define a filter that changes both Document.text and Document.tokens, to prevent unexpected behavior.

If you apply the filter to tokens, you can use the TokenFilter class.

Parameters

p : float
    The probability with which hojichar.Compose applies this filter.
skip_rejected : bool
    If set True, hojichar.Compose makes this filter skip documents that have the is_rejected flag. This flag is True by default, since processing discarded documents in subsequent filters is usually meaningless. However, in some cases rejected documents need further filtering: for example, to analyze false positives, discarded documents must still be passed to a JSON dump filter. In such cases, set skip_rejected to False so that the filter passes all documents.

Compose( filters: List[Union[hojichar.Filter, hojichar.TokenFilter]], random_state: Union[int, numpy.random._generator.Generator, NoneType] = None, *args: Any, **kwargs: Any)
Compose a filter from pre-defined filter objects. A filter with the skip_rejected flag ignores documents that have the is_rejected flag; by doing so, Compose avoids applying filters that would not affect the output.

Parameters

filters : List[Union[Filter, TokenFilter]]
    Filter instances which apply to the corpus.

random_state : Union[None, int, np.random.Generator], optional
    Default = None. Seed for applying filters randomly. random_state must be an int or an np.random.Generator instance.

def set_filters( self, filters: List[Union[hojichar.Filter, hojichar.TokenFilter]]) -> None:

Set the filter to a Compose object. The filter is expanded if the list of filters in the argument contains a filter bound by Compose.

Args:
    filters (List[Union[Filter, TokenFilter]]): Target filters

def apply( self, document: hojichar.Document) -> hojichar.Document:

Definition of filter behavior.

In this method, the filter will modify document.text, or set document.is_rejected = True to discard the document.

Do not define a filter that changes both document.text and document.token

Parameters

document : Document
    Input document

Returns

Document
    Processed Document

def summary(self, format: str = 'print') -> None:
class Filter:
class Filter:
    """
    Base class for all filters.
    Document-level filters must inherit from this class.

    The definition of filter function is in `apply` method.
    If you define a new filter, you must define the method.
    When this class is called, apply the filter from string to string.

    If the filter create `Document.tokens` form `Document.text`, you
    must implement `tokenize` method.
    If the filter update `Document.text` by merging `Document.tokens`, you
    must implement `merge` method.
    Do not define a filter that changes both `Document.text` and `Document.token`
    to prevent unexpected behavior.

    If you apply the filter to tokens, you can use `TokenFilter` class.

    Parameters
    ----------
    p: float
        The probability apply the filter organized by hojichar.Compose
    skip_reject: bool
        If set `True`, `hojichar.Compose` make this filter ignore the document
        which has `is_rejected` flag.
        This flag is `True` by default since processing discarded documents
        in subsequent filters is meaningless. However, in some cases, docs that
        have been rejected need another filter. For example, analyzing false-positive,
        discarded docs must be passed to JSON Dump filters. In such case,
        set the `skip_reject` flag as `False` and make it pass all docs.
    """

    def __init__(
        self, p: float = 1, skip_rejected: bool = True, *args: Any, **kwargs: Any
    ) -> None:
        """
        Parameters
        ----------
        p : float, optional
            Probability that this filter will be applied. Default=1
        """
        self.name = self.__class__.__name__
        self.logger = logging.getLogger("hojichar.document_filters." + self.name)
        assert 0 <= p <= 1
        self.p = p
        self.skip_rejected = skip_rejected

    def apply(self, document: Document) -> Document:
        """Definition of filter behavior.

        In this method, the filter will modify `document.text`, or
        set `document.is_rejected = True` to discard the document.

        Do not define a filter that changes both `document.text` and `document.token`

        Parameters
        ----------
        document : Document
            Input document

        Returns
        -------
        Document
            Processed Document
        """
        raise NotImplementedError(f"{self.__class__.__name__}.apply method is not defined")
        return document

    def apply_filter(self, document: Document) -> Document:
        document = self.apply(document)
        return document

    def __call__(self, text: str) -> str:
        document = Document(text)
        document = self.apply(document)
        return document.text

    def get_jsonalbe_vars(self, exclude_keys: Optional[Set[str]] = None) -> Dict[str, Any]:
        """
        Get the member variable of this filter.
        Eligible variables are primitive types; [bool, int, float, str, None],
        and the name of the variable not starts with the underscore; `_`.
        """
        if exclude_keys is None:
            exclude_keys = set()
        return {
            k: v
            for k, v in vars(self).items()
            if (_is_jsonable(v) and (k not in exclude_keys) and (not k.startswith("_")))
        }

Base class for all filters. Document-level filters must inherit from this class.

The filter's behavior is defined in the apply method; any new filter must implement it. Calling a filter instance directly applies it as a string-to-string transformation.

If the filter creates Document.tokens from Document.text, you must implement the tokenize method. If the filter updates Document.text by merging Document.tokens, you must implement the merge method. Do not define a filter that changes both Document.text and Document.tokens, to prevent unexpected behavior.

If you apply the filter to tokens, use the TokenFilter class instead.

Parameters

p : float
    The probability that this filter is applied, as managed by hojichar.Compose.
skip_rejected : bool
    If True, hojichar.Compose makes this filter skip documents whose is_rejected flag is set. This is the default, since processing discarded documents in subsequent filters is meaningless. However, in some cases rejected documents need further filtering; for example, to analyze false positives, discarded docs must be passed to JSON dump filters. In such cases, set skip_rejected to False so the filter processes all documents.
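
As a sketch of this interface, the following standalone example mirrors the documented Document and Filter contracts. Minimal stand-ins are defined inline so the snippet runs without hojichar installed; ShortDocumentFilter and min_len are illustrative names, not part of the library:

```python
# Minimal stand-ins mirroring the Document/Filter interfaces documented here,
# so this sketch is self-contained.
class Document:
    def __init__(self, text, is_rejected=False):
        self.text = text
        self.is_rejected = is_rejected

class Filter:
    def __init__(self, p=1, skip_rejected=True):
        assert 0 <= p <= 1
        self.p = p
        self.skip_rejected = skip_rejected

    def apply(self, document):
        raise NotImplementedError

    def __call__(self, text):
        # Calling an instance applies the filter string-to-string.
        return self.apply(Document(text)).text

# An illustrative document-level filter: reject documents shorter than min_len.
class ShortDocumentFilter(Filter):
    def __init__(self, min_len=10, **kwargs):
        super().__init__(**kwargs)
        self.min_len = min_len

    def apply(self, document):
        if len(document.text) < self.min_len:
            document.is_rejected = True  # discard instead of modifying text
        return document

doc = ShortDocumentFilter(min_len=5).apply(Document("hi"))
print(doc.is_rejected)  # True
```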

Filter(p: float = 1, skip_rejected: bool = True, *args: Any, **kwargs: Any)

Parameters

p : float, optional
    Probability that this filter will be applied. Default=1

def apply( self, document: hojichar.Document) -> hojichar.Document:

Definition of filter behavior.

In this method, the filter will modify document.text, or set document.is_rejected = True to discard the document.

Do not define a filter that changes both document.text and document.token

Parameters

document : Document
    Input document

Returns

Document
    Processed Document

def apply_filter( self, document: hojichar.Document) -> hojichar.Document:
def get_jsonalbe_vars(self, exclude_keys: Optional[Set[str]] = None) -> Dict[str, Any]:

Get the member variables of this filter. Eligible variables are primitive types (bool, int, float, str, None) whose names do not start with an underscore (_).
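
The selection rule can be illustrated with a short, self-contained sketch. The _is_jsonable helper here is a local stand-in implemented with json.dumps; hojichar's internal version may differ:

```python
import json

def _is_jsonable(value):
    # Local stand-in: a value is eligible if json can serialize it.
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        return False

class ExampleFilter:
    def __init__(self):
        self.p = 1.0            # primitive -> kept
        self.name = "Example"   # primitive -> kept
        self._cache = {}        # underscore-prefixed -> dropped
        self.logger = object()  # not JSON-serializable -> dropped

    def get_jsonalbe_vars(self, exclude_keys=None):
        # Same dict comprehension as the documented method.
        if exclude_keys is None:
            exclude_keys = set()
        return {
            k: v
            for k, v in vars(self).items()
            if (_is_jsonable(v) and (k not in exclude_keys) and (not k.startswith("_")))
        }

print(ExampleFilter().get_jsonalbe_vars())  # {'p': 1.0, 'name': 'Example'}
```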

class TokenFilter:
class TokenFilter:
    """
    Base class for token-level filters.

    Token filters, which should be implemented in hojichar/filters/token_filters.py,
    must inherit from this class.
    """

    def __init__(
        self, p: float = 1, skip_rejected: bool = True, *args: Any, **kwargs: Any
    ) -> None:
        self.name = self.__class__.__name__
        self.logger = logging.getLogger("hojichar.token_filters." + self.name)
        assert 0 <= p <= 1
        self.p = p
        self.skip_rejected = skip_rejected

    def apply(self, token: Token) -> Token:
        raise NotImplementedError(f"{self.__class__.__name__}.apply method is not defined")
        return token

    def apply_filter(self, document: Document) -> Document:
        document.tokens = [self.apply(token) for token in document.tokens if not token.is_rejected]
        return document

    def __call__(self, text: str) -> str:
        token = Token(text)
        token = self.apply(token)
        return token.text

    def get_jsonable_vars(self) -> dict:
        # Output key-values of member variables that can be obtained by vars(self), except "logger".
        exclude_keys = ["logger"]
        return dict(filter(lambda item: item[0] not in exclude_keys, vars(self).items()))

    def get_jsonalbe_vars(self, exclude_keys: Optional[Set[str]] = None) -> dict:
        """
        Get the member variables of this filter.
        Eligible variables are primitive types (bool, int, float, str, None)
        whose names do not start with an underscore (`_`).
        """
        if exclude_keys is None:
            exclude_keys = set()
        return {
            k: v
            for k, v in vars(self).items()
            if (_is_jsonable(v) and (k not in exclude_keys) and (not k.startswith("_")))
        }

Base class for token-level filters.

Token filters, which should be implemented in hojichar/filters/token_filters.py, must inherit from this class.
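
A token filter transforms each token of a document via apply_filter, as in this standalone sketch. Minimal Token/Document/TokenFilter stand-ins mirror the source shown above so the snippet runs without hojichar; LowercaseTokenFilter is an illustrative name, not a library filter:

```python
# Minimal stand-ins mirroring the Token/TokenFilter interfaces shown above.
class Token:
    def __init__(self, text, is_rejected=False):
        self.text = text
        self.is_rejected = is_rejected

class Document:
    def __init__(self, text, tokens=None):
        self.text = text
        self.tokens = tokens or []

class TokenFilter:
    def apply(self, token):
        raise NotImplementedError

    def apply_filter(self, document):
        # Apply the token-level filter to every non-rejected token.
        document.tokens = [self.apply(t) for t in document.tokens if not t.is_rejected]
        return document

# An illustrative token-level filter: lowercase each token.
class LowercaseTokenFilter(TokenFilter):
    def apply(self, token):
        token.text = token.text.lower()
        return token

doc = Document("HELLO World", tokens=[Token("HELLO"), Token("World")])
doc = LowercaseTokenFilter().apply_filter(doc)
print([t.text for t in doc.tokens])  # ['hello', 'world']
```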

TokenFilter(p: float = 1, skip_rejected: bool = True, *args: Any, **kwargs: Any)
def apply(self, token: hojichar.Token) -> hojichar.Token:
def apply_filter( self, document: hojichar.Document) -> hojichar.Document:
def get_jsonable_vars(self) -> dict:
def get_jsonalbe_vars(self, exclude_keys: Optional[Set[str]] = None) -> dict:

Get the member variables of this filter. Eligible variables are primitive types (bool, int, float, str, None) whose names do not start with an underscore (_).

class Document:
class Document:
    def __init__(
        self, text: str, is_rejected: bool = False, tokens: Optional[List[Token]] = None
    ) -> None:
        self.text = text
        self.__original = text
        self.is_rejected = is_rejected
        if tokens is None:
            self.tokens: List[Token] = []

        self.dedup_lsh: List[str] = []
        self.reject_reason: Dict[str, Any] = {}

    @property
    def original(self) -> str:
        return self.__original

    def set_tokens(self, tokens: List[str]) -> None:
        self.tokens = [Token(token) for token in tokens]

    def get_tokens(self) -> List[str]:
        return [token.text for token in self.tokens]

    def __str__(self) -> str:
        return self.text
Document( text: str, is_rejected: bool = False, tokens: Optional[List[hojichar.Token]] = None)
def set_tokens(self, tokens: List[str]) -> None:
def get_tokens(self) -> List[str]:
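
The Document/Token pair can be exercised as follows. The classes are reproduced minimally from the source above so the sketch is self-contained without hojichar installed:

```python
# Minimal reproductions of the Token/Document classes shown above.
class Token:
    def __init__(self, text, is_rejected=False):
        self.text = text
        self.__original = text
        self.is_rejected = is_rejected

class Document:
    def __init__(self, text, is_rejected=False, tokens=None):
        self.text = text
        self.__original = text
        self.is_rejected = is_rejected
        self.tokens = tokens if tokens is not None else []

    @property
    def original(self):
        # The input text is kept read-only, even after filters rewrite `text`.
        return self.__original

    def set_tokens(self, tokens):
        self.tokens = [Token(t) for t in tokens]

    def get_tokens(self):
        return [t.text for t in self.tokens]

doc = Document("Hello, world")
doc.set_tokens(doc.text.split())  # wrap raw strings into Token objects
doc.text = doc.text.upper()       # a filter may rewrite text...
print(doc.get_tokens())           # ['Hello,', 'world']
print(doc.original)               # ...but `original` still holds the input
```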
class Token:
class Token:
    def __init__(self, text: str, is_rejected: bool = False) -> None:
        self.text = text
        self.__original = text
        self.is_rejected = is_rejected

    @property
    def original(self) -> str:
        return self.__original

    def __str__(self) -> str:
        return self.text
Token(text: str, is_rejected: bool = False)
class Parallel:
class Parallel:
    """
    The Parallel class provides a way to apply a hojichar.Compose filter
    to an iterator of documents in a parallel manner using a specified
    number of worker processes. This class should be used as a context
    manager with a 'with' statement.

    Example:

    doc_iter = (hojichar.Document(d) for d in open("my_text.txt"))
    with Parallel(my_filter, num_jobs=8) as pfilter:
        for doc in pfilter.imap_apply(doc_iter):
            pass  # Process the filtered document as needed.
    """

    def __init__(
        self, filter: hojichar.Compose, num_jobs: int | None = None, ignore_errors: bool = False
    ):
        """
        Initializes a new instance of the Parallel class.

        Args:
            filter (hojichar.Compose): A composed filter object that specifies the
                processing operations to apply to each document in parallel.
                A copy of the filter is made within a 'with' statement. When the 'with'
                block terminates, the statistical information obtained through
                `filter.statistics` or `filter.statistics_obj` is replaced with the
                total value of the statistical information processed within
                the 'with' block.
            num_jobs (int | None, optional): The number of worker processes to use.
                If None, then the number returned by os.cpu_count() is used. Defaults to None.
            ignore_errors (bool, optional): If set to True, any exceptions thrown during
                the processing of a document will be caught and logged, but will not
                stop the processing of further documents. If set to False, the first
                exception thrown will terminate the entire parallel processing operation.
                Defaults to False.
        """
        self.filter = filter
        self.num_jobs = num_jobs
        self.ignore_errors = ignore_errors

        self._pool: multiprocessing.pool.Pool | None = None
        self._pid_stats: dict[int, StatsContainer] | None = None

    def __enter__(self) -> Parallel:
        self._pool = multiprocessing.Pool(
            processes=self.num_jobs,
            initializer=_init_worker,
            initargs=(self.filter, self.ignore_errors),
        )
        self._pid_stats = dict()
        return self

    def imap_apply(self, docs: Iterator[hojichar.Document]) -> Iterator[hojichar.Document]:
        """
        Takes an iterator of Documents and applies the Compose filter to
        each Document in a parallel manner. This is a generator method
        that yields processed Documents.

        Args:
            docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.

        Raises:
            RuntimeError: If the Parallel instance is not properly initialized. This
                generally happens when the method is called outside of a 'with' statement.
            Exception: If any exceptions are raised within the worker processes.

        Yields:
            Iterator[hojichar.Document]: An iterator that yields processed Documents.
        """
        if self._pool is None or self._pid_stats is None:
            raise RuntimeError(
                "Parallel instance not properly initialized. Use within a 'with' statement."
            )
        try:
            for doc, pid, stats_obj, err_msg in self._pool.imap_unordered(_worker, docs):
                self._pid_stats[pid] = stats_obj
                if err_msg is not None:
                    logger.error(f"Error in worker {pid}: {err_msg}")
                yield doc
        except Exception:
            self.__exit__(None, None, None)
            raise

    def __exit__(self, exc_type, exc_value, traceback) -> None:  # type: ignore
        if self._pool:
            self._pool.terminate()
            self._pool.join()
        if self._pid_stats:
            self.filter._statistics.stats = self.filter._statistics.stats + functools.reduce(
                lambda x, y: x + y, self._pid_stats.values()
            )

    @property
    def statistics_obj(self) -> StatsContainer:
        """
        Returns a statistics object of the total statistical
        values processed within the Parallel block.

        Returns:
            StatsContainer: Statistics object
        """
        if self._pid_stats:
            stats: StatsContainer = functools.reduce(lambda x, y: x + y, self._pid_stats.values())
        else:
            stats = copy(self.filter.statistics_obj).reset()
        return stats

    @property
    def statistics(self) -> dict:
        """
        Returns a human-readable statistics dict of the total statistical
        values processed within the Parallel block.

        Returns:
            dict: Human readable statistics values
        """
        return self.statistics_obj.get_human_readable_values()

The Parallel class provides a way to apply a hojichar.Compose filter to an iterator of documents in a parallel manner using a specified number of worker processes. This class should be used as a context manager with a 'with' statement.

Example:

doc_iter = (hojichar.Document(d) for d in open("my_text.txt"))
with Parallel(my_filter, num_jobs=8) as pfilter:
    for doc in pfilter.imap_apply(doc_iter):
        pass  # Process the filtered document as needed.

Parallel( filter: hojichar.Compose, num_jobs: 'int | None' = None, ignore_errors: bool = False)

Initializes a new instance of the Parallel class.

Args:

filter (hojichar.Compose): A composed filter object that specifies the
    processing operations to apply to each document in parallel. A copy of
    the filter is made within a 'with' statement. When the 'with' block
    terminates, the statistical information obtained through filter.statistics
    or filter.statistics_obj is replaced with the total value of the
    statistical information processed within the 'with' block.

num_jobs (int | None, optional): The number of worker processes to use.
    If None, then the number returned by os.cpu_count() is used. Defaults to None.
ignore_errors (bool, optional): If set to True, any exceptions thrown during
    the processing of a document will be caught and logged, but will not
    stop the processing of further documents. If set to False, the first
    exception thrown will terminate the entire parallel processing operation.
    Defaults to False.
def imap_apply( self, docs: Iterator[hojichar.Document]) -> Iterator[hojichar.Document]:

Takes an iterator of Documents and applies the Compose filter to each Document in a parallel manner. This is a generator method that yields processed Documents.

Args: docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.

Raises:
    RuntimeError: If the Parallel instance is not properly initialized. This generally happens when the method is called outside of a 'with' statement.
    Exception: If any exceptions are raised within the worker processes.

Yields: Iterator[hojichar.Document]: An iterator that yields processed Documents.

statistics_obj: hojichar.StatsContainer

Returns a statistics object of the total statistical values processed within the Parallel block.

Returns: StatsContainer: Statistics object

statistics: dict

Returns a human-readable statistics dict of the total statistical values processed within the Parallel block.

Returns: dict: Human readable statistics values

@dataclasses.dataclass
class StatsContainer:
@dataclasses.dataclass
class StatsContainer:
    total_info: DocStatistics
    layers_info: Dict[str, FilterStatistics]  # Key of the dict is filter name.

    def __add__(self, other: StatsContainer) -> StatsContainer:
        assert self.layers_info.keys() == other.layers_info.keys(), "Layer names must match"
        return StatsContainer(
            self.total_info + other.total_info,
            {k: v + other.layers_info[k] for k, v in self.layers_info.items()},
        )

    def get_human_readable_values(self) -> dict:
        return {
            "total_info": self.total_info.get_human_readable_values(),
            "layers_info": [
                layer.get_human_readable_values() for layer in self.layers_info.values()
            ],
        }

    def reset(self) -> StatsContainer:
        self.total_info.reset()
        for layer in self.layers_info.values():
            layer.reset()
        return self
StatsContainer( total_info: hojichar.core.inspection.DocStatistics, layers_info: Dict[str, hojichar.core.inspection.FilterStatistics])
def get_human_readable_values(self) -> dict:
def reset(self) -> hojichar.StatsContainer:
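
Per-process statistics are combined with __add__, as Parallel.__exit__ does above. The merge pattern can be sketched with a small stand-in dataclass (LayerStats, discard_num, and diff_bytes are illustrative names, not hojichar's actual fields):

```python
import dataclasses
import functools

# Illustrative stand-in for a per-layer statistics record.
@dataclasses.dataclass
class LayerStats:
    discard_num: int = 0
    diff_bytes: int = 0

    def __add__(self, other):
        # Merge by summing each field, like StatsContainer.__add__.
        return LayerStats(
            self.discard_num + other.discard_num,
            self.diff_bytes + other.diff_bytes,
        )

# One stats object per worker process, keyed by pid, folded into a
# single total with reduce(+), mirroring Parallel.__exit__.
pid_stats = {
    101: LayerStats(discard_num=3, diff_bytes=-120),
    102: LayerStats(discard_num=5, diff_bytes=-80),
}
total = functools.reduce(lambda x, y: x + y, pid_stats.values())
print(total)  # LayerStats(discard_num=8, diff_bytes=-200)
```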