HojiChar: The Text Processing Pipeline
Official docs: https://hojichar.github.io/HojiChar/hojichar.html
Features
- HojiChar provides a way to combine multiple arbitrary text processing tasks into a streamlined pipeline.
- The sequence of operations can be described declaratively, ensuring portability.
- HojiChar allows users to gather detailed statistical information from large amounts of text during processing.
- It enables management of any Python text processing tasks, providing a Command Line Interface (CLI) capable of parallel processing.
Background and what HojiChar is for
Text preprocessing is far from a one-size-fits-all process. Depending on the data source and the specific task at hand, various steps including normalization, noise removal, and filtering may be necessary. Not all texts require the same level of preprocessing. For instance, relatively clean texts may only need minimal filtering, while "dirtier" sources like Common Crawl data often require more thorough processing. As a result, the preprocessing profile has to be tailored to each specific domain.
Many preprocessing operations can be viewed as filters: taking a string as input, applying a transformation, and outputting the processed string. Even though these operations might seem straightforward individually, managing them in a multi-layered, efficient manner can be challenging.
Inspired by torchvision.transforms and iver56/audiomentations, HojiChar addresses these challenges. It enables users to define each text processing step as a class inheriting from `hojichar.Filter` and to use `hojichar.Compose` to chain the steps into a single filter. By writing out the `Compose` recipe as a profile, the preprocessing for a specific domain's text can be made portable. Moreover, `Compose` automatically logs various metrics for each filter, such as byte changes, processing time, and the number of rejected texts. This allows users to assess the validity of each operation and consider the trade-offs between computation time and performance.
While there are other text normalization tools available, most are designed to perform a specific set of operations. Text preprocessing, despite its importance in the LLM era, is often considered a mundane task compared to machine learning or artificial intelligence tasks. As a result, many existing solutions can be ad hoc, poorly maintained, or inadequately tested. Recognizing these issues, we developed HojiChar as a robust tool for configuring text preprocessing.
Install
pip install hojichar
If you want to use the additional filters, install the package with the following command:
pip install 'hojichar[all]'
If you want to use the `AsyncChatAPI` filter, install the package with the following command:
pip install 'hojichar[openai]'
Defining a Compose Object
The `Compose` class in HojiChar allows you to create a sequence of text processing filters.
```python
from hojichar import Compose, document_filters

cleaner = Compose([
    document_filters.JSONLoader(key="text"),
    document_filters.AcceptJapanese(),
    document_filters.DocumentLengthFilter(min_doc_len=0, max_doc_len=1000),
    document_filters.ExampleHojiChar(),
    document_filters.JSONDumper()
])
```
When a `Compose` object is called, it accepts a string and returns the processed string.
```python
>>> cleaner('{"text": "こんにちは、"}')
{"text": "こんにちは、<hojichar>"}
```
The filter pipeline above accomplishes the following steps:
- Extracts the value from the 'text' key in the JSON object.
- Discards the string if it is not in Japanese.
- Rejects any text shorter than 0 characters or longer than 1000 characters.
- Appends `<hojichar>` to the string.
- Outputs the processed string as JSON with the key "text".
The filters used in the pipeline are predefined filters found in `hojichar.filters`.
While HojiChar provides some fundamental text processing filters and plans to add more in the future, users can also define their custom filters.
User-defined Filters
A filter in a `Compose` object is a class that inherits from the `Filter` class and implements its text processing in the `apply` method.
```python
from hojichar.core.filter_interface import Filter

class YourFilter(Filter):
    def apply(self, document):
        text = document.text
        # Write your text transformation...
        document.text = text
        return document
```
The `apply` method accepts a `hojichar.Document` as its argument and returns it after the transformations. `Document` is a class that encapsulates a string.
The Document class can also hold additional metadata via its `extras` attribute. This allows you to associate values with the document that can be used in subsequent filters, as in the sketch below.
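For example, one filter can stash a value in `extras` and a later filter can read it back. A minimal sketch (the `CountChars` and `AppendCharCount` filters are illustrative, not part of HojiChar):

```python
from hojichar import Compose, Filter

class CountChars(Filter):
    def apply(self, document):
        # Stash a value in extras for downstream filters.
        document.extras["char_count"] = len(document.text)
        return document

class AppendCharCount(Filter):
    def apply(self, document):
        # Read the value stored by the previous filter.
        document.text += f" ({document.extras['char_count']} chars)"
        return document

pipeline = Compose([CountChars(), AppendCharCount()])
print(pipeline("hello"))  # hello (5 chars)
```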
Reject documents

- The `hojichar.Document` has an `is_rejected` attribute. If a filter sets this flag to `True`, `Compose` will discard the document during processing; see the example that follows.
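A minimal rejecting filter might look like this (the `DiscardShortDocument` name is illustrative):

```python
from hojichar import Filter

class DiscardShortDocument(Filter):
    def apply(self, document):
        # Setting is_rejected tells Compose to drop this document.
        if len(document.text) < 10:
            document.is_rejected = True
        return document
```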
Definition of `__init__` for custom filters
When creating a user-defined class and applying a custom constructor, make sure to initialize the parent class.
```python
class YourFilter(Filter):
    def __init__(self, your_param, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.your_param = your_param

    def apply(self, document):
        text = document.text
        text = process(text, self.your_param)
        document.text = text
        return document
```
This is because the `Filter` class implicitly takes several constructor arguments, one of which is `p`.
```python
cleaner = Compose([
    document_filters.JSONLoader(key="text"),
    document_filters.AcceptJapanese(p=0.5),
    document_filters.JSONDumper()
])
```
The `p` argument passed to the `document_filters.AcceptJapanese` constructor determines the probability of applying the filter; with probability `1 - p`, it acts as an identity function. This behavior is defined in the parent class, `hojichar.Filter`.
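Since `p < 1` makes a pipeline stochastic, you can pass `random_state` to `Compose` for reproducible runs; the seed is shared with sub-filters that did not set their own. A sketch, reusing the imports from the snippet above:

```python
cleaner = Compose(
    [
        document_filters.JSONLoader(key="text"),
        document_filters.AcceptJapanese(p=0.5),
        document_filters.JSONDumper(),
    ],
    random_state=42,  # makes the p-gated filter application reproducible
)
```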
Batch and Stream Processing with `apply_batch` and `apply_stream`
The `Filter` and `Compose` classes support efficient batch and stream processing through the `apply_batch` and `apply_stream` methods.
apply_batch
- The `apply_batch` method processes a list of `Document` objects in one go. By default, it applies the `apply` method to each document individually.
- Users can override `apply_batch` in custom filters for optimized batch operations:

```python
class YourBatchFilter(Filter):
    def apply_batch(self, documents: Sequence[Document]) -> list[Document]:
        # Implement your batch processing logic here
        return documents
```
apply_stream
The `apply_stream` method processes an iterable (e.g., a generator) of `Document` objects, which is ideal for large datasets or stream-based processing. If the `use_batch` flag is set to `True` in a `Filter`'s constructor, its `apply_batch` implementation will be used during stream processing.
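A sketch combining the two: a filter that opts into batched stream processing via `use_batch` (the whitespace-stripping body is illustrative):

```python
from typing import Sequence
from hojichar import Document, Filter

class StripFilter(Filter):
    def __init__(self, *args, **kwargs):
        # use_batch=True routes apply_stream through apply_batch.
        super().__init__(*args, use_batch=True, batch_size=256, **kwargs)

    def apply(self, document: Document) -> Document:
        # Single-document fallback path.
        document.text = document.text.strip()
        return document

    def apply_batch(self, documents: Sequence[Document]) -> list[Document]:
        # Batched path used by apply_stream when use_batch=True.
        for doc in documents:
            doc.text = doc.text.strip()
        return list(documents)
```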
Example Usage:
```python
stream = (Document(f"text {i}") for i in range(10000))
processed_stream = cleaner.apply_stream(stream)

for doc in processed_stream:
    print(doc.text)
```
This allows HojiChar to efficiently process massive corpora while maintaining low memory consumption.
Additional Notes on Compose
- Even though a `Compose` object behaves as a text-in, text-out function when called, `Compose` itself also inherits from the `Filter` class. Therefore, applying the `apply` method to a `Compose` object uses the `hojichar.Document` class as both input and output.
- The `Compose` class behaves like a `Filter`. If you pass a `Compose` object as one of the filters in the constructor of another `Compose`, the filter will be unfolded recursively, as shown below.
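For example (a sketch; note how the inner pipeline's filters are renumbered into the outer one):

```python
from hojichar import Compose, document_filters

inner = Compose([
    document_filters.JSONLoader(key="text"),
    document_filters.DocumentNormalizer(),
])
outer = Compose([
    inner,  # unfolded into its sub-filters
    document_filters.JSONDumper(),
])
print([f.name for f in outer.filters])
# ['0-JSONLoader', '1-DocumentNormalizer', '2-JSONDumper']
```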
HojiChar running asynchronously
- HojiChar supports asynchronous processing of text data using the `AsyncCompose` class. This allows you to build pipelines that can handle out-of-CPU processing, such as making API calls.
- You can define async versions of filters using the `AsyncFilter` class:

```python
from hojichar import AsyncFilter

class YourAsyncFilter(AsyncFilter):
    async def apply(self, document):
        text = document.text
        # Perform asynchronous processing here
        document.text = text
        return document
```
The `AsyncCompose` class accepts both `Filter` and `AsyncFilter` objects, allowing you to mix synchronous and asynchronous filters in a single pipeline.
Example
Nowadays, text processing is enhanced by the intelligence of LLMs.
This example demonstrates how to use the `AsyncChatAPI` filter to process text data with OpenAI-compatible APIs. This filter makes it easy to build high-throughput "chains of LLMs".
```python
import os

from hojichar import AsyncCompose
from hojichar.filters.document_filters import JSONLoader, JSONDumper
from hojichar.utils.async_handlers import write_stream_to_file

async_pipeline = AsyncCompose(
    [
        JSONLoader(input_key="text"),
        # AsyncChatAPI is available when hojichar is installed with the 'openai' extra.
        AsyncChatAPI(
            model_id="gpt-4o",
            openai_endpoint_url="https://api.openai.com/v1",
            openai_api_key=os.getenv("OPENAI_API_KEY"),
            max_concurrent_requests=128,
            output_key="llm_output",
            message_generator=lambda doc: [{"role": "user", "content": doc.text[:1000]}],
        ),
        JSONDumper(export_extras=True),
    ]
)

# Run inside an async context; the top-level `await` below assumes one.
with open("input.jsonl") as f:
    with async_pipeline:
        async_output_stream = (str(doc) async for doc in async_pipeline.apply_stream(f))
        await write_stream_to_file(async_output_stream, "output.jsonl", chunk_size=128)  # Write async-iterable to file efficiently
```
- You can use this filter by installing `hojichar[openai]`.
- The filter works with OpenAI-compatible APIs, such as endpoints hosted by vLLM, and is useful for text-augmentation tasks.
- `AsyncChatAPI` can handle roughly 1K requests/sec against a well-optimized vLLM server. (We recommend using `uvloop` for better throughput.)
Get Metrics of Processing
HojiChar tracks detailed statistics at both the filter and pipeline levels, helping you monitor and debug your processing pipeline.
Each `Filter` (including `Compose`) maintains a `Statistics` object containing information such as input size, output size, discarded document count, and processing time.
Example: Getting Statistics from a Compose Object
```python
stats = cleaner.get_total_statistics_map()
print(stats)
```
```
[{'cumulative_time_ns': 337250,
'diff_bytes': 10,
'diff_chars': 10,
'discard_num': 0,
'input_bytes': 45,
'input_chars': 23,
'input_num': 1,
'name': 'Total',
'output_bytes': 55,
'output_chars': 33,
'output_num': 1},
{'cumulative_time_ns': 80209,
'diff_bytes': -12,
'diff_chars': -12,
'discard_num': 0,
'input_bytes': 45,
'input_chars': 23,
'input_num': 1,
'name': '0-JSONLoader',
'output_bytes': 33,
'output_chars': 11,
'output_num': 1},
{'cumulative_time_ns': 17500,
'diff_bytes': 0,
'diff_chars': 0,
'discard_num': 0,
'input_bytes': 33,
'input_chars': 11,
'input_num': 1,
'name': '1-AcceptJapanese',
'output_bytes': 33,
'output_chars': 11,
'output_num': 1},
{'cumulative_time_ns': 8125,
'diff_bytes': 0,
'diff_chars': 0,
'discard_num': 0,
'input_bytes': 33,
'input_chars': 11,
'input_num': 1,
'name': '2-DocumentLengthFilter',
'output_bytes': 33,
'output_chars': 11,
'output_num': 1},
{'cumulative_time_ns': 6042,
'diff_bytes': 10,
'diff_chars': 10,
'discard_num': 0,
'input_bytes': 33,
'input_chars': 11,
'input_num': 1,
'name': '3-ExampleHojiChar',
'output_bytes': 43,
'output_chars': 21,
'output_num': 1},
{'cumulative_time_ns': 81125,
'diff_bytes': 12,
'diff_chars': 12,
'discard_num': 0,
'input_bytes': 43,
'input_chars': 21,
'input_num': 1,
'name': '4-JSONDumper',
'output_bytes': 55,
'output_chars': 33,
'output_num': 1}]
```
- Use `get_statistics()` to get the raw `Statistics` object for any filter.
- Use `get_total_statistics()` to get a list of statistics for all filters in a `Compose` pipeline.
- Use `get_total_statistics_map()` to retrieve the statistics as a list of dicts (see the sketch below for per-filter access).
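You can also inspect a single sub-filter. A sketch, reusing the `cleaner` pipeline from above (the `discard_num` key matches the dict output shown earlier):

```python
first = cleaner.filters[0]                    # the 0-JSONLoader step
print(first.get_statistics().name)            # '0-JSONLoader'
print(first.get_statistics_map()["discard_num"])
```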
These tools allow granular monitoring of how each filter contributes to data reduction, rejection, or transformation.
Parallel application of Compose
The `hojichar.Parallel` class allows a `Compose` filter to be applied to an iterable of `Document` objects concurrently. This class empowers users to process vast collections of documents by harnessing the power of multiple CPU cores.
Example usage of the `Parallel` class to process a very large JSON Lines file concurrently:
```python
import hojichar

input_file = "your_text.jsonl"
input_doc_iter = (hojichar.Document(line) for line in open(input_file))

cleaner = hojichar.Compose([
    hojichar.document_filters.JSONLoader(),
    hojichar.document_filters.DocumentNormalizer(),
    # Insert your filters
    hojichar.document_filters.JSONDumper(),
])

with hojichar.Parallel(cleaner, num_jobs=10) as pfilter:
    out_doc_iter = pfilter.imap_apply(input_doc_iter)
    with open("your_processed_text.jsonl", "w") as fp:
        for doc in out_doc_iter:
            fp.write(doc.text + "\n")
```
- Always use the `Parallel` class within a `with` statement.
- `Parallel.imap_apply(doc_iter)` processes an iterator of `Document` and returns an iterator of the processed documents.
- For additional options and details about the `Parallel` class, please refer to the official documentation.
CLI tool and preprocessing profile
- HojiChar provides a CLI tool for running text preprocessing pipelines. Users define a series of preprocessing steps in a Python file, called a profile.
Example:
cat <your_text.jsonl> | hojichar -p your_preprocessing_profile.py -o your_text_preprocessed.jsonl
hojichar --help
```
usage: hojichar [-h] --profile <profile.py> [--args ARGS [ARGS ...]] [--output OUTPUT] [--input INPUT]
                [--dump-stats <path to stats.json>] [--exit-on-error] [--all] [--jobs JOBS]

options:
  -h, --help            show this help message and exit
  --profile <profile.py>, -p <profile.py>
                        Path to a Python file that implements your custom filter.
  --args ARGS [ARGS ...]
                        Pass additional arguments to the profile. Use it like `--args arg1 arg2` etc.
                        The arguments should be space-separated.
  --output OUTPUT, -o OUTPUT
                        Specifies the path for the output file. Defaults to standard output.
  --input INPUT, -i INPUT
                        Specifies the path for the input file. Defaults to standard input.
                        If this path is set, the progress bar is enabled.
  --dump-stats <path to stats.json>
                        Dump statistics to a file. If the file exists, it will be appended to.
  --exit-on-error       Exit if an exception occurs during filtering. Useful for debugging custom filters.
  --all                 A flag that specifies whether to include discarded samples.
                        This is useful when inspecting discarded samples.
  --jobs JOBS, -j JOBS  The number of parallel jobs. By default, the number of CPU cores.
```
Definition of Profile
- The HojiChar CLI receives a series of preprocessing steps as a profile.
- The preprocessing profile is provided as a Python file. Two patterns of the file are allowed.
- `hojichar.utils.load_compose.load_compose()` loads these profiles.
FILTER profile
- A `hojichar.Compose` object must be defined as the `FILTER` variable.

Example:
```python
import json

from hojichar import Compose, Filter
from hojichar.filters.document_filters import ExampleHojiChar, JSONLoader


class JSONDumper(Filter):
    def apply(self, document):
        text = document.text
        document.text = json.dumps({"text": text}, ensure_ascii=False)
        return document


# FILTER must define a Compose object.
FILTER = Compose(
    [
        JSONLoader(),
        ExampleHojiChar(),
        JSONDumper(),
    ]
)
```
- Pass the texts to the filter you have defined using a pipe, as follows:

cat <your_file> | hojichar -p example_profile.py
`hojichar.utils.load_compose.load_filter_from_file()` loads this type of profile.
FACTORY profile
- A callable that returns a `hojichar.Compose` object must be defined as the `FACTORY` variable.
- The callable can receive arguments. In this way, parameters can be passed to the profile.
  - Some values are better not hard-coded, for example random seeds and flags that modify the behavior of a filter. `FACTORY` provides a mechanism to pass those values to the preprocessing as arguments.
Example.
```python
import json

from hojichar import Compose, Filter
from hojichar.filters.document_filters import JSONLoader


class AddSomething(Filter):  # Concat some value after every document.
    def __init__(self, something: str, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)  # Initialize the parent class (required).
        self.something = something

    def apply(self, document):
        text = document.text + self.something
        document.text = text
        return document


class JSONDumper(Filter):
    def apply(self, document):
        text = document.text
        document.text = json.dumps({"text": text}, ensure_ascii=False)
        return document


def callback(something):
    return Compose(
        [
            JSONLoader(),
            AddSomething(something),
            JSONDumper(),
        ]
    )


# FACTORY must be a callable that returns a Compose object.
FACTORY = callback
```
Using a `FACTORY` profile with arguments in the CLI:

cat <your_file> | hojichar -p example_profile.py --args arg1 arg2
`hojichar.utils.load_compose.load_parametrized_filter_from_file()` or `load_factory_from_file` loads this type of profile.
For Developers
Installing from the Source Directory
To install the package, execute the following commands:
git clone https://github.com/HojiChar/HojiChar.git
cd HojiChar
uv sync --all-extras
Testing
Some filters incorporate doctests. You can run these tests with the command:
pytest --doctest-modules .
This command should be executed from the root of the project.
Code style
- HojiChar requires type hints for all code. Type checking is performed in continuous integration (CI) in addition to the pytest tests.
- HojiChar code is subject to linting and formatting by `ruff`. For configuration details, please refer to `pyproject.toml`. You can lint and format from the root of the project using the following commands:
Linting
uvx ruff check .
Formatting
uvx ruff format .
Building the Documentation
We use Pdoc for building the documentation. You can build the documentation using the following command:
pdoc -o docs hojichar
Run this command from the project root.
In practice, the process of building the documentation is automated by CI. When a Pull Request is merged into the main branch, the documentation is built into the `docs/` directory of the `docs` branch, which is then deployed to the official documentation site by GitHub Pages.
Creating a Source Tarball
To create a source tarball, for instance, for packaging or distribution, run the following command:
uv build
The tarball will be created in the dist directory. This command will compile the source code, and the resulting tarball can be installed with no additional dependencies other than the Python standard library.
Creating a Release and Uploading it to PyPI
Versions uploaded to PyPI are identified by git tags. The `__version__` variable in `__init__.py` and the `version` entry in `pyproject.toml` are ignored. The `uv-dynamic-versioning` plugin is used to implement this process.
The steps to push to PyPI are as follows, although in actuality, the process is automated by CI when a GitHub release is created from the tag.
git checkout v0.1.2
uv build
uv publish --index testpypi --token ${PYPI_TOKEN}
The actual task for the manager is to apply the appropriate tag to the commit to be released and to create the release from GitHub:
git tag -a v0.1.2 -m "Version 0.1.2"
git push origin v0.1.2
1""" 2.. include:: ../README.md 3""" 4 5from .core.async_composition import AsyncCompose, AsyncFilterAdapter 6from .core.async_filter_interface import AsyncFilter 7from .core.composition import Compose 8from .core.filter_interface import Filter, TokenFilter 9from .core.inspection import StatsContainer 10from .core.models import Document, Token 11from .core.parallel import Parallel 12from .filters import ( 13 deduplication, 14 document_filters, 15 language_identification, 16 token_filters, 17 tokenization, 18) 19 20__version__ = "0.0.0" # Replaced by uv-dynamic-versioning when deploying 21 22__all__ = [ 23 "core", 24 "filters", 25 "utils", 26 "Compose", 27 "Filter", 28 "TokenFilter", 29 "Document", 30 "Token", 31 "Parallel", 32 "StatsContainer", 33 "deduplication", 34 "document_filters", 35 "language_identification", 36 "token_filters", 37 "tokenization", 38 "AsyncCompose", 39 "AsyncFilterAdapter", 40 "AsyncFilter", 41]
class Compose(Filter)
Compose.__init__(filters, random_state=None, *args, **kwargs)
Compose a filter from pre-defined filter objects. A filter whose `skip_rejected` flag is set ignores documents whose `is_rejected` flag is set; this way, Compose avoids applying filters that would not affect the output.

Parameters

filters : List[Union[Filter, TokenFilter]]
Filter instances to apply to the corpus.

random_state : Union[None, int, np.random.Generator], optional, default = None
Seed for applying filters randomly; must be an int or an np.random.Generator instance.
Compose.set_filters(filters)
Set the filters on a Compose object. Any Compose instance in the argument list is expanded into its sub-filters.

Args: filters (List[Union[Filter, TokenFilter]]): Target filters
Compose.apply(document)
Apply the composed filter to a document and return the processed document.
Compose.apply_batch(batch)
Apply the composed filter to a batch of documents and return the processed documents. The `apply_batch` method implemented in sub-filters is called in order.
Compose.apply_stream(stream)
Apply the composed filter to a stream of documents and return the processed documents. The `apply_stream` method implemented in sub-filters is called in order. In a sub-filter, if `apply_batch` is overridden, you need to set `use_batch` to `True` on that filter to utilize that implementation; otherwise, the method implemented in `apply` is applied to the stream.
Compose.get_total_statistics()
Get the statistics of the Compose object and its sub-filters. The statistics of the Compose class are stored in an object named "Total", and the sub-filters' statistics are stored under names in the format {filter_index}-{filter class name}.
Compose.get_total_statistics_map()
Get the statistics of the Compose object and sub filters as a list of dictionaries.
Compose.shutdown()
This method is called when the filter is no longer needed. You can override this method to release resources or perform cleanup tasks.
Compose.statistics (property)

Deprecated. Get the statistics of the Compose object and sub-filters. This property is retained for compatibility with previous versions; please use `get_total_statistics` or `get_total_statistics_map` instead.
Compose.statistics_obj (property)

Deprecated. Get the statistics of the Compose object and sub-filters as a StatsContainer object. This property is retained for compatibility with previous versions; please use `get_total_statistics` or `get_total_statistics_map` instead.
Compose.summary(format='print') (deprecated since 1.0.0; use get_total_statistics instead)
class Filter(ABC)
Base class for all filters. Document-level filters must inherit from this class. The definition of text processing is in the `apply` method; if you define a new filter, override that method. When an instance of this class is called, it applies the filter from string to string. With a context manager, you can use the filter as follows:

```python
with YourFilter(p=0.5) as filt:
    text = filt("This is a sample text.")
```
Filter.__init__(p=1.0, skip_rejected=True, *args, random_state=None, use_batch=False, batch_size=128, **kwargs)
Initialize the filter.

Parameters

p : float
The probability of applying the filter. If `p` is 1, the filter will always be applied.

skip_rejected : bool
If `True`, the filter skips documents that are already rejected. If you want to apply the filter to all documents (e.g., for postprocessing), set this to `False`.

random_state : Optional[Union[int, np.random.Generator]]
Seed for the random number generator. If `None`, a new random number generator is created; if `None` and the filter is used in a `Compose` class, the random state is shared with the `Compose` object.

use_batch : bool
If `True`, the filter processes documents in batches in the `apply_stream` method.

batch_size : int
The size of the batches processed in the `apply_stream` method.

kwargs : Any
Additional keyword arguments to pass to the filter.
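For instance, the `YourFilter` class from the README section above could be instantiated with these options (a sketch):

```python
filt = YourFilter(
    your_param="...",
    p=0.5,                # apply with probability 0.5
    skip_rejected=False,  # also run on already-rejected documents
    random_state=42,
    use_batch=True,
    batch_size=256,
)
```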
Filter.apply(document) (abstract method)
Definition of filter behavior. The document must implement the `TextContent` protocol; the `hojichar.Document` class is most commonly used. In this method, the filter modifies `document.text` or `document.extras`, and sets `document.is_rejected = True` to discard the document.

Parameters

document : Document
Input document

Returns

Document
Processed document
Filter.apply_batch(batch)
Apply the filter to a batch of documents. You can override this method if you want to apply the filter to a batch of documents at once. This method may be used in the `apply_batch` method of the `Compose` class.

Parameters

documents : Sequence[Document]
List-like object of input documents

Returns

list[Document]
List of processed documents
Filter.apply_stream(stream)
Apply the filter to a stream of documents. This method is used when you want to process documents one by one. If `use_batch` is set to `True` in the constructor, this method processes documents in batches using the `apply_batch` method.

Even if an exception occurs during processing, the process continues, and the following actions are taken:

- The `is_rejected` flag of the document is set to `True`
- The error details are set in `reject_reason`
- The `errors` count in the statistics retrievable via `get_statistics` is incremented

Parameters

stream : Iterable[Document]
Stream of input documents

Returns

Iterable[Document]
Stream of processed documents
Filter.get_statistics()
Get the statistics of this filter. This method returns the statistics of the filter, which includes the number of processed documents, discarded documents, and other statistics.
Filter.get_statistics_map()
Get the statistics of this filter as a dictionary.
Filter.shutdown()
This method is called when the filter is no longer needed. You can override this method to release resources or perform cleanup tasks.
Filter.get_jsonable_vars(exclude_keys=None)
Get the member variables of this filter. Eligible variables are of primitive types (bool, int, float, str, None) whose names do not start with an underscore (`_`).
class TokenFilter(Filter, ABC) (deprecated since 1.0.0; use Filter instead)
Base class for token-level filters. Token filters, which should be implemented in hojichar/filters/token_filters.py, must inherit from this class.
TokenFilter.__init__(p=1, skip_rejected=True, *args, **kwargs) (see Filter.__init__ above)
TokenFilter.apply(token) (see Filter.apply above)
TokenFilter.get_jsonable_vars(exclude_keys=None) (see Filter.get_jsonable_vars above)
class Document
Document class represents a text document with metadata. It contains the text of the document, a flag indicating whether it is rejected, and additional metadata stored in the extras dictionary.

The tokens attribute will be deprecated in future versions, and users are encouraged to use the extras dictionary to store token-related information.
Attributes:
    text (str): The text content of the document.
    is_rejected (bool): A flag indicating whether the document is rejected.
    extras (Dict[str, Any]): A dictionary to store additional metadata about the document.
    reject_reason (Dict[str, Any]): A dictionary to store the reason for rejection. The name and member variables of the filter that rejected the document are logged here.

The following attributes will be deprecated in future versions:
    dedup_lsh (List[str]): A list for deduplication using Locality Sensitive Hashing (LSH).
    tokens (List[Token]): A list of tokens extracted from the document.
def __init__(
    self,
    text: str,
    is_rejected: bool = False,
    tokens: Optional[List[Token]] = None,
    extras: Optional[Dict[str, Any]] = None,
) -> None:
    self.text = text
    self.__original = text
    self.is_rejected = is_rejected
    if tokens is None:
        self.tokens: List[Token] = []
    else:
        self.tokens = tokens

    if extras is None:
        self.extras: Dict[str, Any] = {}
    else:
        self.extras = extras

    self.dedup_lsh: List[str] = []
    self.reject_reason: Dict[str, Any] = {}
@deprecated_since("0.1.0", "Document")
class Token:
    def __init__(self, text: str, is_rejected: bool = False) -> None:
        self.text = text
        self.__original = text
        self.is_rejected = is_rejected

    @property
    def original(self) -> str:
        return self.__original

    def __str__(self) -> str:
        return self.text
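A short sketch of working with Document objects directly (values are illustrative):

from hojichar import Document

doc = Document("Hello, HojiChar!", extras={"source": "example"})
doc.extras["lang"] = "en"  # arbitrary metadata lives in the extras dict
print(doc.original)        # the text exactly as passed to the constructor
print(repr(doc))           # Document(text='Hello, HojiChar!', is_rejected=False, extras={...})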
class Parallel:
    """
    The Parallel class provides a way to apply a hojichar.Compose filter
    to an iterator of documents in a parallel manner using a specified
    number of worker processes. This class should be used as a context
    manager with a 'with' statement.

    Example:

    doc_iter = (hojichar.Document(d) for d in open("my_text.txt"))
    with Parallel(my_filter, num_jobs=8) as pfilter:
        for doc in pfilter.imap_apply(doc_iter):
            pass  # Process the filtered document as needed.
    """

    def __init__(
        self, filter: hojichar.Compose, num_jobs: int | None = None, ignore_errors: bool = False
    ):
        """
        Initializes a new instance of the Parallel class.

        Args:
            filter (hojichar.Compose): A composed filter object that specifies the
                processing operations to apply to each document in parallel.
                A copy of the filter is made within a 'with' statement. When the 'with'
                block terminates, the statistical information obtained through `filter.statistics`
                or `filter.statistics_obj` is replaced with the total value of the statistical
                information processed within the 'with' block.

            num_jobs (int | None, optional): The number of worker processes to use.
                If None, then the number returned by os.cpu_count() is used. Defaults to None.
            ignore_errors (bool, optional): If set to True, any exceptions thrown during
                the processing of a document will be caught and logged, but will not
                stop the processing of further documents. If set to False, the first
                exception thrown will terminate the entire parallel processing operation.
                Defaults to False.
        """
        self.filter = filter
        self.num_jobs = num_jobs
        self.ignore_errors = ignore_errors

        self._pool: Pool | None = None
        self._pid_stats: dict[int, List[Statistics]] | None = None

    def __enter__(self) -> Parallel:
        self._pool = Pool(
            processes=self.num_jobs,
            initializer=_init_worker,
            initargs=(self.filter, self.ignore_errors),
        )
        self._pid_stats = dict()
        return self

    def imap_apply(self, docs: Iterator[hojichar.Document]) -> Iterator[hojichar.Document]:
        """
        Takes an iterator of Documents and applies the Compose filter to
        each Document in a parallel manner. This is a generator method
        that yields processed Documents.

        Args:
            docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.

        Raises:
            RuntimeError: If the Parallel instance is not properly initialized. This
                generally happens when the method is called outside of a 'with' statement.
            Exception: If any exceptions are raised within the worker processes.

        Yields:
            Iterator[hojichar.Document]: An iterator that yields processed Documents.
        """
        if self._pool is None or self._pid_stats is None:
            raise RuntimeError(
                "Parallel instance not properly initialized. Use within a 'with' statement."
            )
        try:
            for doc, pid, stat, err_msg in self._pool.imap_unordered(_worker, docs):
                self._pid_stats[pid] = stat
                if err_msg is not None:
                    logger.error(f"Error in worker {pid}: {err_msg}")
                yield doc
        except Exception:
            self.__exit__(None, None, None)
            raise

    def __exit__(self, exc_type, exc_value, traceback) -> None:  # type: ignore
        if self._pool:
            self._pool.terminate()
            self._pool.join()
        if self._pid_stats:
            total_stats = functools.reduce(
                lambda x, y: Statistics.add_list_of_stats(x, y), self._pid_stats.values()
            )
            self.filter._statistics.update(Statistics.get_filter("Total", total_stats))
            for stat in total_stats:
                for filt in self.filter.filters:
                    if stat.name == filt.name:
                        filt._statistics.update(stat)
                        break

    def get_total_statistics(self) -> List[Statistics]:
        """
        Returns a statistics object of the total statistical
        values processed within the Parallel block.

        Returns:
            StatsContainer: Statistics object
        """
        if self._pid_stats:
            total_stats = functools.reduce(
                lambda x, y: Statistics.add_list_of_stats(x, y), self._pid_stats.values()
            )
            return total_stats
        else:
            return []

    def get_total_statistics_map(self) -> List[dict]:
        return [stat.to_dict() for stat in self.get_total_statistics()]

    @property
    def statistics_obj(self) -> inspection.StatsContainer:
        """
        Returns the statistics object of the Parallel instance.
        This is a StatsContainer object which contains the statistics
        of the Parallel instance and sub filters.

        Returns:
            StatsContainer: Statistics object
        """
        return inspection.statistics_obj_adapter(self.get_total_statistics())  # type: ignore
The Parallel class provides a way to apply a hojichar.Compose filter to an iterator of documents in a parallel manner using a specified number of worker processes. This class should be used as a context manager with a 'with' statement.
Example:

    doc_iter = (hojichar.Document(d) for d in open("my_text.txt"))
    with Parallel(my_filter, num_jobs=8) as pfilter:
        for doc in pfilter.imap_apply(doc_iter):
            pass  # Process the filtered document as needed.
def __init__(
    self, filter: hojichar.Compose, num_jobs: int | None = None, ignore_errors: bool = False
):
    """
    Initializes a new instance of the Parallel class.

    Args:
        filter (hojichar.Compose): A composed filter object that specifies the
            processing operations to apply to each document in parallel.
            A copy of the filter is made within a 'with' statement. When the 'with'
            block terminates, the statistical information obtained through `filter.statistics`
            or `filter.statistics_obj` is replaced with the total value of the statistical
            information processed within the 'with' block.

        num_jobs (int | None, optional): The number of worker processes to use.
            If None, then the number returned by os.cpu_count() is used. Defaults to None.
        ignore_errors (bool, optional): If set to True, any exceptions thrown during
            the processing of a document will be caught and logged, but will not
            stop the processing of further documents. If set to False, the first
            exception thrown will terminate the entire parallel processing operation.
            Defaults to False.
    """
    self.filter = filter
    self.num_jobs = num_jobs
    self.ignore_errors = ignore_errors

    self._pool: Pool | None = None
    self._pid_stats: dict[int, List[Statistics]] | None = None
Initializes a new instance of the Parallel class.

Args:
    filter (hojichar.Compose): A composed filter object that specifies the processing operations to apply to each document in parallel. A copy of the filter is made within a 'with' statement. When the 'with' block terminates, the statistical information obtained through filter.statistics or filter.statistics_obj is replaced with the total value of the statistical information processed within the 'with' block.
    num_jobs (int | None, optional): The number of worker processes to use. If None, then the number returned by os.cpu_count() is used. Defaults to None.
    ignore_errors (bool, optional): If set to True, any exceptions thrown during the processing of a document will be caught and logged, but will not stop the processing of further documents. If set to False, the first exception thrown will terminate the entire parallel processing operation. Defaults to False.
def imap_apply(self, docs: Iterator[hojichar.Document]) -> Iterator[hojichar.Document]:
    """
    Takes an iterator of Documents and applies the Compose filter to
    each Document in a parallel manner. This is a generator method
    that yields processed Documents.

    Args:
        docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.

    Raises:
        RuntimeError: If the Parallel instance is not properly initialized. This
            generally happens when the method is called outside of a 'with' statement.
        Exception: If any exceptions are raised within the worker processes.

    Yields:
        Iterator[hojichar.Document]: An iterator that yields processed Documents.
    """
    if self._pool is None or self._pid_stats is None:
        raise RuntimeError(
            "Parallel instance not properly initialized. Use within a 'with' statement."
        )
    try:
        for doc, pid, stat, err_msg in self._pool.imap_unordered(_worker, docs):
            self._pid_stats[pid] = stat
            if err_msg is not None:
                logger.error(f"Error in worker {pid}: {err_msg}")
            yield doc
    except Exception:
        self.__exit__(None, None, None)
        raise
Takes an iterator of Documents and applies the Compose filter to each Document in a parallel manner. This is a generator method that yields processed Documents.
Args:
    docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.

Raises:
    RuntimeError: If the Parallel instance is not properly initialized. This generally happens when the method is called outside of a 'with' statement.
    Exception: If any exceptions are raised within the worker processes.

Yields:
    Iterator[hojichar.Document]: An iterator that yields processed Documents.
def get_total_statistics(self) -> List[Statistics]:
    """
    Returns a statistics object of the total statistical
    values processed within the Parallel block.

    Returns:
        StatsContainer: Statistics object
    """
    if self._pid_stats:
        total_stats = functools.reduce(
            lambda x, y: Statistics.add_list_of_stats(x, y), self._pid_stats.values()
        )
        return total_stats
    else:
        return []
Returns a statistics object of the total statistical values processed within the Parallel block.
Returns:
    StatsContainer: Statistics object
Returns the statistics object of the Parallel instance. This is a StatsContainer object which contains the statistics of the Parallel instance and sub filters.
Returns:
    StatsContainer: Statistics object
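Putting the pieces together, here is a sketch of collecting aggregated statistics from a parallel run; cleaner stands in for any Compose pipeline as defined earlier, the file name is hypothetical, and Parallel is assumed to be exported at the package root as in recent versions.

import hojichar

doc_iter = (hojichar.Document(line) for line in open("my_text.txt"))
with hojichar.Parallel(cleaner, num_jobs=4, ignore_errors=True) as pfilter:
    kept = [doc.text for doc in pfilter.imap_apply(doc_iter) if not doc.is_rejected]
    print(pfilter.get_total_statistics_map())  # aggregated per-filter statistics as dicts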
@dataclasses.dataclass
class StatsContainer:
    total_info: DocStatistics
    layers_info: Dict[str, FilterStatistics]  # Key of the dict is filter name.

    def __add__(self, other: StatsContainer) -> StatsContainer:
        assert self.layers_info.keys() == other.layers_info.keys(), "Layer names must match"
        return StatsContainer(
            self.total_info + other.total_info,
            {k: v + other.layers_info[k] for k, v in self.layers_info.items()},
        )

    def get_human_readable_values(self) -> dict:
        return {
            "total_info": self.total_info.get_human_readable_values(),
            "layers_info": [
                layer.get_human_readable_values() for layer in self.layers_info.values()
            ],
        }

    def reset(self) -> StatsContainer:
        self.total_info.reset()  # note: the original source read `self.total_info.reset` (a no-op); the call is the evident intent
        for layer in self.layers_info.values():
            layer.reset()
        return self
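Because StatsContainer implements __add__, statistics from separate runs can be merged as long as their layer names match; a brief sketch, assuming stats_a and stats_b are StatsContainer objects obtained via statistics_obj:

merged = stats_a + stats_b                 # asserts that layer names match
print(merged.get_human_readable_values())  # nested dict of human-readable totals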
class AsyncCompose(AsyncFilter):
    def __init__(
        self,
        filters: list[AsyncFilter | Filter],
        random_state: int | np.random.Generator | None = None,
        executor: ThreadPoolExecutor | None = None,
        *args: Any,
        **kwargs: Any,
    ):
        super().__init__(random_state=random_state, *args, **kwargs)
        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
        self._statistics.name = "Total"
        self._has_external_executor = executor is not None
        self._executor = executor or ThreadPoolExecutor()
        self.set_filters(filters)

    def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
        self.filters: list[AsyncFilter] = []
        filter_idx = 0
        for f in filters:
            if isinstance(f, (AsyncCompose, Compose)):
                for sub in f.filters:
                    name = f"{filter_idx}-{sub.__class__.__name__}"
                    if isinstance(sub, Filter):
                        name = f"{filter_idx}-{sub.__class__.__name__}"
                        sub = AsyncFilterAdapter(sub, executor=self._executor)

                    sub._set_rng_if_not_initialized(self._rng)
                    sub.name = name
                    sub._statistics.name = name
                    self.filters.append(sub)
                    filter_idx += 1
            else:
                name = f"{filter_idx}-{f.__class__.__name__}"
                if isinstance(f, Filter):
                    name = f"{filter_idx}-{f.__class__.__name__}"
                    f = AsyncFilterAdapter(f, executor=self._executor)
                f._set_rng_if_not_initialized(self._rng)
                f.name = name
                f._statistics.name = name
                self.filters.append(f)
                filter_idx += 1

    async def apply(self, document: Document) -> Document:
        stat = get_doc_info(document)
        for filter_idx, filt in enumerate(self.filters):
            document = await filt._apply(document)
        new_stat = get_doc_info(document)
        async with self._stats_lock:
            self._statistics.update_by_diff(stat, new_stat)
        return document

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        stats = [get_doc_info(doc) for doc in batch]
        for i, filt in enumerate(self.filters):
            batch = await filt._apply_batch(batch)
        batch = await self._finalize_batch(batch, stats)
        return list(batch)

    async def apply_stream(
        self,
        stream: AsyncIterable[Document] | Iterable[Document],
    ) -> AsyncGenerator[Document, None]:
        async_stream = handle_stream_as_async(stream, chunk_size=1000, executor=self._executor)
        async_stream = self._count_input_stats(async_stream)

        for i, filt in enumerate(self.filters):
            async_stream = filt.apply_stream(async_stream)

        async for doc in async_stream:
            in_stat = doc.extras["__init_stats"]
            out_stat = get_doc_info(doc)
            async with self._stats_lock:
                self._statistics.update_by_diff(in_stat, out_stat)
            del doc.extras["__init_stats"]
            yield doc

    async def _count_input_stats(
        self, async_stream: AsyncIterable[Document]
    ) -> AsyncGenerator[Document, None]:
        async for doc in async_stream:
            doc.extras["__init_stats"] = get_doc_info(doc)
            yield doc

    def get_total_statistics(self) -> list[Statistics]:
        """
        Get the statistics of the Compose object and sub filters.

        The statistics of the Compose class are stored in an object with the name "Total",
        and sub-filters's are stored with names in the format {filter_index}-{filter class name}.
        """
        stats = []
        stats.append(self.get_statistics())
        for i, filt in enumerate(self.filters):
            stats.append(filt.get_statistics())
        return stats

    def get_total_statistics_map(self) -> list[dict[str, Any]]:
        """
        Get the statistics of the Compose object and sub filters as a list of dictionaries.
        """
        stats = self.get_total_statistics()
        return [stat.to_dict() for stat in stats]

    async def shutdown(self) -> None:
        for filt in self.filters:
            await filt.shutdown()
        if not self._has_external_executor:
            self._executor.shutdown()

    async def __aenter__(self) -> "AsyncCompose":
        return self

    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        await self.shutdown()
        if exc_type is not None:
            raise exc_value
Composes multiple asynchronous (and synchronous) filters into a single asynchronous pipeline; synchronous hojichar.Filter objects are wrapped in AsyncFilterAdapter automatically.
def __init__(
    self,
    filters: list[AsyncFilter | Filter],
    random_state: int | np.random.Generator | None = None,
    executor: ThreadPoolExecutor | None = None,
    *args: Any,
    **kwargs: Any,
):
    super().__init__(random_state=random_state, *args, **kwargs)
    self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
    self._statistics.name = "Total"
    self._has_external_executor = executor is not None
    self._executor = executor or ThreadPoolExecutor()
    self.set_filters(filters)
Base class for asynchronous filters.
Parameters

p : float
    The probability of applying the filter. If p is 1, the filter will always be applied.
skip_rejected : bool
    If True, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to False.
random_state : Optional[Union[int, np.random.Generator]]
    Seed for the random number generator. If None is specified, the random number generator managed by the Compose class will be used.
use_batch : bool
    If True, the filter will process documents in batches in the apply_stream method.
batch_size : int
    The size of the batch to process documents in the apply_stream method.
def set_filters(self, filters: list[AsyncFilter | Filter]) -> None:
    self.filters: list[AsyncFilter] = []
    filter_idx = 0
    for f in filters:
        if isinstance(f, (AsyncCompose, Compose)):
            for sub in f.filters:
                name = f"{filter_idx}-{sub.__class__.__name__}"
                if isinstance(sub, Filter):
                    name = f"{filter_idx}-{sub.__class__.__name__}"
                    sub = AsyncFilterAdapter(sub, executor=self._executor)

                sub._set_rng_if_not_initialized(self._rng)
                sub.name = name
                sub._statistics.name = name
                self.filters.append(sub)
                filter_idx += 1
        else:
            name = f"{filter_idx}-{f.__class__.__name__}"
            if isinstance(f, Filter):
                name = f"{filter_idx}-{f.__class__.__name__}"
                f = AsyncFilterAdapter(f, executor=self._executor)
            f._set_rng_if_not_initialized(self._rng)
            f.name = name
            f._statistics.name = name
            self.filters.append(f)
            filter_idx += 1
async def apply(self, document: Document) -> Document:
    stat = get_doc_info(document)
    for filter_idx, filt in enumerate(self.filters):
        document = await filt._apply(document)
    new_stat = get_doc_info(document)
    async with self._stats_lock:
        self._statistics.update_by_diff(stat, new_stat)
    return document
Definition of async filter behavior.

In this method, the filter will modify document.text or document.extras and set document.is_rejected = True to discard the document.

Parameters

document : Document
    Input document

Returns

Document
    Processed Document
async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
    stats = [get_doc_info(doc) for doc in batch]
    for i, filt in enumerate(self.filters):
        batch = await filt._apply_batch(batch)
    batch = await self._finalize_batch(batch, stats)
    return list(batch)
Apply the filter to a Sequence of documents.

By default, the processing implemented in apply is executed asynchronously and concurrently. If the filter processing can be optimized for batch processing, override this method.
async def apply_stream(
    self,
    stream: AsyncIterable[Document] | Iterable[Document],
) -> AsyncGenerator[Document, None]:
    async_stream = handle_stream_as_async(stream, chunk_size=1000, executor=self._executor)
    async_stream = self._count_input_stats(async_stream)

    for i, filt in enumerate(self.filters):
        async_stream = filt.apply_stream(async_stream)

    async for doc in async_stream:
        in_stat = doc.extras["__init_stats"]
        out_stat = get_doc_info(doc)
        async with self._stats_lock:
            self._statistics.update_by_diff(in_stat, out_stat)
        del doc.extras["__init_stats"]
        yield doc
Apply the filter to a stream of documents (Iterable or AsyncIterable).

If use_batch is set to True at initialization, the filter will process documents in batches. If the stream is not asynchronous, use handle_stream_as_async to convert it to an asynchronous stream.

Even if an exception occurs during processing, the process will continue, and the following actions will be taken:

- Set the is_rejected flag of the document to True
- Set the error details in reject_reason
- Increment the errors count in the statistics retrievable via get_statistics
def get_total_statistics(self) -> list[Statistics]:
    """
    Get the statistics of the Compose object and sub filters.

    The statistics of the Compose class are stored in an object with the name "Total",
    and sub-filters's are stored with names in the format {filter_index}-{filter class name}.
    """
    stats = []
    stats.append(self.get_statistics())
    for i, filt in enumerate(self.filters):
        stats.append(filt.get_statistics())
    return stats
Get the statistics of the Compose object and sub filters.

The statistics of the Compose class are stored in an object with the name "Total", and sub-filters' statistics are stored with names in the format {filter_index}-{filter class name}.
def get_total_statistics_map(self) -> list[dict[str, Any]]:
    """
    Get the statistics of the Compose object and sub filters as a list of dictionaries.
    """
    stats = self.get_total_statistics()
    return [stat.to_dict() for stat in stats]
Get the statistics of the Compose object and sub filters as a list of dictionaries.
async def shutdown(self) -> None:
    for filt in self.filters:
        await filt.shutdown()
    if not self._has_external_executor:
        self._executor.shutdown()
You can override this method to release resources or perform cleanup tasks.
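A minimal end-to-end sketch of an asynchronous pipeline; AsyncCompose and Document are assumed to be importable from the package root (adjust the imports to your installed version).

import asyncio
from hojichar import AsyncCompose, Document, document_filters

async def main() -> None:
    # Synchronous filters passed here are wrapped in AsyncFilterAdapter automatically.
    async with AsyncCompose([
        document_filters.JSONLoader(key="text"),
        document_filters.JSONDumper(),
    ]) as pipeline:
        docs = [Document('{"text": "こんにちは"}')]
        async for doc in pipeline.apply_stream(docs):
            print(doc.text)

asyncio.run(main())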
class AsyncFilterAdapter(AsyncFilter):
    """
    Adapter class for executing hojichar.Filter asynchronously.
    """

    def __init__(
        self,
        sync_filter: Filter,
        *args: Any,
        executor: Executor | None = None,
        use_batch: bool = True,
        **kwargs: Any,
    ):
        """
        Adapter class for executing hojichar.Filter asynchronously.
        Used to incorporate Filter into AsyncCompose.

        To reduce the overhead of asynchronous context switching,
        use_batch is set to True by default to process in batches
        in apply_stream, regardless of the sync_filter's use_batch setting.

        If performing CPU-bound and heavy processing, you can specify an executor
        to offload the processing to the executor. However, due to Python's GIL
        constraints, using ThreadPoolExecutor will not parallelize CPU-bound
        processing, and the entire process will be locked.

        By using ProcessPoolExecutor as the executor, it may be possible to
        parallelize CPU-bound processing. However, for parallelizing CPU-bound
        processing, it is recommended to use the hojichar.Parallel class to
        parallelize synchronous Compose pipeline.
        """
        super().__init__(*args, use_batch=use_batch, **kwargs)
        self.sync_filter = sync_filter
        self._has_external_executor = executor is not None
        self._executor = executor or ThreadPoolExecutor()
        self.batch_size = sync_filter.batch_size

    async def apply(self, document: Document) -> Document:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            lambda: self.sync_filter.apply_batch(batch),
        )

    async def shutdown(self) -> None:
        self.sync_filter.shutdown()
        if not self._has_external_executor:
            self._executor.shutdown()
Adapter class for executing hojichar.Filter asynchronously.
def __init__(
    self,
    sync_filter: Filter,
    *args: Any,
    executor: Executor | None = None,
    use_batch: bool = True,
    **kwargs: Any,
):
    """
    Adapter class for executing hojichar.Filter asynchronously.
    Used to incorporate Filter into AsyncCompose.

    To reduce the overhead of asynchronous context switching,
    use_batch is set to True by default to process in batches
    in apply_stream, regardless of the sync_filter's use_batch setting.

    If performing CPU-bound and heavy processing, you can specify an executor
    to offload the processing to the executor. However, due to Python's GIL
    constraints, using ThreadPoolExecutor will not parallelize CPU-bound
    processing, and the entire process will be locked.

    By using ProcessPoolExecutor as the executor, it may be possible to
    parallelize CPU-bound processing. However, for parallelizing CPU-bound
    processing, it is recommended to use the hojichar.Parallel class to
    parallelize synchronous Compose pipeline.
    """
    super().__init__(*args, use_batch=use_batch, **kwargs)
    self.sync_filter = sync_filter
    self._has_external_executor = executor is not None
    self._executor = executor or ThreadPoolExecutor()
    self.batch_size = sync_filter.batch_size
Adapter class for executing hojichar.Filter asynchronously. Used to incorporate Filter into AsyncCompose.
To reduce the overhead of asynchronous context switching, use_batch is set to True by default to process in batches in apply_stream, regardless of the sync_filter's use_batch setting.
If performing CPU-bound and heavy processing, you can specify an executor to offload the processing to the executor. However, due to Python's GIL constraints, using ThreadPoolExecutor will not parallelize CPU-bound processing, and the entire process will be locked.
By using ProcessPoolExecutor as the executor, it may be possible to parallelize CPU-bound processing. However, for parallelizing CPU-bound processing, it is recommended to use the hojichar.Parallel class to parallelize a synchronous Compose pipeline.
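For explicit control over the executor, here is a sketch wrapping a synchronous filter by hand; the import path for AsyncFilterAdapter is an assumption and may differ between versions.

from concurrent.futures import ThreadPoolExecutor
from hojichar import AsyncCompose, AsyncFilterAdapter, document_filters  # import path assumed

executor = ThreadPoolExecutor(max_workers=4)  # shared, externally managed executor
pipeline = AsyncCompose(
    [AsyncFilterAdapter(document_filters.JSONLoader(key="text"), executor=executor)],
    executor=executor,
)
# Because the executor is external, pipeline.shutdown() will not close it;
# call executor.shutdown() yourself when the pipeline is no longer needed.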
async def apply(self, document: Document) -> Document:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(self._executor, self.sync_filter.apply, document)
Definition of async filter behavior.

In this method, the filter will modify document.text or document.extras and set document.is_rejected = True to discard the document.

Parameters

document : Document
    Input document

Returns

Document
    Processed Document
async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        lambda: self.sync_filter.apply_batch(batch),
    )
Apply the filter to a Sequence of documents.

By default, the processing implemented in apply is executed asynchronously and concurrently. If the filter processing can be optimized for batch processing, override this method.
async def shutdown(self) -> None:
    self.sync_filter.shutdown()
    if not self._has_external_executor:
        self._executor.shutdown()
You can override this method to release resources or perform cleanup tasks.
class AsyncFilter(ABC):
    def __init__(
        self,
        *args: Any,
        p: float = 1.0,
        skip_rejected: bool = True,
        random_state: int | np.random.Generator | None = None,
        use_batch: bool = True,
        batch_size: int = 128,
        **kwargs: Any,
    ):
        """
        Base class for asynchronous filters.

        Parameters
        ----------
        p : float
            The probability of applying the filter.
            If `p` is 1, the filter will always be applied.
        skip_rejected : bool
            If `True`, the filter will skip documents that are already rejected.
            If you want to apply the filter to all documents (e.g., postprocess), set this to `False`.
        random_state : Optional[Union[int, np.random.Generator]]
            Seed for the random number generator.
            If `None` is specified, the random number generator managed by the Compose class will be used.
        use_batch : bool
            If `True`, the filter will process documents in batches in the `apply_stream` method.
        batch_size : int
            The size of the batch to process documents in the `apply_stream` method.
        """
        self.name = self.__class__.__name__
        self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
        assert 0 <= p <= 1
        self.p = p
        self.__init_rng(random_state)
        self.skip_rejected = skip_rejected
        self.use_batch = use_batch
        self.batch_size = batch_size

        self._statistics = Statistics(name=self.name)
        self._stats_lock = asyncio.Lock()

    @abstractmethod
    async def apply(self, document: Document) -> Document:
        """
        Definition of async filter behavior.

        In this method, the filter will modify `document.text` or
        `document.extras` and set `document.is_rejected = True` to discard the document.

        Parameters
        ----------
        document : Document
            Input document

        Returns
        -------
        Document
            Processed Document
        """
        pass

    def _check_skip(self, document: Document) -> bool:
        """
        Check if the document should be skipped by this filter.
        If `skip_rejected` is set to `True`, this method will return `True`
        if the document is already rejected.
        If `p` is less than 1, this method will return `True` with a probability of `1 - p`.
        """
        skip = self.skip_rejected and document.is_rejected
        if skip:
            return True
        if self.p < 1:
            if self._rng.random() > self.p:
                return True
        return False

    async def _apply(self, document: Document) -> Document:
        stats = get_doc_info(document)
        if not self._check_skip(document):
            document = await self.apply(document)
        new_stats = get_doc_info(document)
        async with self._stats_lock:
            self._statistics.update_by_diff(stats, new_stats)

        if not stats["is_rejected"] and new_stats["is_rejected"]:
            document.reject_reason = self.get_jsonable_vars()
        return document

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        """
        Apply the filter to a Sequence of documents.
        By default, the processing implemented in `apply` is executed asynchronously and concurrently.
        If the filter processing can be optimized for batch processing, override this method.
        """
        tasks = [self.apply(doc) for doc in batch]
        return await asyncio.gather(*tasks)

    async def _apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        skip = False
        if self.p < 1:
            skip = self._rng.random() > self.p

        stats = [get_doc_info(doc) for doc in batch]
        if not skip:
            batch = await self.apply_batch(batch)
        batch = await self._finalize_batch(batch, stats)
        return list(batch)

    async def apply_stream(
        self,
        stream: Iterable[Document] | AsyncIterable[Document],
    ) -> AsyncGenerator[Document, None]:
        """
        Apply the filter to a stream of documents (Iterable or AsyncIterable).
        If use_batch is set to `True` at initialization, the filter will process documents in batches.
        If the stream is not asynchronous, use handle_stream_as_async to convert it to an asynchronous stream.

        Even if an exception occurs during processing, the process will continue, and the following actions will be taken:
        - Set the `is_rejected` flag of the document to `True`
        - Set the error details in `reject_reason`
        - Increment the `errors` count in the statistics retrievable via `get_statistics`
        """
        async_stream: AsyncIterable[Document] = handle_stream_as_async(stream)

        if not self.use_batch:
            async for doc in async_stream:
                yield await self._try_process(doc, self._apply)
        else:
            batch: list[Document] = []
            async for doc in async_stream:
                if self._check_skip(doc):
                    yield doc
                    continue

                batch.append(doc)
                # Batch size reached, apply batch
                if len(batch) >= self.batch_size:
                    stats = [get_doc_info(doc) for doc in batch]
                    batch = await self._try_process(batch, self.apply_batch)
                    batch = await self._finalize_batch(batch, stats)
                    for out in batch:
                        yield out
                    batch.clear()

            # Flush remaining documents in the batch
            if batch:
                stats = [get_doc_info(doc) for doc in batch]
                batch = await self._try_process(batch, self.apply_batch)
                batch = await self._finalize_batch(batch, stats)
                for out in batch:
                    yield out

    async def _try_process(self, target: T, func: Callable[[T], Awaitable[T]]) -> T:
        try:
            return await func(target)
        except Exception as e:
            if isinstance(target, Document):
                msg = f"{e!r} occurs while processing {self.name} with {target!r}"
                self.logger.error(msg, exc_info=True)
                target.is_rejected = True
                target.reject_reason = {"error": msg}
                async with self._stats_lock:
                    self._statistics.errors += 1
                return target  # type: ignore[return-value]
            if isinstance(target, list):
                msg = f"{e!r} occurs while batch processing {self.name}"
                self.logger.error(msg, exc_info=True)
                for doc in target:
                    doc.is_rejected = True
                    doc.reject_reason = {"error": msg}
                async with self._stats_lock:
                    self._statistics.errors += len(target)
                return target  # type: ignore[return-value]
            raise e

    async def __call__(self, text: str) -> str:
        document = Document(text=text)
        return (await self._apply(document)).text

    def get_statistics(self) -> Statistics:
        """
        Get the statistics of this filter.
        Returns:
            Statistics: The statistics of this filter.
        """
        return self._statistics

    def get_statistics_map(self) -> dict[str, Statistics]:
        """
        Get the statistics of this filter as a dictionary.
        """
        return self._statistics.to_dict()

    async def shutdown(self) -> None:
        """
        You can override this method to release resources or perform cleanup tasks.
        """
        pass

    async def __aenter__(self) -> "AsyncFilter":
        return self

    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        await self.shutdown()

    def get_jsonable_vars(self, exclude_keys: set[str] | None = None) -> dict[str, Any]:
        """
        Get the member variable of this filter.
        Eligible variables are primitive types; [bool, int, float, str, None],
        and the name of the variable not starts with the underscore; `_`.
        """
        if exclude_keys is None:
            exclude_keys = set()
        return {
            k: v
            for k, v in vars(self).items()
            if (_is_jsonable(v) and (k not in exclude_keys) and (not k.startswith("_")))
        }

    async def _finalize_batch(
        self,
        batch: Sequence[Document],
        old_stats: list[dict[str, Any]],
    ) -> list[Document]:
        new_stats = [get_doc_info(doc) for doc in batch]
        for old, new, doc in zip(old_stats, new_stats, batch):
            async with self._stats_lock:
                self._statistics.update_by_diff(old, new)
            if not old["is_rejected"] and new["is_rejected"]:
                doc.reject_reason = self.get_jsonable_vars()
        return list(batch)

    def __init_rng(self, random_state: int | np.random.Generator | None) -> None:
        self._owns_rng = True
        if random_state is None:
            self._rng = np.random.default_rng()
            self._owns_rng = False
        elif isinstance(random_state, int):
            self._rng = np.random.default_rng(random_state)
        elif isinstance(random_state, np.random.Generator):
            self._rng = random_state
        else:
            raise TypeError(
                f"random_state must be int or np.random.Generator, not {type(random_state)}"
            )

    def _set_rng_if_not_initialized(self, rng: np.random.Generator) -> None:
        """
        Set the random number generator for this filter if it is not already initialized.
        This method is called by Compose class.
        """
        if not self._owns_rng:
            self._rng = rng
Abstract base class for asynchronous filters.
def __init__(
    self,
    *args: Any,
    p: float = 1.0,
    skip_rejected: bool = True,
    random_state: int | np.random.Generator | None = None,
    use_batch: bool = True,
    batch_size: int = 128,
    **kwargs: Any,
):
    """
    Base class for asynchronous filters.

    Parameters
    ----------
    p : float
        The probability of applying the filter.
        If `p` is 1, the filter will always be applied.
    skip_rejected : bool
        If `True`, the filter will skip documents that are already rejected.
        If you want to apply the filter to all documents (e.g., postprocess), set this to `False`.
    random_state : Optional[Union[int, np.random.Generator]]
        Seed for the random number generator.
        If `None` is specified, the random number generator managed by the Compose class will be used.
    use_batch : bool
        If `True`, the filter will process documents in batches in the `apply_stream` method.
    batch_size : int
        The size of the batch to process documents in the `apply_stream` method.
    """
    self.name = self.__class__.__name__
    self.logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
    assert 0 <= p <= 1
    self.p = p
    self.__init_rng(random_state)
    self.skip_rejected = skip_rejected
    self.use_batch = use_batch
    self.batch_size = batch_size

    self._statistics = Statistics(name=self.name)
    self._stats_lock = asyncio.Lock()
Base class for asynchronous filters.

Parameters

p : float
    The probability of applying the filter. If p is 1, the filter will always be applied.
skip_rejected : bool
    If True, the filter will skip documents that are already rejected. If you want to apply the filter to all documents (e.g., postprocess), set this to False.
random_state : Optional[Union[int, np.random.Generator]]
    Seed for the random number generator. If None is specified, the random number generator managed by the Compose class will be used.
use_batch : bool
    If True, the filter will process documents in batches in the apply_stream method.
batch_size : int
    The size of the batch to process documents in the apply_stream method.
@abstractmethod
async def apply(self, document: Document) -> Document:
    """
    Definition of async filter behavior.

    In this method, the filter will modify `document.text` or
    `document.extras` and set `document.is_rejected = True` to discard the document.

    Parameters
    ----------
    document : Document
        Input document

    Returns
    -------
    Document
        Processed Document
    """
    pass
Definition of async filter behavior.

In this method, the filter will modify document.text or document.extras and set document.is_rejected = True to discard the document.

Parameters

document : Document
    Input document

Returns

Document
    Processed Document
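A minimal sketch of a concrete AsyncFilter subclass; the class is hypothetical and the await is a stand-in for a real asynchronous call such as an API request.

import asyncio
from hojichar import AsyncFilter, Document  # import path assumed

class AsyncLengthFilter(AsyncFilter):
    async def apply(self, document: Document) -> Document:
        await asyncio.sleep(0)  # stand-in for a real await point (API or I/O call)
        if len(document.text) > 1000:
            document.is_rejected = True
        return document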
async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
    """
    Apply the filter to a Sequence of documents.
    By default, the processing implemented in `apply` is executed asynchronously and concurrently.
    If the filter processing can be optimized for batch processing, override this method.
    """
    tasks = [self.apply(doc) for doc in batch]
    return await asyncio.gather(*tasks)
Apply the filter to a Sequence of documents.

By default, the processing implemented in apply is executed asynchronously and concurrently. If the filter processing can be optimized for batch processing, override this method.
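When one call can score a whole batch, overriding apply_batch avoids per-document overhead; a sketch building on the class above, where score_many is a hypothetical asynchronous scoring API.

from typing import Sequence

class BatchScoredFilter(AsyncFilter):  # AsyncFilter and Document imported as in the sketch above
    async def apply(self, document: Document) -> Document:
        return (await self.apply_batch([document]))[0]

    async def apply_batch(self, batch: Sequence[Document]) -> list[Document]:
        scores = await score_many([doc.text for doc in batch])  # hypothetical batch API
        for doc, score in zip(batch, scores):
            doc.is_rejected = score < 0.5
        return list(batch)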
async def apply_stream(
    self,
    stream: Iterable[Document] | AsyncIterable[Document],
) -> AsyncGenerator[Document, None]:
    """
    Apply the filter to a stream of documents (Iterable or AsyncIterable).
    If use_batch is set to `True` at initialization, the filter will process documents in batches.
    If the stream is not asynchronous, use handle_stream_as_async to convert it to an asynchronous stream.

    Even if an exception occurs during processing, the process will continue, and the following actions will be taken:
    - Set the `is_rejected` flag of the document to `True`
    - Set the error details in `reject_reason`
    - Increment the `errors` count in the statistics retrievable via `get_statistics`
    """
    async_stream: AsyncIterable[Document] = handle_stream_as_async(stream)

    if not self.use_batch:
        async for doc in async_stream:
            yield await self._try_process(doc, self._apply)
    else:
        batch: list[Document] = []
        async for doc in async_stream:
            if self._check_skip(doc):
                yield doc
                continue

            batch.append(doc)
            # Batch size reached, apply batch
            if len(batch) >= self.batch_size:
                stats = [get_doc_info(doc) for doc in batch]
                batch = await self._try_process(batch, self.apply_batch)
                batch = await self._finalize_batch(batch, stats)
                for out in batch:
                    yield out
                batch.clear()

        # Flush remaining documents in the batch
        if batch:
            stats = [get_doc_info(doc) for doc in batch]
            batch = await self._try_process(batch, self.apply_batch)
            batch = await self._finalize_batch(batch, stats)
            for out in batch:
                yield out
Apply the filter to a stream of documents (Iterable or AsyncIterable).

If use_batch is set to True at initialization, the filter will process documents in batches. If the stream is not asynchronous, use handle_stream_as_async to convert it to an asynchronous stream.

Even if an exception occurs during processing, the process will continue, and the following actions will be taken:

- Set the is_rejected flag of the document to True
- Set the error details in reject_reason
- Increment the errors count in the statistics retrievable via get_statistics
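A sketch of consuming a stream under these error semantics: failed documents come back rejected with the error recorded, and the stream keeps going.

from hojichar import Document

async def consume(filt, texts):  # filt: any AsyncFilter instance
    async for doc in filt.apply_stream(Document(t) for t in texts):
        if doc.is_rejected:
            print("rejected:", doc.reject_reason)
        else:
            print("kept:", doc.text)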
def get_statistics(self) -> Statistics:
    """
    Get the statistics of this filter.
    Returns:
        Statistics: The statistics of this filter.
    """
    return self._statistics
Get the statistics of this filter.

Returns:
    Statistics: The statistics of this filter.
def get_statistics_map(self) -> dict[str, Statistics]:
    """
    Get the statistics of this filter as a dictionary.
    """
    return self._statistics.to_dict()
Get the statistics of this filter as a dictionary.
async def shutdown(self) -> None:
    """
    You can override this method to release resources or perform cleanup tasks.
    """
    pass
You can override this method to release resources or perform cleanup tasks.
def get_jsonable_vars(self, exclude_keys: set[str] | None = None) -> dict[str, Any]:
    """
    Get the member variable of this filter.
    Eligible variables are primitive types; [bool, int, float, str, None],
    and the name of the variable not starts with the underscore; `_`.
    """
    if exclude_keys is None:
        exclude_keys = set()
    return {
        k: v
        for k, v in vars(self).items()
        if (_is_jsonable(v) and (k not in exclude_keys) and (not k.startswith("_")))
    }
Get the member variables of this filter. Eligible variables are primitive types ([bool, int, float, str, None]) whose names do not start with an underscore (_).
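For example, with the hypothetical filter sketched earlier, the JSON-able configuration can be inspected like this:

f = DiscardShortDocuments(min_len=20)  # hypothetical filter from the earlier sketch
print(f.get_jsonable_vars())
# e.g. {'name': 'DiscardShortDocuments', 'p': 1.0, 'min_len': 20, ...}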