hojichar
HojiChar: The Text Processing Pipeline
Official docs: https://hojichar.github.io/HojiChar/hojichar.html
Features
- HojiChar provides a way to combine multiple arbitrary text processing tasks into a streamlined pipeline.
- The sequence of operations can be described declaratively, ensuring portability.
- HojiChar allows users to gather detailed statistical information from large amounts of text during processing.
- It enables management of any Python text processing tasks, providing a Command Line Interface (CLI) capable of parallel processing.
Background and what is for HojiChar
Text preprocessing is far from a one-size-fits-all process. Depending on the data source and the specific task at hand, various steps including normalization, noise removal, and filtering may be necessary. Not all texts require the same level of preprocessing. For instance, relatively clean texts may only need minimal filtering, while "dirtier" sources like Common Crawl data often require more thorough processing. As a result, the preprocessing profile has to be tailored to each specific domain.
Many preprocessing operations can be viewed as filters, taking string as input, applying a transformation, and outputting the processed string. Even though these operations might seem straightforward individually, managing them in a multi-layered, efficient manner can be challenging.
Inspired by torchvision.transforms
and iver56/audiomentations, HojiChar addresses these challenges. It enables users to define each text processing step as a class inheriting from hojichar.Filter
and use hojichar.Compose
to chain them together into a single filter. By writing out the Compose
recipe as a profile, the preprocessing process for a specific domain's text can be made portable. Moreover, Compose
automatically logs various metrics for each filter, such as byte changes, processing time, and number of rejected texts. This allows users to assess the validity of each operation and consider trade-offs between computation time and performance.
While there are other text normalization tools available, most are designed to perform a specific set of operations. Text preprocessing, despite its importance in the LLM era, is often considered a mundane task compared to machine learning or artificial intelligence tasks. As a result, many existing solutions can be ad hoc, poorly maintained, or inadequately tested. Recognizing these issues, we developed HojiChar as a robust tool for configuring text preprocessing.
Install
pip install hojichar
If you want to use the additional filters, install the package with the following command:
pip install hojichar[all]
Defining a Compose Object
The Compose
class in HojiChar allows you to create a sequence of text processing filters.
from hojichar import Compose, document_filters
cleaner = Compose([
document_filters.JSONLoader(key="text"),
document_filters.AcceptJapanese(),
document_filters.DocumentLengthFilter(min_doc_len=0,max_doc_len=1000),
document_filters.ExampleHojiChar(),
document_filters.JSONDumper()
])
When a Compose
object is called, it accepts a string and returns the processed string.
>>> cleaner('{"text": "こんにちは、"}')
{"text": "こんにちは、<hojichar>"}
The filter pipeline above accomplishes the following steps:
- Extracts the value from the
'text'
key in the JSON object. - Discards the string if it's not in Japanese.
- Rejects any text shorter than 0 characters or longer than 1000 characters.
- Appends
<hojichar>
to the string. - Outputs the processed string as JSON with the key "text".
The filters used in the pipeline are predefined filters found in hojichar.filters
.
While HojiChar provides some fundamental text processing filters and plans to add more in the future, users can also define their custom filters.
User-defined Filters
A filter composing a Compose
object is a class that inherits the Filter
class and implements the text processing within the apply
function.
from hojichar.core.filter_interface import Filter
class YourFilter(Filter):
def apply(self, document):
text = document.text
"""
Write your text transformation...
"""
document.text = text
return document
The apply
method accepts a hojichar.Document
type as an argument and returns it after the transformations. The Document
is a class that encapsulates a string.
The Document class can have additional metadata via the extras attribute. This allows you to associate values with the document that can be utilized in subsequent filters. Reject documents
- The
hojichar.Document
has anis_rejected
attribute. If a filter sets this flag toTrue
,Compose
will discard the document during processing.
Definition of __init__
for custom filter
When creating a user-defined class and applying a custom constructor, make sure to initialize the parent class.
class YourFilter(Filter):
def __init__(self, your_param, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.your_param = your_param
def apply(self, document):
text = document.text
text = process(text, self.your_param)
document.text = text
return document
This is because The Filter
class implicitly has several arguments, one of which is p
.
cleaner = Compose([
document_filters.JSONLoader(key="text"),
document_filters.AcceptJapanese(p=0.5),
document_filters.JSONDumper()
])
The p
argument passed to the document_filters.AcceptJapanese
constructor determines the probability of applying the filter; with a probability of 1-p
, it acts as an identity function. This behavior is defined in the parent class hojichar.Filter
.
Additional Notes on Compose
- Even though the behavior of a
Compose
object when called is a text-in, text-out function,Compose
itself also inherits from theFilter
class. Therefore, applying theapply
method to aCompose
object results inhojihcar.Document
class being used as input and output. Compose
class behaves like a Filter. If you add a Compose object as one of the filters in the constructor of Compose, the filter will be unfolded recursively.You can access various statistics regarding the processing performed by
Compose
throughCompose.statistics
orCompose.statistics_obj
.Compose.statistics
is a dictionary like above.{ "total_info": { "processed_num": 10928, "discard_num": 5513, "input_MB": 104.514584, "output_MB": 25.33024, "cumulative_time": 114.071047143, "total_token_num": 0 }, "layers_info": [ { "name": "0-JSONLoader", "discard_num": 0, "diff_MB": -1.9647932052612305, "cumulative_time": 0.420034328, "params": { "name": "JSONLoader", "p": 1, "skip_rejected": true, "key": "text", "ignore": true } }, { "name": "1-DocumentNormalizer", "discard_num": 0, "diff_MB": -1.5221118927001953, "cumulative_time": 8.286988707, "params": { "name": "DocumentNormalizer", "p": 1, "skip_rejected": true } }, { "name": "2-DocumentLengthFilter", "discard_num": 344, "diff_MB": -0.05566596984863281, "cumulative_time": 0.093768306, "params": { "name": "DocumentLengthFilter", "p": 1, "skip_rejected": true, "min_doc_len": 100, "max_doc_len": null } }, ] }
Compose.statistics_obj
is ahojichar.StatsContainer
class. Thehojichar.StatsContainer
class stores the raw values of the statistics dictionary, and addition operations are defined to easily calculate the total statistics processed with the same filter. You can get the statistics dictionary by callingCompose.statistics_obj.get_human_readable_values()
.
Parallel application of Compose
The hojichar.Parallel
class allows for the application of Compose
to an iterable of Document
concurrently. This class empowers users to process vast collections of documents by harnessing the power of multiple CPU cores.
Example usage of Parallel
class to proces a very large JSON Lines file concurrently.
import hojichar
input_file = "your_text.jsonl"
input_doc_iter = (hojichar.Document(line) for line in open(input_file))
cleaner = hojichar.Compose([
hojichar.document_filters.JSONLoader(),
hojichar.document_filters.DocumentNormalizer(),
# Insert your filters
hojichar.document_filters.JSONDumper(),
])
with hojichar.Parallel(cleaner, num_jobs=10) as pfilter:
out_doc_iter = pfilter.imap_apply(input_doc_iter)
with open("your_processed_text.jsonl", "w") as fp:
for doc in out_doc_iter:
fp.write(doc.text + "\n")
- Always use the
Parallel
class within awith
statement. Parallel.imap_apply(doc_iter)
processes an iterator ofDocument
and returns an iterator of the processed documents.- For additional options and details about the
Parallel
class, please refer to the official documentation.
CLI tool and preprocessing profile
- HojiChar provides CLI tools for text preprocess pipeline.
User defines a series of preprocessing into a python file as profile.
Example:
cat <your_text.jsonl> | hojichar -p your_preprocessing_profile.py -o your_text_preprocessed.jsonl
hojichar --help
usage: hojichar [-h] --profile <profile.py> [--args ARGS [ARGS ...]] [--output OUTPUT] [--input INPUT] [--dump-stats <path to stats.json>] [--exit-on-error] [--all] [--jobs JOBS] options: -h, --help show this help message and exit --profile <profile.py>, -p <profile.py> Path to a Python file that implements your custom filter. --args ARGS [ARGS ...] Pass additional arguments to the profile. Use it like `--args arg1 arg2` etc. The arguments should be space-separated. --output OUTPUT, -o OUTPUT Specifies the path for the output file. Defaults to standard output. --input INPUT, -i INPUT Specifies the path for the input file. Defaults to standard input. If set this path, the progress bar is enabled. --dump-stats <path to stats.json> Dump statistics to file. If the file exists, it will be appended. --exit-on-error Exit if an exception occurs during filtering. Useful for debugging custom filters. --all A flag that specifies whether to include discarded samples. This is useful when inspecting discarded samples. --jobs JOBS, -j JOBS The number ob parallel jobs. By default, the nuber of the CPU core.
Definition of Profile
- HojiChar CLI receives a series of preprocessing as a profile.
- The preprocessing profile is provided as a Python file. Two patterns of the file are allowed.
- hojichar.utils.load_compose.load_compose() loads these profile.
FILTER
profile
hojichar.Compose
must be defined asFILTER
variable.Example.
import json from hojichar import Compose, Filter from hojichar.filters.document_filters import ExampleHojiChar, JSONLoader class JSONDumper(Filter): def apply(self, document): text = document.text document.text = json.dumps({"text": text}, ensure_ascii=False) return document # FILTER must define Compose object. FILTER = Compose( [ JSONLoader(), ExampleHojiChar(), JSONDumper(), ] )
- Pass the texts to the filter you have defined using a pipe as follows.
cat <your_file> | hojichar -p example_profile.py
- Pass the texts to the filter you have defined using a pipe as follows.
hojichar.utils.load_compose.load_filter_from_file()
loads this type of profile.
FACTORY
profile
- A callable function that returns
hojichar.Compose
must be defined asFACTORY
variable. - The callable can receive arguments. In this way, parameters can be passed to the profile.
- Some kinds of value are not preferred to static. For example, random seeds and some flags modify the behavior of a filter, etc
FACTORY
provides a mechanism to pass those values as arguments to the preprocessing.
Example.
import json from hojichar import Compose, Filter from hojichar.filters.document_filters import JSONLoader class AddSomething(Filter): # Concat some value after every document. def __init__(self, something: str, *args, **kwargs) -> None: self.something = something def apply(self, document): text = document.text + self.something document.text = text return document class JSONDumper(Filter): def apply(self, document): text = document.text document.text = json.dumps({"text": text}, ensure_ascii=False) return document def callback(something): return Compose( [ JSONLoader(), AddSomething(something), JSONDumper(), ] ) # FACTORY must be callable which returns Compose object. FACTORY = callback
Using
FACTORY
profile with arguments in CLI.cat <your_file> | hojichar -p example_profile.py --args arg1 arg2
hojichar.utils.load_compose.load_parametrized_filter_from_file()
orload_factory_from_file
loads this type of profile.
For Developers
Installing from the Source Directory
To install the package, execute the following commands:
git clone https://github.com/HojiChar/HojiChar.git
cd HojiChar
poetry install
To install packages related to development, use:
poetry install --extras "dev lint test doc"
Testing
Some filters incorporate doctests. You can run these tests with the command:
pytest --doctest-modules .
This command should be executed from the root of the project.
Code style
- HojiChar requires type hints for all code. Type checking is performed in continuous integration (CI) in addition to the pytest tests.
- HojiChar code is subject to inspection by the Flake8 Linter and is formatted using Black and isort. For configuration details, please refer to
pyproject.toml
. You can perform linting and formatting from the root of the project using the following commands:
Linting
poetry run task lint
Formatting
poetry run task format
Building the Documentation
We use Pdoc for building the documentation. You can build the documentation using the following command:
pdoc -o docs hojichar
Run this command from the project root.
In practice, the process of building the documentation is automated by CI. When a Pull Request is merged into the main branch, the documentation is built in the docs/
directory of the docs
branch. This directory is then deployed to the official documentation site by GitHub Pages.
Creating a Source Tarball
To create a source tarball, for instance, for packaging or distribution, run the following command:
poetry build
The tarball will be created in the dist directory. This command will compile the source code, and the resulting tarball can be installed with no additional dependencies other than the Python standard library.
Creating a Release and Uploading it to PyPI
This command is primarily used by the project manager to create a release and upload it to PyPI.
Versions uploaded to PyPI are identified by git tags. The __version__
variable in __init__.py
or the version
entry in pyproject.toml
are ignored. The poetry-dynamic-versioning
Poetry plugin is used to implement this process.
To add the plugin, use:
poetry self add "poetry-dynamic-versioning[plugin]"
The steps to push to PyPI are as follows, although in actuality, the process is automated by CI when a GitHub release is created from the tag.
git checkout v0.1.2
poetry config pypi-token.pypi <API TOKEN>
poetry build
poetry publish
The actual task for the manager is to apply the appropriate tag to the commit to be released and to create the release from GitHub:
git tag -a v0.1.2 -m "Version 0.1.2"
git push origin v0.1.2
1""" 2.. include:: ../README.md 3""" 4from .core.composition import Compose 5from .core.filter_interface import Filter, TokenFilter 6from .core.inspection import StatsContainer 7from .core.models import Document, Token 8from .core.parallel import Parallel 9from .filters import ( 10 deduplication, 11 document_filters, 12 language_identification, 13 token_filters, 14 tokenization, 15) 16 17__version__ = "0.0.0" # Replaced by poetry-dynamic-versioning when deploying 18 19__all__ = [ 20 "core", 21 "filters", 22 "utils", 23 "Compose", 24 "Filter", 25 "TokenFilter", 26 "Document", 27 "Token", 28 "Parallel", 29 "StatsContainer", 30 "deduplication", 31 "document_filters", 32 "language_identification", 33 "token_filters", 34 "tokenization", 35]
19class Compose(Filter): 20 def __init__( 21 self, 22 filters: List[Union[Filter, TokenFilter]], 23 random_state: Optional[Union[int, np.random.Generator]] = None, 24 *args: Any, 25 **kwargs: Any, 26 ) -> None: 27 """ 28 Compose a filter from pre-defined filter-objects. 29 Filter which has `skip_rejected` flag ignores a document which has `is_rejected` flag. 30 By doing so, Compose avoid applying filters that do not affect the output. 31 32 Parameters 33 ---------- 34 filters : List[Union[Filter, TokenFilter]] 35 Filter instances which apply to the corpus. 36 37 random_state : Union[None, int, np.random.Generator], optional 38 Default = None 39 Seed for applying filters randomly. 40 `random_state` must be int or np.random.Generator instance. 41 """ 42 super().__init__(*args, **kwargs) 43 self.set_filters(filters) 44 self.logger = logging.getLogger("hojichar.Compose") 45 self.before_process_inspector = Inspector( 46 target_filter=BeforeProcessFilter(), filter_idx=-1 47 ) 48 self.inspectors = [ 49 Inspector(target_filter=filter, filter_idx=idx) 50 for idx, filter in enumerate(self.filters) 51 ] 52 self._statistics = StatisticsCounter(self.inspectors) 53 54 # Turn random_state into a `np.random.Generator` instance. 55 if random_state is None: 56 self.rng = np.random.default_rng() 57 elif isinstance(random_state, int): 58 self.rng = np.random.default_rng(random_state) 59 elif isinstance(random_state, np.random.Generator): 60 self.rng = random_state 61 else: 62 raise ValueError(f"{random_state} cannot be used to seed.") 63 64 def set_filters(self, filters: List[Union[Filter, TokenFilter]]) -> None: 65 """ 66 Set the filter to a Compose object. The filter is expanded if the 67 list of filters in the argument contains a filter bound by Compose. 
68 69 Args: 70 filters (List[Union[Filter, TokenFilter]]): Target filters 71 """ 72 self.filters: List[Union[Filter, TokenFilter]] = [] 73 for filter in filters: 74 if isinstance(filter, Compose): 75 self.filters.extend(filter.filters) 76 else: 77 self.filters.append(filter) 78 79 def __call__(self, text: str) -> str: 80 document = Document(text) 81 document = self.apply(document) 82 if document.is_rejected: 83 return "" 84 else: 85 return document.text 86 87 def _apply_filter(self, filt: Union[Filter, TokenFilter], document: Document) -> Document: 88 if document.is_rejected and filt.skip_rejected: 89 pass 90 else: 91 if filt.p == 1: 92 document = filt.apply_filter(document) 93 else: 94 if self.rng.random() < filt.p: 95 document = filt.apply_filter(document) 96 return document 97 98 def apply(self, document: Document) -> Document: 99 document = self.before_process_inspector.apply(document) 100 previous_inspector = self.before_process_inspector 101 for i, filt in enumerate(self.filters): 102 inspector = self.inspectors[i] 103 document = self._apply_filter(filt=filt, document=document) 104 document = inspector.apply(document) 105 if (not previous_inspector.is_rejected) and inspector.is_rejected: 106 document.reject_reason = filt.get_jsonalbe_vars(exclude_keys={"skip_rejected"}) 107 previous_inspector = inspector 108 109 self._statistics.update_changes(document, self.before_process_inspector, self.inspectors) 110 return document 111 112 @property 113 def statistics(self) -> dict: 114 return self._statistics.get_statistics() 115 116 @property 117 def statistics_obj(self) -> StatsContainer: 118 return self._statistics.stats 119 120 def summary(self, format: str = "print") -> None: 121 info = [ 122 { 123 "layer": i, 124 "name": filt.name, 125 "doc": filt.__doc__, 126 } 127 for i, filt in enumerate(self.filters) 128 ] 129 130 def to_json(filter_info: dict) -> dict: 131 filter_info["doc"] = "".join(d.strip() for d in filter_info["doc"].split("\n")) 132 return filter_info 
133 134 if format == "json": 135 print(json.dumps(list(map(to_json, info)), ensure_ascii=False, indent="\t")) 136 if format == "print": 137 for layer in info: 138 print(f"[{layer['layer']}] {layer['name']}") 139 pprint.pprint(layer["doc"])
Base class for all filters. Document-level filters must inherit from this class.
The definition of filter function is in apply
method.
If you define a new filter, you must define the method.
When this class is called, apply the filter from string to string.
If the filter create Document.tokens
form Document.text
, you
must implement tokenize
method.
If the filter update Document.text
by merging Document.tokens
, you
must implement merge
method.
Do not define a filter that changes both Document.text
and Document.token
to prevent unexpected behavior.
If you apply the filter to tokens, you can use TokenFilter
class.
Parameters
p: float
The probability apply the filter organized by hojichar.Compose
skip_reject: bool
If set True
, hojichar.Compose
make this filter ignore the document
which has is_rejected
flag.
This flag is True
by default since processing discarded documents
in subsequent filters is meaningless. However, in some cases, docs that
have been rejected need another filter. For example, analyzing false-positive,
discarded docs must be passed to JSON Dump filters. In such case,
set the skip_reject
flag as False
and make it pass all docs.
20 def __init__( 21 self, 22 filters: List[Union[Filter, TokenFilter]], 23 random_state: Optional[Union[int, np.random.Generator]] = None, 24 *args: Any, 25 **kwargs: Any, 26 ) -> None: 27 """ 28 Compose a filter from pre-defined filter-objects. 29 Filter which has `skip_rejected` flag ignores a document which has `is_rejected` flag. 30 By doing so, Compose avoid applying filters that do not affect the output. 31 32 Parameters 33 ---------- 34 filters : List[Union[Filter, TokenFilter]] 35 Filter instances which apply to the corpus. 36 37 random_state : Union[None, int, np.random.Generator], optional 38 Default = None 39 Seed for applying filters randomly. 40 `random_state` must be int or np.random.Generator instance. 41 """ 42 super().__init__(*args, **kwargs) 43 self.set_filters(filters) 44 self.logger = logging.getLogger("hojichar.Compose") 45 self.before_process_inspector = Inspector( 46 target_filter=BeforeProcessFilter(), filter_idx=-1 47 ) 48 self.inspectors = [ 49 Inspector(target_filter=filter, filter_idx=idx) 50 for idx, filter in enumerate(self.filters) 51 ] 52 self._statistics = StatisticsCounter(self.inspectors) 53 54 # Turn random_state into a `np.random.Generator` instance. 55 if random_state is None: 56 self.rng = np.random.default_rng() 57 elif isinstance(random_state, int): 58 self.rng = np.random.default_rng(random_state) 59 elif isinstance(random_state, np.random.Generator): 60 self.rng = random_state 61 else: 62 raise ValueError(f"{random_state} cannot be used to seed.")
Compose a filter from pre-defined filter-objects.
Filter which has skip_rejected
flag ignores a document which has is_rejected
flag.
By doing so, Compose avoid applying filters that do not affect the output.
Parameters
filters : List[Union[Filter, TokenFilter]] Filter instances which apply to the corpus.
random_state : Union[None, int, np.random.Generator], optional
Default = None
Seed for applying filters randomly.
random_state
must be int or np.random.Generator instance.
64 def set_filters(self, filters: List[Union[Filter, TokenFilter]]) -> None: 65 """ 66 Set the filter to a Compose object. The filter is expanded if the 67 list of filters in the argument contains a filter bound by Compose. 68 69 Args: 70 filters (List[Union[Filter, TokenFilter]]): Target filters 71 """ 72 self.filters: List[Union[Filter, TokenFilter]] = [] 73 for filter in filters: 74 if isinstance(filter, Compose): 75 self.filters.extend(filter.filters) 76 else: 77 self.filters.append(filter)
Set the filter to a Compose object. The filter is expanded if the list of filters in the argument contains a filter bound by Compose.
Args: filters (List[Union[Filter, TokenFilter]]): Target filters
98 def apply(self, document: Document) -> Document: 99 document = self.before_process_inspector.apply(document) 100 previous_inspector = self.before_process_inspector 101 for i, filt in enumerate(self.filters): 102 inspector = self.inspectors[i] 103 document = self._apply_filter(filt=filt, document=document) 104 document = inspector.apply(document) 105 if (not previous_inspector.is_rejected) and inspector.is_rejected: 106 document.reject_reason = filt.get_jsonalbe_vars(exclude_keys={"skip_rejected"}) 107 previous_inspector = inspector 108 109 self._statistics.update_changes(document, self.before_process_inspector, self.inspectors) 110 return document
Definition of filter behavior.
In this method, the filter will modify document.text
, or
set document.is_rejected = True
to discard the document.
Do not define a filter that changes both document.text
and document.token
Parameters
document : Document Input document
Returns
Document Processed Document
120 def summary(self, format: str = "print") -> None: 121 info = [ 122 { 123 "layer": i, 124 "name": filt.name, 125 "doc": filt.__doc__, 126 } 127 for i, filt in enumerate(self.filters) 128 ] 129 130 def to_json(filter_info: dict) -> dict: 131 filter_info["doc"] = "".join(d.strip() for d in filter_info["doc"].split("\n")) 132 return filter_info 133 134 if format == "json": 135 print(json.dumps(list(map(to_json, info)), ensure_ascii=False, indent="\t")) 136 if format == "print": 137 for layer in info: 138 print(f"[{layer['layer']}] {layer['name']}") 139 pprint.pprint(layer["doc"])
Inherited Members
22class Filter: 23 """ 24 Base class for all filters. 25 Document-level filters must inherit from this class. 26 27 The definition of filter function is in `apply` method. 28 If you define a new filter, you must define the method. 29 When this class is called, apply the filter from string to string. 30 31 If the filter create `Document.tokens` form `Document.text`, you 32 must implement `tokenize` method. 33 If the filter update `Document.text` by merging `Document.tokens`, you 34 must implement `merge` method. 35 Do not define a filter that changes both `Document.text` and `Document.token` 36 to prevent unexpected behavior. 37 38 If you apply the filter to tokens, you can use `TokenFilter` class. 39 40 Parameters 41 ---------- 42 p: float 43 The probability apply the filter organized by hojichar.Compose 44 skip_reject: bool 45 If set `True`, `hojichar.Compose` make this filter ignore the document 46 which has `is_rejected` flag. 47 This flag is `True` by default since processing discarded documents 48 in subsequent filters is meaningless. However, in some cases, docs that 49 have been rejected need another filter. For example, analyzing false-positive, 50 discarded docs must be passed to JSON Dump filters. In such case, 51 set the `skip_reject` flag as `False` and make it pass all docs. 52 """ 53 54 def __init__( 55 self, p: float = 1, skip_rejected: bool = True, *args: Any, **kwargs: Any 56 ) -> None: 57 """ 58 Parameters 59 ---------- 60 p : float, optional 61 Probability that this filter will be applied. Default=1 62 """ 63 self.name = self.__class__.__name__ 64 self.logger = logging.getLogger("hojichar.document_filters." + self.name) 65 assert 0 <= p <= 1 66 self.p = p 67 self.skip_rejected = skip_rejected 68 69 def apply(self, document: Document) -> Document: 70 """Definition of filter behavior. 71 72 In this method, the filter will modify `document.text`, or 73 set `document.is_rejected = True` to discard the document. 
74 75 Do not define a filter that changes both `document.text` and `document.token` 76 77 Parameters 78 ---------- 79 document : Document 80 Input document 81 82 Returns 83 ------- 84 Document 85 Processed Document 86 """ 87 raise NotImplementedError(f"{self.__class__.__name__}.apply method is not defined") 88 return document 89 90 def apply_filter(self, document: Document) -> Document: 91 document = self.apply(document) 92 return document 93 94 def __call__(self, text: str) -> str: 95 document = Document(text) 96 document = self.apply(document) 97 return document.text 98 99 def get_jsonalbe_vars(self, exclude_keys: Optional[Set[str]] = None) -> Dict[str, Any]: 100 """ 101 Get the member variable of this filter. 102 Eligible variables are primitive types; [bool, int, float, str, None], 103 and the name of the variable not starts with the underscore; `_`. 104 """ 105 if exclude_keys is None: 106 exclude_keys = set() 107 return { 108 k: v 109 for k, v in vars(self).items() 110 if (_is_jsonable(v) and (k not in exclude_keys) and (not k.startswith("_"))) 111 }
Base class for all filters. Document-level filters must inherit from this class.
The definition of filter function is in apply
method.
If you define a new filter, you must define the method.
When this class is called, apply the filter from string to string.
If the filter create Document.tokens
form Document.text
, you
must implement tokenize
method.
If the filter update Document.text
by merging Document.tokens
, you
must implement merge
method.
Do not define a filter that changes both Document.text
and Document.token
to prevent unexpected behavior.
If you apply the filter to tokens, you can use TokenFilter
class.
Parameters
p: float
The probability apply the filter organized by hojichar.Compose
skip_reject: bool
If set True
, hojichar.Compose
make this filter ignore the document
which has is_rejected
flag.
This flag is True
by default since processing discarded documents
in subsequent filters is meaningless. However, in some cases, docs that
have been rejected need another filter. For example, analyzing false-positive,
discarded docs must be passed to JSON Dump filters. In such case,
set the skip_reject
flag as False
and make it pass all docs.
54 def __init__( 55 self, p: float = 1, skip_rejected: bool = True, *args: Any, **kwargs: Any 56 ) -> None: 57 """ 58 Parameters 59 ---------- 60 p : float, optional 61 Probability that this filter will be applied. Default=1 62 """ 63 self.name = self.__class__.__name__ 64 self.logger = logging.getLogger("hojichar.document_filters." + self.name) 65 assert 0 <= p <= 1 66 self.p = p 67 self.skip_rejected = skip_rejected
Parameters
p : float, optional Probability that this filter will be applied. Default=1
69 def apply(self, document: Document) -> Document: 70 """Definition of filter behavior. 71 72 In this method, the filter will modify `document.text`, or 73 set `document.is_rejected = True` to discard the document. 74 75 Do not define a filter that changes both `document.text` and `document.token` 76 77 Parameters 78 ---------- 79 document : Document 80 Input document 81 82 Returns 83 ------- 84 Document 85 Processed Document 86 """ 87 raise NotImplementedError(f"{self.__class__.__name__}.apply method is not defined") 88 return document
Definition of filter behavior.
In this method, the filter will modify document.text
, or
set document.is_rejected = True
to discard the document.
Do not define a filter that changes both document.text
and document.token
Parameters
document : Document Input document
Returns
Document Processed Document
99 def get_jsonalbe_vars(self, exclude_keys: Optional[Set[str]] = None) -> Dict[str, Any]: 100 """ 101 Get the member variable of this filter. 102 Eligible variables are primitive types; [bool, int, float, str, None], 103 and the name of the variable not starts with the underscore; `_`. 104 """ 105 if exclude_keys is None: 106 exclude_keys = set() 107 return { 108 k: v 109 for k, v in vars(self).items() 110 if (_is_jsonable(v) and (k not in exclude_keys) and (not k.startswith("_"))) 111 }
Get the member variable of this filter.
Eligible variables are primitive types; [bool, int, float, str, None],
and the name of the variable not starts with the underscore; _
.
114class TokenFilter: 115 """ 116 Base class for token-level filters. 117 118 Token filters, which shuld be implemented in hojichar/filters/token_filters.py, 119 must inherit from this class. 120 """ 121 122 def __init__( 123 self, p: float = 1, skip_rejected: bool = True, *args: Any, **kwargs: Any 124 ) -> None: 125 self.name = self.__class__.__name__ 126 self.logger = logging.getLogger("hojichar.token_filters." + self.name) 127 assert 0 <= p <= 1 128 self.p = p 129 self.skip_rejected = skip_rejected 130 131 def apply(self, token: Token) -> Token: 132 raise NotImplementedError(f"{self.__class__.__name__}.apply method is not defined") 133 return token 134 135 def apply_filter(self, document: Document) -> Document: 136 document.tokens = [self.apply(token) for token in document.tokens if not token.is_rejected] 137 return document 138 139 def __call__(self, text: str) -> str: 140 token = Token(text) 141 token = self.apply(token) 142 return token.text 143 144 def get_jsonable_vars(self) -> dict: 145 # Output key-values of member variables that can be obtained by var(self), except "logger". 146 exclude_keys = ["logger"] 147 return dict(filter(lambda item: item[0] not in exclude_keys, vars(self).items())) 148 149 def get_jsonalbe_vars(self, exclude_keys: Optional[Set[str]] = None) -> dict: 150 """ 151 Get the member variable of this filter. 152 Eligible variables are primitive types; [bool, int, float, str, None], 153 and the name of the variable not starts with the underscore; `_`. 154 """ 155 if exclude_keys is None: 156 exclude_keys = set() 157 return { 158 k: v 159 for k, v in vars(self).items() 160 if (_is_jsonable(v) and (k not in exclude_keys) and (not k.startswith("_"))) 161 }
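The contract above — implement `apply`, get `__call__` and `apply_filter` for free — can be illustrated with a minimal sketch. `Token` is reduced here to a bare stand-in with only `text` and `is_rejected` attributes, and `LowercaseToken` is a hypothetical example filter, not one shipped with the library:

```python
import logging
from typing import Any


class Token:
    # Bare stand-in for hojichar's Token: just text and a rejection flag.
    def __init__(self, text: str, is_rejected: bool = False) -> None:
        self.text = text
        self.is_rejected = is_rejected


class TokenFilter:
    # Condensed from the base class shown above.
    def __init__(self, p: float = 1, skip_rejected: bool = True, *args: Any, **kwargs: Any) -> None:
        self.name = self.__class__.__name__
        self.logger = logging.getLogger("hojichar.token_filters." + self.name)
        assert 0 <= p <= 1
        self.p = p
        self.skip_rejected = skip_rejected

    def apply(self, token: Token) -> Token:
        raise NotImplementedError(f"{self.__class__.__name__}.apply method is not defined")

    def __call__(self, text: str) -> str:
        # Wrap the raw string in a Token, apply the filter, unwrap the result.
        return self.apply(Token(text)).text


class LowercaseToken(TokenFilter):
    # Hypothetical example: normalize each token to lowercase.
    def apply(self, token: Token) -> Token:
        token.text = token.text.lower()
        return token


print(LowercaseToken()("HojiChar"))  # hojichar
```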
```python
class Document:
    def __init__(
        self,
        text: str,
        is_rejected: bool = False,
        tokens: Optional[List[Token]] = None,
        extras: Optional[Dict[str, Any]] = None,
    ) -> None:
        self.text = text
        self.__original = text
        self.is_rejected = is_rejected
        if tokens is None:
            self.tokens: List[Token] = []
        else:
            self.tokens = tokens

        if extras is None:
            self.extras: Dict[str, Any] = {}
        else:
            self.extras = extras

        self.dedup_lsh: List[str] = []
        self.reject_reason: Dict[str, Any] = {}

    @property
    def original(self) -> str:
        return self.__original

    def set_tokens(self, tokens: List[str]) -> None:
        self.tokens = [Token(token) for token in tokens]

    def get_tokens(self) -> List[str]:
        return [token.text for token in self.tokens]

    def __str__(self) -> str:
        return self.text

    def __repr__(self) -> str:
        return f"Document(text={self.text!r}, is_rejected={self.is_rejected}, extras={self.extras})"  # noqa
```
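The key behaviors of `Document` — mutable working `text`, an immutable `original`, and the token round-trip — can be exercised with a condensed copy of the class that omits the fields not needed for the demonstration:

```python
from typing import List


class Token:
    def __init__(self, text: str, is_rejected: bool = False) -> None:
        self.text = text
        self.is_rejected = is_rejected


class Document:
    # Condensed from the class above: text, a private original, and tokens.
    def __init__(self, text: str) -> None:
        self.text = text
        self.__original = text
        self.is_rejected = False
        self.tokens: List[Token] = []

    @property
    def original(self) -> str:
        # Read-only: filters may rewrite .text, but .original is preserved.
        return self.__original

    def set_tokens(self, tokens: List[str]) -> None:
        self.tokens = [Token(t) for t in tokens]

    def get_tokens(self) -> List[str]:
        return [t.text for t in self.tokens]


doc = Document("Hello HojiChar")
doc.set_tokens(doc.text.split())
print(doc.get_tokens())  # ['Hello', 'HojiChar']
doc.text = "hello hojichar"  # a filter rewrites the working text...
print(doc.original)      # Hello HojiChar
```

Exposing `original` as a property (backed by a name-mangled attribute) is what lets downstream filters inspect the untouched input even after earlier filters have rewritten `text`.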
```python
class Parallel:
    """
    The Parallel class provides a way to apply a hojichar.Compose filter
    to an iterator of documents in a parallel manner using a specified
    number of worker processes. This class should be used as a context
    manager with a 'with' statement.

    Example:

        doc_iter = (hojichar.Document(d) for d in open("my_text.txt"))
        with Parallel(my_filter, num_jobs=8) as pfilter:
            for doc in pfilter.imap_apply(doc_iter):
                pass  # Process the filtered document as needed.
    """

    def __init__(
        self, filter: hojichar.Compose, num_jobs: int | None = None, ignore_errors: bool = False
    ):
        """
        Initializes a new instance of the Parallel class.

        Args:
            filter (hojichar.Compose): A composed filter object that specifies the
                processing operations to apply to each document in parallel.
                A copy of the filter is made within a 'with' statement. When the
                'with' block terminates, the statistical information obtained through
                `filter.statistics` or `filter.statistics_obj` is replaced with the
                total value of the statistical information processed within the
                'with' block.

            num_jobs (int | None, optional): The number of worker processes to use.
                If None, then the number returned by os.cpu_count() is used.
                Defaults to None.

            ignore_errors (bool, optional): If set to True, any exceptions thrown during
                the processing of a document will be caught and logged, but will not
                stop the processing of further documents. If set to False, the first
                exception thrown will terminate the entire parallel processing operation.
                Defaults to False.
        """
        self.filter = filter
        self.num_jobs = num_jobs
        self.ignore_errors = ignore_errors

        self._pool: multiprocessing.pool.Pool | None = None
        self._pid_stats: dict[int, StatsContainer] | None = None

    def __enter__(self) -> Parallel:
        self._pool = multiprocessing.Pool(
            processes=self.num_jobs,
            initializer=_init_worker,
            initargs=(self.filter, self.ignore_errors),
        )
        self._pid_stats = dict()
        return self

    def imap_apply(self, docs: Iterator[hojichar.Document]) -> Iterator[hojichar.Document]:
        """
        Takes an iterator of Documents and applies the Compose filter to
        each Document in a parallel manner. This is a generator method
        that yields processed Documents.

        Args:
            docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.

        Raises:
            RuntimeError: If the Parallel instance is not properly initialized. This
                generally happens when the method is called outside of a 'with' statement.
            Exception: If any exceptions are raised within the worker processes.

        Yields:
            Iterator[hojichar.Document]: An iterator that yields processed Documents.
        """
        if self._pool is None or self._pid_stats is None:
            raise RuntimeError(
                "Parallel instance not properly initialized. Use within a 'with' statement."
            )
        try:
            for doc, pid, stats_obj, err_msg in self._pool.imap_unordered(_worker, docs):
                self._pid_stats[pid] = stats_obj
                if err_msg is not None:
                    logger.error(f"Error in worker {pid}: {err_msg}")
                yield doc
        except Exception:
            self.__exit__(None, None, None)
            raise

    def __exit__(self, exc_type, exc_value, traceback) -> None:  # type: ignore
        if self._pool:
            self._pool.terminate()
            self._pool.join()
        if self._pid_stats:
            self.filter._statistics.stats = self.filter._statistics.stats + functools.reduce(
                lambda x, y: x + y, self._pid_stats.values()
            )

    @property
    def statistics_obj(self) -> StatsContainer:
        """
        Returns a statistics object of the total statistical
        values processed within the Parallel block.

        Returns:
            StatsContainer: Statistics object
        """
        if self._pid_stats:
            stats: StatsContainer = functools.reduce(lambda x, y: x + y, self._pid_stats.values())
        else:
            stats = copy(self.filter.statistics_obj).reset()
        return stats

    @property
    def statistics(self) -> dict:
        """
        Returns a human-readable statistics dict of the total statistical
        values processed within the Parallel block.

        Returns:
            dict: Human-readable statistics values
        """
        return self.statistics_obj.get_human_readable_values()
```
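The core mechanism of `imap_apply` — pushing documents through `imap_unordered` and consuming results as workers finish — can be illustrated with a toy pipeline. For a self-contained sketch, a thread pool stands in for the process pool used by the real class, and `toy_filter` is a hypothetical stand-in for applying a Compose filter to one document:

```python
from multiprocessing.pool import ThreadPool


def toy_filter(text: str) -> str:
    # Hypothetical stand-in for applying a Compose filter to one document.
    return text.strip().lower()


docs = ["  Alpha ", "BETA", " Gamma  "]
with ThreadPool(processes=2) as pool:
    # Like Parallel.imap_apply, imap_unordered yields results as workers
    # finish, so output order is not guaranteed; sort for display.
    results = sorted(pool.imap_unordered(toy_filter, docs))

print(results)  # ['alpha', 'beta', 'gamma']
```

The unordered iteration is a deliberate design choice: it keeps all workers busy rather than blocking on the slowest document, which matters when filter cost varies wildly across inputs.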
```python
@dataclasses.dataclass
class StatsContainer:
    total_info: DocStatistics
    layers_info: Dict[str, FilterStatistics]  # Key of the dict is filter name.

    def __add__(self, other: StatsContainer) -> StatsContainer:
        assert self.layers_info.keys() == other.layers_info.keys(), "Layer names must match"
        return StatsContainer(
            self.total_info + other.total_info,
            {k: v + other.layers_info[k] for k, v in self.layers_info.items()},
        )

    def get_human_readable_values(self) -> dict:
        return {
            "total_info": self.total_info.get_human_readable_values(),
            "layers_info": [
                layer.get_human_readable_values() for layer in self.layers_info.values()
            ],
        }

    def reset(self) -> StatsContainer:
        self.total_info.reset()
        for layer in self.layers_info.values():
            layer.reset()
        return self
```
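The per-worker merge in `Parallel.__exit__` relies on this `__add__` together with `functools.reduce`. A minimal sketch, with a bare stand-in for the per-layer statistics (the real `DocStatistics`/`FilterStatistics` carry more fields) and an illustrative layer name:

```python
from __future__ import annotations

import dataclasses
import functools
from typing import Dict


@dataclasses.dataclass
class LayerStats:
    # Bare stand-in for FilterStatistics: only a discard counter.
    discard_num: int = 0

    def __add__(self, other: LayerStats) -> LayerStats:
        return LayerStats(self.discard_num + other.discard_num)


@dataclasses.dataclass
class StatsContainer:
    layers_info: Dict[str, LayerStats]

    def __add__(self, other: StatsContainer) -> StatsContainer:
        # Same merge rule as above: layer names must match, values are summed.
        assert self.layers_info.keys() == other.layers_info.keys(), "Layer names must match"
        return StatsContainer(
            {k: v + other.layers_info[k] for k, v in self.layers_info.items()}
        )


# One container per worker process, merged exactly as in Parallel.__exit__.
per_worker = [
    StatsContainer({"MyLengthFilter": LayerStats(3)}),
    StatsContainer({"MyLengthFilter": LayerStats(5)}),
]
total = functools.reduce(lambda x, y: x + y, per_worker)
print(total.layers_info["MyLengthFilter"].discard_num)  # 8
```

Making the container an additive monoid is what lets statistics from any number of workers collapse into one object with a single `reduce`, independent of how the work was partitioned.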