hojichar.core.parallel
1from __future__ import annotations 2 3import functools 4import logging 5import multiprocessing 6import os 7import signal 8from copy import copy 9from typing import Iterator 10 11import hojichar 12from hojichar.core.inspection import StatsContainer 13 14logger = logging.getLogger(__name__) 15 16 17PARALLEL_BASE_FILTER: hojichar.Compose 18WORKER_PARAM_IGNORE_ERRORS: bool 19 20 21def _init_worker(filter: hojichar.Compose, ignore_errors: bool) -> None: 22 signal.signal(signal.SIGINT, signal.SIG_IGN) 23 global PARALLEL_BASE_FILTER, WORKER_PARAM_IGNORE_ERRORS 24 PARALLEL_BASE_FILTER = hojichar.Compose(copy(filter.filters)) # TODO random state treatment 25 WORKER_PARAM_IGNORE_ERRORS = ignore_errors 26 27 28def _worker( 29 doc: hojichar.Document, 30) -> tuple[hojichar.Document, int, StatsContainer, str | None]: 31 global PARALLEL_BASE_FILTER, WORKER_PARAM_IGNORE_ERRORS 32 ignore_errors = WORKER_PARAM_IGNORE_ERRORS 33 error_message = None 34 try: 35 result = PARALLEL_BASE_FILTER.apply(doc) 36 except Exception as e: 37 if ignore_errors: 38 logger.error(e) 39 error_message = str(e) 40 result = hojichar.Document("", is_rejected=True) 41 else: 42 raise e # If we're not ignoring errors, let this one propagate 43 return result, os.getpid(), PARALLEL_BASE_FILTER.statistics_obj, error_message 44 45 46class Parallel: 47 """ 48 The Parallel class provides a way to apply a hojichar.Compose filter 49 to an iterator of documents in a parallel manner using a specified 50 number of worker processes. This class should be used as a context 51 manager with a 'with' statement. 52 53 Example: 54 55 doc_iter = (hojichar.Document(d) for d in open("my_text.txt")) 56 with Parallel(my_filter, num_jobs=8) as pfilter: 57 for doc in pfilter.imap_apply(doc_iter): 58 pass # Process the filtered document as needed. 59 """ 60 61 def __init__( 62 self, filter: hojichar.Compose, num_jobs: int | None = None, ignore_errors: bool = False 63 ): 64 """ 65 Initializes a new instance of the Parallel class. 
66 67 Args: 68 filter (hojichar.Compose): A composed filter object that specifies the 69 processing operations to apply to each document in parallel. 70 A copy of the filter is made within a 'with' statement. When the 'with' 71 block terminates,the statistical information obtained through `filter.statistics` 72 or`filter.statistics_obj` is replaced with the total value of the statistical 73 information processed within the 'with' block. 74 75 num_jobs (int | None, optional): The number of worker processes to use. 76 If None, then the number returned by os.cpu_count() is used. Defaults to None. 77 ignore_errors (bool, optional): If set to True, any exceptions thrown during 78 the processing of a document will be caught and logged, but will not 79 stop the processing of further documents. If set to False, the first 80 exception thrown will terminate the entire parallel processing operation. 81 Defaults to False. 82 """ 83 self.filter = filter 84 self.num_jobs = num_jobs 85 self.ignore_errors = ignore_errors 86 87 self._pool: multiprocessing.pool.Pool | None = None 88 self._pid_stats: dict[int, StatsContainer] | None = None 89 90 def __enter__(self) -> Parallel: 91 self._pool = multiprocessing.Pool( 92 processes=self.num_jobs, 93 initializer=_init_worker, 94 initargs=(self.filter, self.ignore_errors), 95 ) 96 self._pid_stats = dict() 97 return self 98 99 def imap_apply(self, docs: Iterator[hojichar.Document]) -> Iterator[hojichar.Document]: 100 """ 101 Takes an iterator of Documents and applies the Compose filter to 102 each Document in a parallel manner. This is a generator method 103 that yields processed Documents. 104 105 Args: 106 docs (Iterator[hojichar.Document]): An iterator of Documents to be processed. 107 108 Raises: 109 RuntimeError: If the Parallel instance is not properly initialized. This 110 generally happens when the method is called outside of a 'with' statement. 111 Exception: If any exceptions are raised within the worker processes. 
112 113 Yields: 114 Iterator[hojichar.Document]: An iterator that yields processed Documents. 115 """ 116 if self._pool is None or self._pid_stats is None: 117 raise RuntimeError( 118 "Parallel instance not properly initialized. Use within a 'with' statement." 119 ) 120 try: 121 for doc, pid, stats_obj, err_msg in self._pool.imap_unordered(_worker, docs): 122 self._pid_stats[pid] = stats_obj 123 if err_msg is not None: 124 logger.error(f"Error in worker {pid}: {err_msg}") 125 yield doc 126 except Exception: 127 self.__exit__(None, None, None) 128 raise 129 130 def __exit__(self, exc_type, exc_value, traceback) -> None: # type: ignore 131 if self._pool: 132 self._pool.terminate() 133 self._pool.join() 134 if self._pid_stats: 135 self.filter._statistics.stats = self.filter._statistics.stats + functools.reduce( 136 lambda x, y: x + y, self._pid_stats.values() 137 ) 138 139 @property 140 def statistics_obj(self) -> StatsContainer: 141 """ 142 Returns a statistics object of the total statistical 143 values processed within the Parallel block. 144 145 Returns: 146 StatsContainer: Statistics object 147 """ 148 if self._pid_stats: 149 stats: StatsContainer = functools.reduce(lambda x, y: x + y, self._pid_stats.values()) 150 else: 151 stats = copy(self.filter.statistics_obj).reset() 152 return stats 153 154 @property 155 def statistics(self) -> dict: 156 """ 157 Returns a statistics dict which friendly with human of the total statistical 158 values processed within the Parallel block. 159 160 Returns: 161 dict: Human readable statistics values 162 """ 163 return self.statistics_obj.get_human_readable_values()
class Parallel:
    """
    The Parallel class provides a way to apply a hojichar.Compose filter
    to an iterator of documents in a parallel manner using a specified
    number of worker processes. This class should be used as a context
    manager with a 'with' statement.

    Example:

        doc_iter = (hojichar.Document(d) for d in open("my_text.txt"))
        with Parallel(my_filter, num_jobs=8) as pfilter:
            for doc in pfilter.imap_apply(doc_iter):
                pass  # Process the filtered document as needed.
    """

    def __init__(
        self, filter: hojichar.Compose, num_jobs: int | None = None, ignore_errors: bool = False
    ):
        """
        Initializes a new instance of the Parallel class.

        Args:
            filter (hojichar.Compose): A composed filter object that specifies the
                processing operations to apply to each document in parallel.
                It is handed to every worker process through the pool initializer.
                When the 'with' block terminates, the statistics gathered by the
                workers are added to this filter's statistics
                (`filter.statistics` / `filter.statistics_obj`).

            num_jobs (int | None, optional): The number of worker processes to use.
                Passed to multiprocessing.Pool as `processes`; None selects the
                pool's default. Defaults to None.
            ignore_errors (bool, optional): If set to True, any exceptions thrown during
                the processing of a document will be caught and logged, but will not
                stop the processing of further documents. If set to False, the first
                exception thrown will terminate the entire parallel processing operation.
                Defaults to False.
        """
        self.filter = filter
        self.num_jobs = num_jobs
        self.ignore_errors = ignore_errors

        # Both stay None until __enter__ runs; imap_apply refuses to work before that.
        self._pool: multiprocessing.pool.Pool | None = None
        self._pid_stats: dict[int, StatsContainer] | None = None

    def __enter__(self) -> Parallel:
        """Start the worker pool and reset the per-worker statistics."""
        self._pool = multiprocessing.Pool(
            processes=self.num_jobs,
            initializer=_init_worker,
            initargs=(self.filter, self.ignore_errors),
        )
        self._pid_stats = dict()
        return self

    def imap_apply(self, docs: Iterator[hojichar.Document]) -> Iterator[hojichar.Document]:
        """
        Takes an iterator of Documents and applies the Compose filter to
        each Document in a parallel manner. This is a generator method
        that yields processed Documents. Results come from
        `Pool.imap_unordered`, so their order may differ from the input order.

        Args:
            docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.

        Raises:
            RuntimeError: If the Parallel instance is not properly initialized. This
                generally happens when the method is called outside of a 'with' statement.
            Exception: If any exceptions are raised within the worker processes.

        Yields:
            Iterator[hojichar.Document]: An iterator that yields processed Documents.
        """
        if self._pool is None or self._pid_stats is None:
            raise RuntimeError(
                "Parallel instance not properly initialized. Use within a 'with' statement."
            )
        try:
            for doc, pid, stats_obj, err_msg in self._pool.imap_unordered(_worker, docs):
                # Keep only the most recent statistics object reported by each worker pid.
                self._pid_stats[pid] = stats_obj
                if err_msg is not None:
                    logger.error(f"Error in worker {pid}: {err_msg}")
                yield doc
        except Exception:
            # Stop the workers right away. __exit__ is idempotent, so the call the
            # enclosing 'with' statement makes afterwards does not merge stats again.
            self.__exit__(None, None, None)
            raise

    def __exit__(self, exc_type, exc_value, traceback) -> None:  # type: ignore
        """
        Stop the pool and add the workers' statistics to `self.filter`.

        Safe to call more than once: the pool reference is cleared on the first
        call, and later calls return without touching the statistics. This matters
        because `imap_apply` calls this method on failure before the 'with'
        statement calls it again.
        """
        pool, self._pool = self._pool, None
        if pool is None:
            return
        pool.terminate()
        pool.join()
        if self._pid_stats:
            self.filter._statistics.stats = self.filter._statistics.stats + functools.reduce(
                lambda x, y: x + y, self._pid_stats.values()
            )

    @property
    def statistics_obj(self) -> StatsContainer:
        """
        Returns a statistics object of the total statistical
        values processed within the Parallel block.

        Returns:
            StatsContainer: Sum of the per-worker statistics, or a reset copy of
                the filter's statistics object when no worker has reported yet.
        """
        if self._pid_stats:
            stats: StatsContainer = functools.reduce(lambda x, y: x + y, self._pid_stats.values())
        else:
            stats = copy(self.filter.statistics_obj).reset()
        return stats

    @property
    def statistics(self) -> dict:
        """
        Returns a human-readable dict of the total statistical
        values processed within the Parallel block.

        Returns:
            dict: Human readable statistics values
        """
        return self.statistics_obj.get_human_readable_values()
The Parallel class provides a way to apply a hojichar.Compose filter to an iterator of documents in a parallel manner using a specified number of worker processes. This class should be used as a context manager with a 'with' statement.
Example:
doc_iter = (hojichar.Document(d) for d in open("my_text.txt")) with Parallel(my_filter, num_jobs=8) as pfilter: for doc in pfilter.imap_apply(doc_iter): pass # Process the filtered document as needed.
62 def __init__( 63 self, filter: hojichar.Compose, num_jobs: int | None = None, ignore_errors: bool = False 64 ): 65 """ 66 Initializes a new instance of the Parallel class. 67 68 Args: 69 filter (hojichar.Compose): A composed filter object that specifies the 70 processing operations to apply to each document in parallel. 71 A copy of the filter is made within a 'with' statement. When the 'with' 72 block terminates,the statistical information obtained through `filter.statistics` 73 or`filter.statistics_obj` is replaced with the total value of the statistical 74 information processed within the 'with' block. 75 76 num_jobs (int | None, optional): The number of worker processes to use. 77 If None, then the number returned by os.cpu_count() is used. Defaults to None. 78 ignore_errors (bool, optional): If set to True, any exceptions thrown during 79 the processing of a document will be caught and logged, but will not 80 stop the processing of further documents. If set to False, the first 81 exception thrown will terminate the entire parallel processing operation. 82 Defaults to False. 83 """ 84 self.filter = filter 85 self.num_jobs = num_jobs 86 self.ignore_errors = ignore_errors 87 88 self._pool: multiprocessing.pool.Pool | None = None 89 self._pid_stats: dict[int, StatsContainer] | None = None
Initializes a new instance of the Parallel class.
Args:
filter (hojichar.Compose): A composed filter object that specifies the
processing operations to apply to each document in parallel.
A copy of the filter is made within a 'with' statement. When the 'with'
block terminates, the statistical information obtained through filter.statistics
or filter.statistics_obj
is updated by adding the total value of the statistical
information processed within the 'with' block.
num_jobs (int | None, optional): The number of worker processes to use.
If None, then the number returned by os.cpu_count() is used. Defaults to None.
ignore_errors (bool, optional): If set to True, any exceptions thrown during
the processing of a document will be caught and logged, but will not
stop the processing of further documents. If set to False, the first
exception thrown will terminate the entire parallel processing operation.
Defaults to False.
100 def imap_apply(self, docs: Iterator[hojichar.Document]) -> Iterator[hojichar.Document]: 101 """ 102 Takes an iterator of Documents and applies the Compose filter to 103 each Document in a parallel manner. This is a generator method 104 that yields processed Documents. 105 106 Args: 107 docs (Iterator[hojichar.Document]): An iterator of Documents to be processed. 108 109 Raises: 110 RuntimeError: If the Parallel instance is not properly initialized. This 111 generally happens when the method is called outside of a 'with' statement. 112 Exception: If any exceptions are raised within the worker processes. 113 114 Yields: 115 Iterator[hojichar.Document]: An iterator that yields processed Documents. 116 """ 117 if self._pool is None or self._pid_stats is None: 118 raise RuntimeError( 119 "Parallel instance not properly initialized. Use within a 'with' statement." 120 ) 121 try: 122 for doc, pid, stats_obj, err_msg in self._pool.imap_unordered(_worker, docs): 123 self._pid_stats[pid] = stats_obj 124 if err_msg is not None: 125 logger.error(f"Error in worker {pid}: {err_msg}") 126 yield doc 127 except Exception: 128 self.__exit__(None, None, None) 129 raise
Takes an iterator of Documents and applies the Compose filter to each Document in a parallel manner. This is a generator method that yields processed Documents.
Args: docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.
Raises: RuntimeError: If the Parallel instance is not properly initialized. This generally happens when the method is called outside of a 'with' statement. Exception: If any exceptions are raised within the worker processes.
Yields: Iterator[hojichar.Document]: An iterator that yields processed Documents.
Returns a statistics object of the total statistical values processed within the Parallel block.
Returns: StatsContainer: Statistics object