hojichar.core.parallel

  1from __future__ import annotations
  2
  3import functools
  4import logging
  5import os
  6import signal
  7from copy import copy
  8from multiprocessing.pool import Pool
  9from typing import Iterator, List
 10
 11import hojichar
 12from hojichar.core import inspection
 13from hojichar.core.models import Statistics
 14
 15logger = logging.getLogger(__name__)
 16
 17
 18PARALLEL_BASE_FILTER: hojichar.Compose
 19WORKER_PARAM_IGNORE_ERRORS: bool
 20
 21
 22def _init_worker(filter: hojichar.Compose, ignore_errors: bool) -> None:
 23    signal.signal(signal.SIGINT, signal.SIG_IGN)
 24    global PARALLEL_BASE_FILTER, WORKER_PARAM_IGNORE_ERRORS
 25    PARALLEL_BASE_FILTER = hojichar.Compose(copy(filter.filters))  # TODO random state treatment
 26    WORKER_PARAM_IGNORE_ERRORS = ignore_errors
 27
 28
 29def _worker(
 30    doc: hojichar.Document,
 31) -> tuple[hojichar.Document, int, List[Statistics], str | None]:
 32    global PARALLEL_BASE_FILTER, WORKER_PARAM_IGNORE_ERRORS
 33    ignore_errors = WORKER_PARAM_IGNORE_ERRORS
 34    error_message = None
 35    try:
 36        result = PARALLEL_BASE_FILTER.apply(doc)
 37    except Exception as e:
 38        if ignore_errors:
 39            logger.error(e)
 40            error_message = str(e)
 41            result = hojichar.Document("", is_rejected=True)
 42        else:
 43            raise e  # If we're not ignoring errors, let this one propagate
 44    return result, os.getpid(), PARALLEL_BASE_FILTER.get_total_statistics(), error_message
 45
 46
 47class Parallel:
 48    """
 49    The Parallel class provides a way to apply a hojichar.Compose filter
 50    to an iterator of documents in a parallel manner using a specified
 51    number of worker processes. This class should be used as a context
 52    manager with a 'with' statement.
 53
 54    Example:
 55
 56    doc_iter = (hojichar.Document(d) for d in open("my_text.txt"))
 57    with Parallel(my_filter, num_jobs=8) as pfilter:
 58        for doc in pfilter.imap_apply(doc_iter):
 59            pass  # Process the filtered document as needed.
 60    """
 61
 62    def __init__(
 63        self, filter: hojichar.Compose, num_jobs: int | None = None, ignore_errors: bool = False
 64    ):
 65        """
 66        Initializes a new instance of the Parallel class.
 67
 68        Args:
 69            filter (hojichar.Compose): A composed filter object that specifies the
 70                processing operations to apply to each document in parallel.
 71                A copy of the filter is made within a 'with' statement. When the 'with'
 72                block terminates,the statistical information obtained through `filter.statistics`
 73                or`filter.statistics_obj` is replaced with the total value of the statistical
 74                information processed within the 'with' block.
 75
 76            num_jobs (int | None, optional): The number of worker processes to use.
 77                If None, then the number returned by os.cpu_count() is used. Defaults to None.
 78            ignore_errors (bool, optional): If set to True, any exceptions thrown during
 79                the processing of a document will be caught and logged, but will not
 80                stop the processing of further documents. If set to False, the first
 81                exception thrown will terminate the entire parallel processing operation.
 82                Defaults to False.
 83        """
 84        self.filter = filter
 85        self.num_jobs = num_jobs
 86        self.ignore_errors = ignore_errors
 87
 88        self._pool: Pool | None = None
 89        self._pid_stats: dict[int, List[Statistics]] | None = None
 90
 91    def __enter__(self) -> Parallel:
 92        self._pool = Pool(
 93            processes=self.num_jobs,
 94            initializer=_init_worker,
 95            initargs=(self.filter, self.ignore_errors),
 96        )
 97        self._pid_stats = dict()
 98        return self
 99
100    def imap_apply(self, docs: Iterator[hojichar.Document]) -> Iterator[hojichar.Document]:
101        """
102        Takes an iterator of Documents and applies the Compose filter to
103        each Document in a parallel manner. This is a generator method
104        that yields processed Documents.
105
106        Args:
107            docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.
108
109        Raises:
110            RuntimeError: If the Parallel instance is not properly initialized. This
111                generally happens when the method is called outside of a 'with' statement.
112            Exception: If any exceptions are raised within the worker processes.
113
114        Yields:
115            Iterator[hojichar.Document]: An iterator that yields processed Documents.
116        """
117        if self._pool is None or self._pid_stats is None:
118            raise RuntimeError(
119                "Parallel instance not properly initialized. Use within a 'with' statement."
120            )
121        try:
122            for doc, pid, stat, err_msg in self._pool.imap_unordered(_worker, docs):
123                self._pid_stats[pid] = stat
124                if err_msg is not None:
125                    logger.error(f"Error in worker {pid}: {err_msg}")
126                yield doc
127        except Exception:
128            self.__exit__(None, None, None)
129            raise
130
131    def __exit__(self, exc_type, exc_value, traceback) -> None:  # type: ignore
132        if self._pool:
133            self._pool.terminate()
134            self._pool.join()
135        if self._pid_stats:
136            total_stats = functools.reduce(
137                lambda x, y: Statistics.add_list_of_stats(x, y), self._pid_stats.values()
138            )
139            self.filter._statistics.update(Statistics.get_filter("Total", total_stats))
140            for stat in total_stats:
141                for filt in self.filter.filters:
142                    if stat.name == filt.name:
143                        filt._statistics.update(stat)
144                        break
145
146    def get_total_statistics(self) -> List[Statistics]:
147        """
148        Returns a statistics object of the total statistical
149        values processed within the Parallel block.
150
151        Returns:
152            StatsContainer: Statistics object
153        """
154        if self._pid_stats:
155            total_stats = functools.reduce(
156                lambda x, y: Statistics.add_list_of_stats(x, y), self._pid_stats.values()
157            )
158            return total_stats
159        else:
160            return []
161
162    def get_total_statistics_map(self) -> List[dict]:
163        return [stat.to_dict() for stat in self.get_total_statistics()]
164
165    @property
166    def statistics_obj(self) -> inspection.StatsContainer:
167        """
168        Returns the statistics object of the Parallel instance.
169        This is a StatsContainer object which contains the statistics
170        of the Parallel instance and sub filters.
171
172        Returns:
173            StatsContainer: Statistics object
174        """
175        return inspection.statistics_obj_adapter(self.get_total_statistics())  # type: ignore
class Parallel:
 48class Parallel:
 49    """
 50    The Parallel class provides a way to apply a hojichar.Compose filter
 51    to an iterator of documents in a parallel manner using a specified
 52    number of worker processes. This class should be used as a context
 53    manager with a 'with' statement.
 54
 55    Example:
 56
 57    doc_iter = (hojichar.Document(d) for d in open("my_text.txt"))
 58    with Parallel(my_filter, num_jobs=8) as pfilter:
 59        for doc in pfilter.imap_apply(doc_iter):
 60            pass  # Process the filtered document as needed.
 61    """
 62
 63    def __init__(
 64        self, filter: hojichar.Compose, num_jobs: int | None = None, ignore_errors: bool = False
 65    ):
 66        """
 67        Initializes a new instance of the Parallel class.
 68
 69        Args:
 70            filter (hojichar.Compose): A composed filter object that specifies the
 71                processing operations to apply to each document in parallel.
 72                A copy of the filter is made within a 'with' statement. When the 'with'
 73                block terminates,the statistical information obtained through `filter.statistics`
 74                or`filter.statistics_obj` is replaced with the total value of the statistical
 75                information processed within the 'with' block.
 76
 77            num_jobs (int | None, optional): The number of worker processes to use.
 78                If None, then the number returned by os.cpu_count() is used. Defaults to None.
 79            ignore_errors (bool, optional): If set to True, any exceptions thrown during
 80                the processing of a document will be caught and logged, but will not
 81                stop the processing of further documents. If set to False, the first
 82                exception thrown will terminate the entire parallel processing operation.
 83                Defaults to False.
 84        """
 85        self.filter = filter
 86        self.num_jobs = num_jobs
 87        self.ignore_errors = ignore_errors
 88
 89        self._pool: Pool | None = None
 90        self._pid_stats: dict[int, List[Statistics]] | None = None
 91
 92    def __enter__(self) -> Parallel:
 93        self._pool = Pool(
 94            processes=self.num_jobs,
 95            initializer=_init_worker,
 96            initargs=(self.filter, self.ignore_errors),
 97        )
 98        self._pid_stats = dict()
 99        return self
100
101    def imap_apply(self, docs: Iterator[hojichar.Document]) -> Iterator[hojichar.Document]:
102        """
103        Takes an iterator of Documents and applies the Compose filter to
104        each Document in a parallel manner. This is a generator method
105        that yields processed Documents.
106
107        Args:
108            docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.
109
110        Raises:
111            RuntimeError: If the Parallel instance is not properly initialized. This
112                generally happens when the method is called outside of a 'with' statement.
113            Exception: If any exceptions are raised within the worker processes.
114
115        Yields:
116            Iterator[hojichar.Document]: An iterator that yields processed Documents.
117        """
118        if self._pool is None or self._pid_stats is None:
119            raise RuntimeError(
120                "Parallel instance not properly initialized. Use within a 'with' statement."
121            )
122        try:
123            for doc, pid, stat, err_msg in self._pool.imap_unordered(_worker, docs):
124                self._pid_stats[pid] = stat
125                if err_msg is not None:
126                    logger.error(f"Error in worker {pid}: {err_msg}")
127                yield doc
128        except Exception:
129            self.__exit__(None, None, None)
130            raise
131
132    def __exit__(self, exc_type, exc_value, traceback) -> None:  # type: ignore
133        if self._pool:
134            self._pool.terminate()
135            self._pool.join()
136        if self._pid_stats:
137            total_stats = functools.reduce(
138                lambda x, y: Statistics.add_list_of_stats(x, y), self._pid_stats.values()
139            )
140            self.filter._statistics.update(Statistics.get_filter("Total", total_stats))
141            for stat in total_stats:
142                for filt in self.filter.filters:
143                    if stat.name == filt.name:
144                        filt._statistics.update(stat)
145                        break
146
147    def get_total_statistics(self) -> List[Statistics]:
148        """
149        Returns a statistics object of the total statistical
150        values processed within the Parallel block.
151
152        Returns:
153            StatsContainer: Statistics object
154        """
155        if self._pid_stats:
156            total_stats = functools.reduce(
157                lambda x, y: Statistics.add_list_of_stats(x, y), self._pid_stats.values()
158            )
159            return total_stats
160        else:
161            return []
162
163    def get_total_statistics_map(self) -> List[dict]:
164        return [stat.to_dict() for stat in self.get_total_statistics()]
165
166    @property
167    def statistics_obj(self) -> inspection.StatsContainer:
168        """
169        Returns the statistics object of the Parallel instance.
170        This is a StatsContainer object which contains the statistics
171        of the Parallel instance and sub filters.
172
173        Returns:
174            StatsContainer: Statistics object
175        """
176        return inspection.statistics_obj_adapter(self.get_total_statistics())  # type: ignore

The Parallel class provides a way to apply a hojichar.Compose filter to an iterator of documents in a parallel manner using a specified number of worker processes. This class should be used as a context manager with a 'with' statement.

Example:

doc_iter = (hojichar.Document(d) for d in open("my_text.txt"))
with Parallel(my_filter, num_jobs=8) as pfilter:
    for doc in pfilter.imap_apply(doc_iter):
        pass  # Process the filtered document as needed.

Parallel( filter: hojichar.core.composition.Compose, num_jobs: int | None = None, ignore_errors: bool = False)
63    def __init__(
64        self, filter: hojichar.Compose, num_jobs: int | None = None, ignore_errors: bool = False
65    ):
66        """
67        Initializes a new instance of the Parallel class.
68
69        Args:
70            filter (hojichar.Compose): A composed filter object that specifies the
71                processing operations to apply to each document in parallel.
72                A copy of the filter is made within a 'with' statement. When the 'with'
73                block terminates,the statistical information obtained through `filter.statistics`
74                or`filter.statistics_obj` is replaced with the total value of the statistical
75                information processed within the 'with' block.
76
77            num_jobs (int | None, optional): The number of worker processes to use.
78                If None, then the number returned by os.cpu_count() is used. Defaults to None.
79            ignore_errors (bool, optional): If set to True, any exceptions thrown during
80                the processing of a document will be caught and logged, but will not
81                stop the processing of further documents. If set to False, the first
82                exception thrown will terminate the entire parallel processing operation.
83                Defaults to False.
84        """
85        self.filter = filter
86        self.num_jobs = num_jobs
87        self.ignore_errors = ignore_errors
88
89        self._pool: Pool | None = None
90        self._pid_stats: dict[int, List[Statistics]] | None = None

Initializes a new instance of the Parallel class.

Args: filter (hojichar.Compose): A composed filter object that specifies the processing operations to apply to each document in parallel. A copy of the filter is made within a 'with' statement. When the 'with' block terminates, the statistical information obtained through filter.statistics or filter.statistics_obj is replaced with the total value of the statistical information processed within the 'with' block.

num_jobs (int | None, optional): The number of worker processes to use.
    If None, then the number returned by os.cpu_count() is used. Defaults to None.
ignore_errors (bool, optional): If set to True, any exceptions thrown during
    the processing of a document will be caught and logged, but will not
    stop the processing of further documents. If set to False, the first
    exception thrown will terminate the entire parallel processing operation.
    Defaults to False.
def imap_apply( self, docs: Iterator[hojichar.core.models.Document]) -> Iterator[hojichar.core.models.Document]:
101    def imap_apply(self, docs: Iterator[hojichar.Document]) -> Iterator[hojichar.Document]:
102        """
103        Takes an iterator of Documents and applies the Compose filter to
104        each Document in a parallel manner. This is a generator method
105        that yields processed Documents.
106
107        Args:
108            docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.
109
110        Raises:
111            RuntimeError: If the Parallel instance is not properly initialized. This
112                generally happens when the method is called outside of a 'with' statement.
113            Exception: If any exceptions are raised within the worker processes.
114
115        Yields:
116            Iterator[hojichar.Document]: An iterator that yields processed Documents.
117        """
118        if self._pool is None or self._pid_stats is None:
119            raise RuntimeError(
120                "Parallel instance not properly initialized. Use within a 'with' statement."
121            )
122        try:
123            for doc, pid, stat, err_msg in self._pool.imap_unordered(_worker, docs):
124                self._pid_stats[pid] = stat
125                if err_msg is not None:
126                    logger.error(f"Error in worker {pid}: {err_msg}")
127                yield doc
128        except Exception:
129            self.__exit__(None, None, None)
130            raise

Takes an iterator of Documents and applies the Compose filter to each Document in a parallel manner. This is a generator method that yields processed Documents.

Args: docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.

Raises: RuntimeError: If the Parallel instance is not properly initialized. This generally happens when the method is called outside of a 'with' statement. Exception: If any exceptions are raised within the worker processes.

Yields: Iterator[hojichar.Document]: An iterator that yields processed Documents.

def get_total_statistics(self) -> List[hojichar.core.models.Statistics]:
147    def get_total_statistics(self) -> List[Statistics]:
148        """
149        Returns a statistics object of the total statistical
150        values processed within the Parallel block.
151
152        Returns:
153            StatsContainer: Statistics object
154        """
155        if self._pid_stats:
156            total_stats = functools.reduce(
157                lambda x, y: Statistics.add_list_of_stats(x, y), self._pid_stats.values()
158            )
159            return total_stats
160        else:
161            return []

Returns a statistics object of the total statistical values processed within the Parallel block.

Returns: List[Statistics]: The combined statistics, or an empty list when nothing has been recorded.

def get_total_statistics_map(self) -> List[dict]:
163    def get_total_statistics_map(self) -> List[dict]:
164        return [stat.to_dict() for stat in self.get_total_statistics()]

Returns the statistics object of the Parallel instance. This is a StatsContainer object which contains the statistics of the Parallel instance and sub filters.

Returns: StatsContainer: Statistics object