hojichar.core.parallel
1from __future__ import annotations 2 3import functools 4import logging 5import os 6import signal 7from copy import copy 8from multiprocessing.pool import Pool 9from typing import Iterator, List 10 11import hojichar 12from hojichar.core import inspection 13from hojichar.core.models import Statistics 14 15logger = logging.getLogger(__name__) 16 17 18PARALLEL_BASE_FILTER: hojichar.Compose 19WORKER_PARAM_IGNORE_ERRORS: bool 20 21 22def _init_worker(filter: hojichar.Compose, ignore_errors: bool) -> None: 23 signal.signal(signal.SIGINT, signal.SIG_IGN) 24 global PARALLEL_BASE_FILTER, WORKER_PARAM_IGNORE_ERRORS 25 PARALLEL_BASE_FILTER = hojichar.Compose(copy(filter.filters)) # TODO random state treatment 26 WORKER_PARAM_IGNORE_ERRORS = ignore_errors 27 28 29def _worker( 30 doc: hojichar.Document, 31) -> tuple[hojichar.Document, int, List[Statistics], str | None]: 32 global PARALLEL_BASE_FILTER, WORKER_PARAM_IGNORE_ERRORS 33 ignore_errors = WORKER_PARAM_IGNORE_ERRORS 34 error_message = None 35 try: 36 result = PARALLEL_BASE_FILTER.apply(doc) 37 except Exception as e: 38 if ignore_errors: 39 logger.error(e) 40 error_message = str(e) 41 result = hojichar.Document("", is_rejected=True) 42 else: 43 raise e # If we're not ignoring errors, let this one propagate 44 return result, os.getpid(), PARALLEL_BASE_FILTER.get_total_statistics(), error_message 45 46 47class Parallel: 48 """ 49 The Parallel class provides a way to apply a hojichar.Compose filter 50 to an iterator of documents in a parallel manner using a specified 51 number of worker processes. This class should be used as a context 52 manager with a 'with' statement. 53 54 Example: 55 56 doc_iter = (hojichar.Document(d) for d in open("my_text.txt")) 57 with Parallel(my_filter, num_jobs=8) as pfilter: 58 for doc in pfilter.imap_apply(doc_iter): 59 pass # Process the filtered document as needed. 
60 """ 61 62 def __init__( 63 self, filter: hojichar.Compose, num_jobs: int | None = None, ignore_errors: bool = False 64 ): 65 """ 66 Initializes a new instance of the Parallel class. 67 68 Args: 69 filter (hojichar.Compose): A composed filter object that specifies the 70 processing operations to apply to each document in parallel. 71 A copy of the filter is made within a 'with' statement. When the 'with' 72 block terminates,the statistical information obtained through `filter.statistics` 73 or`filter.statistics_obj` is replaced with the total value of the statistical 74 information processed within the 'with' block. 75 76 num_jobs (int | None, optional): The number of worker processes to use. 77 If None, then the number returned by os.cpu_count() is used. Defaults to None. 78 ignore_errors (bool, optional): If set to True, any exceptions thrown during 79 the processing of a document will be caught and logged, but will not 80 stop the processing of further documents. If set to False, the first 81 exception thrown will terminate the entire parallel processing operation. 82 Defaults to False. 83 """ 84 self.filter = filter 85 self.num_jobs = num_jobs 86 self.ignore_errors = ignore_errors 87 88 self._pool: Pool | None = None 89 self._pid_stats: dict[int, List[Statistics]] | None = None 90 91 def __enter__(self) -> Parallel: 92 self._pool = Pool( 93 processes=self.num_jobs, 94 initializer=_init_worker, 95 initargs=(self.filter, self.ignore_errors), 96 ) 97 self._pid_stats = dict() 98 return self 99 100 def imap_apply(self, docs: Iterator[hojichar.Document]) -> Iterator[hojichar.Document]: 101 """ 102 Takes an iterator of Documents and applies the Compose filter to 103 each Document in a parallel manner. This is a generator method 104 that yields processed Documents. 105 106 Args: 107 docs (Iterator[hojichar.Document]): An iterator of Documents to be processed. 108 109 Raises: 110 RuntimeError: If the Parallel instance is not properly initialized. 
This 111 generally happens when the method is called outside of a 'with' statement. 112 Exception: If any exceptions are raised within the worker processes. 113 114 Yields: 115 Iterator[hojichar.Document]: An iterator that yields processed Documents. 116 """ 117 if self._pool is None or self._pid_stats is None: 118 raise RuntimeError( 119 "Parallel instance not properly initialized. Use within a 'with' statement." 120 ) 121 try: 122 for doc, pid, stat, err_msg in self._pool.imap_unordered(_worker, docs): 123 self._pid_stats[pid] = stat 124 if err_msg is not None: 125 logger.error(f"Error in worker {pid}: {err_msg}") 126 yield doc 127 except Exception: 128 self.__exit__(None, None, None) 129 raise 130 131 def __exit__(self, exc_type, exc_value, traceback) -> None: # type: ignore 132 if self._pool: 133 self._pool.terminate() 134 self._pool.join() 135 if self._pid_stats: 136 total_stats = functools.reduce( 137 lambda x, y: Statistics.add_list_of_stats(x, y), self._pid_stats.values() 138 ) 139 self.filter._statistics.update(Statistics.get_filter("Total", total_stats)) 140 for stat in total_stats: 141 for filt in self.filter.filters: 142 if stat.name == filt.name: 143 filt._statistics.update(stat) 144 break 145 146 def get_total_statistics(self) -> List[Statistics]: 147 """ 148 Returns a statistics object of the total statistical 149 values processed within the Parallel block. 150 151 Returns: 152 StatsContainer: Statistics object 153 """ 154 if self._pid_stats: 155 total_stats = functools.reduce( 156 lambda x, y: Statistics.add_list_of_stats(x, y), self._pid_stats.values() 157 ) 158 return total_stats 159 else: 160 return [] 161 162 def get_total_statistics_map(self) -> List[dict]: 163 return [stat.to_dict() for stat in self.get_total_statistics()] 164 165 @property 166 def statistics_obj(self) -> inspection.StatsContainer: 167 """ 168 Returns the statistics object of the Parallel instance. 
169 This is a StatsContainer object which contains the statistics 170 of the Parallel instance and sub filters. 171 172 Returns: 173 StatsContainer: Statistics object 174 """ 175 return inspection.statistics_obj_adapter(self.get_total_statistics()) # type: ignore
class Parallel:
    """
    The Parallel class provides a way to apply a hojichar.Compose filter
    to an iterator of documents in a parallel manner using a specified
    number of worker processes. This class should be used as a context
    manager with a 'with' statement.

    Example:

        doc_iter = (hojichar.Document(d) for d in open("my_text.txt"))
        with Parallel(my_filter, num_jobs=8) as pfilter:
            for doc in pfilter.imap_apply(doc_iter):
                pass  # Process the filtered document as needed.
    """

    def __init__(
        self, filter: hojichar.Compose, num_jobs: int | None = None, ignore_errors: bool = False
    ):
        """
        Initializes a new instance of the Parallel class.

        Args:
            filter (hojichar.Compose): A composed filter object that specifies the
                processing operations to apply to each document in parallel.
                Each worker process builds its own copy of the filter when the pool
                is started on entering the 'with' statement. When the 'with' block
                terminates, the statistical information obtained through
                `filter.statistics` or `filter.statistics_obj` is updated with the
                total value of the statistical information processed within the
                'with' block.

            num_jobs (int | None, optional): The number of worker processes to use.
                If None, then the number returned by os.cpu_count() is used. Defaults to None.
            ignore_errors (bool, optional): If set to True, any exceptions thrown during
                the processing of a document will be caught and logged, but will not
                stop the processing of further documents. If set to False, the first
                exception thrown will terminate the entire parallel processing operation.
                Defaults to False.
        """
        self.filter = filter
        self.num_jobs = num_jobs
        self.ignore_errors = ignore_errors

        # Both are populated by ``__enter__``. ``_pool`` is reset to None once
        # ``__exit__`` has shut the pool down.
        self._pool: Pool | None = None
        self._pid_stats: dict[int, List[Statistics]] | None = None

    def __enter__(self) -> Parallel:
        """Start the worker pool and reset the per-worker statistics."""
        self._pool = Pool(
            processes=self.num_jobs,
            initializer=_init_worker,
            initargs=(self.filter, self.ignore_errors),
        )
        self._pid_stats = dict()
        return self

    def imap_apply(self, docs: Iterator[hojichar.Document]) -> Iterator[hojichar.Document]:
        """
        Takes an iterator of Documents and applies the Compose filter to
        each Document in a parallel manner. This is a generator method
        that yields processed Documents, in arbitrary order
        (``Pool.imap_unordered``).

        Args:
            docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.

        Raises:
            RuntimeError: If the Parallel instance is not properly initialized. This
                generally happens when the method is called outside of a 'with' statement.
            Exception: If any exceptions are raised within the worker processes.
                The pool is shut down before the exception is re-raised.

        Yields:
            Iterator[hojichar.Document]: An iterator that yields processed Documents.
        """
        if self._pool is None or self._pid_stats is None:
            raise RuntimeError(
                "Parallel instance not properly initialized. Use within a 'with' statement."
            )
        try:
            for doc, pid, stat, err_msg in self._pool.imap_unordered(_worker, docs):
                # Keep only the latest report per worker process (presumably
                # cumulative for that worker's long-lived filter instance).
                self._pid_stats[pid] = stat
                if err_msg is not None:
                    logger.error(f"Error in worker {pid}: {err_msg}")
                yield doc
        except Exception:
            # Stop the remaining workers promptly. ``__exit__`` is idempotent, so
            # the later call from the ``with`` statement is harmless.
            self.__exit__(None, None, None)
            raise

    def __exit__(self, exc_type, exc_value, traceback) -> None:  # type: ignore
        """
        Shut down the worker pool and fold the collected statistics into
        ``self.filter`` and its sub-filters.

        Safe to call more than once. ``imap_apply`` calls it on error, and the
        ``with`` statement may then call it again. Only the first call
        terminates the pool and merges the statistics.
        """
        if self._pool is None:
            return
        self._pool.terminate()
        self._pool.join()
        # Marks cleanup as done so a repeated call neither re-terminates the
        # pool nor merges the statistics a second time.
        self._pool = None

        total_stats = self.get_total_statistics()
        if not total_stats:
            return
        self.filter._statistics.update(Statistics.get_filter("Total", total_stats))
        for stat in total_stats:
            for filt in self.filter.filters:
                if stat.name == filt.name:
                    filt._statistics.update(stat)
                    break

    def get_total_statistics(self) -> List[Statistics]:
        """
        Return the statistics accumulated within the Parallel block, combined
        across all worker processes.

        Returns:
            List[Statistics]: The latest statistics reported by each worker,
            merged pairwise with ``Statistics.add_list_of_stats``. Empty if no
            document has been processed yet.
        """
        if not self._pid_stats:
            return []
        return functools.reduce(Statistics.add_list_of_stats, self._pid_stats.values())

    def get_total_statistics_map(self) -> List[dict]:
        """
        Return ``get_total_statistics()`` with each entry converted to a dict
        via ``Statistics.to_dict()``.
        """
        return [stat.to_dict() for stat in self.get_total_statistics()]

    @property
    def statistics_obj(self) -> inspection.StatsContainer:
        """
        Returns the statistics object of the Parallel instance.
        This is a StatsContainer object which contains the statistics
        of the Parallel instance and sub filters.

        Returns:
            StatsContainer: Statistics object
        """
        return inspection.statistics_obj_adapter(self.get_total_statistics())  # type: ignore
The Parallel class provides a way to apply a hojichar.Compose filter to an iterator of documents in a parallel manner using a specified number of worker processes. This class should be used as a context manager with a 'with' statement.
Example:
doc_iter = (hojichar.Document(d) for d in open("my_text.txt"))
with Parallel(my_filter, num_jobs=8) as pfilter:
    for doc in pfilter.imap_apply(doc_iter):
        pass  # Process the filtered document as needed.
def __init__(
    self, filter: hojichar.Compose, num_jobs: int | None = None, ignore_errors: bool = False
):
    """
    Initializes a new instance of the Parallel class.

    Args:
        filter (hojichar.Compose): A composed filter object that specifies the
            processing operations to apply to each document in parallel.
            Each worker process builds its own copy of the filter when the pool
            is started on entering the 'with' statement. When the 'with' block
            terminates, the statistical information obtained through
            `filter.statistics` or `filter.statistics_obj` is updated with the
            total value of the statistical information processed within the
            'with' block.

        num_jobs (int | None, optional): The number of worker processes to use.
            If None, then the number returned by os.cpu_count() is used. Defaults to None.
        ignore_errors (bool, optional): If set to True, any exceptions thrown during
            the processing of a document will be caught and logged, but will not
            stop the processing of further documents. If set to False, the first
            exception thrown will terminate the entire parallel processing operation.
            Defaults to False.
    """
    self.filter = filter
    self.num_jobs = num_jobs
    self.ignore_errors = ignore_errors

    # Both are only populated by ``__enter__``. Until then the instance is not
    # usable for ``imap_apply``.
    self._pool: Pool | None = None
    self._pid_stats: dict[int, List[Statistics]] | None = None
Initializes a new instance of the Parallel class.
Args:
filter (hojichar.Compose): A composed filter object that specifies the
processing operations to apply to each document in parallel.
A copy of the filter is made within a 'with' statement. When the 'with'
block terminates, the statistical information obtained through `filter.statistics`
or `filter.statistics_obj`
is replaced with the total value of the statistical
information processed within the 'with' block.
num_jobs (int | None, optional): The number of worker processes to use.
If None, then the number returned by os.cpu_count() is used. Defaults to None.
ignore_errors (bool, optional): If set to True, any exceptions thrown during
the processing of a document will be caught and logged, but will not
stop the processing of further documents. If set to False, the first
exception thrown will terminate the entire parallel processing operation.
Defaults to False.
def imap_apply(self, docs: Iterator[hojichar.Document]) -> Iterator[hojichar.Document]:
    """
    Stream documents through the worker pool and yield each filtered result.

    Documents are dispatched with ``Pool.imap_unordered``, so results come back
    in arbitrary order. Every result also carries the sending worker's
    statistics, which are recorded under that worker's process id.

    Args:
        docs (Iterator[hojichar.Document]): Documents to filter.

    Raises:
        RuntimeError: When no pool is active, i.e. the method is used outside a
            'with' statement.
        Exception: Any error propagated from a worker process. The pool is shut
            down via ``__exit__`` before the error is re-raised.

    Yields:
        hojichar.Document: Filtered documents.
    """
    pool, pid_stats = self._pool, self._pid_stats
    if pool is None or pid_stats is None:
        raise RuntimeError(
            "Parallel instance not properly initialized. Use within a 'with' statement."
        )
    try:
        for processed, worker_pid, worker_stats, failure in pool.imap_unordered(_worker, docs):
            # The newest report from a worker replaces its previous one.
            pid_stats[worker_pid] = worker_stats
            if failure is not None:
                logger.error(f"Error in worker {worker_pid}: {failure}")
            yield processed
    except Exception:
        # Tear the pool down right away instead of waiting for the 'with' exit.
        self.__exit__(None, None, None)
        raise
Takes an iterator of Documents and applies the Compose filter to each Document in a parallel manner. This is a generator method that yields processed Documents.
Args: docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.
Raises: RuntimeError: If the Parallel instance is not properly initialized. This generally happens when the method is called outside of a 'with' statement. Exception: If any exceptions are raised within the worker processes.
Yields: Iterator[hojichar.Document]: An iterator that yields processed Documents.
def get_total_statistics(self) -> List[Statistics]:
    """
    Return the statistics accumulated within the Parallel block, combined
    across all worker processes.

    Returns:
        List[Statistics]: The latest statistics reported by each worker,
        merged pairwise with ``Statistics.add_list_of_stats``. Empty if no
        document has been processed yet (or ``_pid_stats`` is unset).
    """
    if not self._pid_stats:
        return []
    # ``reduce`` can take the merge function directly; no wrapping lambda needed.
    return functools.reduce(Statistics.add_list_of_stats, self._pid_stats.values())
Returns the list of statistics accumulated within the Parallel block, combined across all worker processes.
Returns: List[Statistics]: Per-filter statistics merged from every worker (empty if no document was processed).
Returns the statistics object of the Parallel instance. This is a StatsContainer object which contains the statistics of the Parallel instance and sub filters.
Returns: StatsContainer: Statistics object