hojichar.core.parallel

  1from __future__ import annotations
  2
  3import functools
  4import logging
  5import multiprocessing
  6import os
  7import signal
  8from copy import copy
  9from typing import Iterator
 10
 11import hojichar
 12from hojichar.core.inspection import StatsContainer
 13
 14logger = logging.getLogger(__name__)
 15
 16
 17PARALLEL_BASE_FILTER: hojichar.Compose
 18WORKER_PARAM_IGNORE_ERRORS: bool
 19
 20
 21def _init_worker(filter: hojichar.Compose, ignore_errors: bool) -> None:
 22    signal.signal(signal.SIGINT, signal.SIG_IGN)
 23    global PARALLEL_BASE_FILTER, WORKER_PARAM_IGNORE_ERRORS
 24    PARALLEL_BASE_FILTER = hojichar.Compose(copy(filter.filters))  # TODO random state treatment
 25    WORKER_PARAM_IGNORE_ERRORS = ignore_errors
 26
 27
 28def _worker(
 29    doc: hojichar.Document,
 30) -> tuple[hojichar.Document, int, StatsContainer, str | None]:
 31    global PARALLEL_BASE_FILTER, WORKER_PARAM_IGNORE_ERRORS
 32    ignore_errors = WORKER_PARAM_IGNORE_ERRORS
 33    error_message = None
 34    try:
 35        result = PARALLEL_BASE_FILTER.apply(doc)
 36    except Exception as e:
 37        if ignore_errors:
 38            logger.error(e)
 39            error_message = str(e)
 40            result = hojichar.Document("", is_rejected=True)
 41        else:
 42            raise e  # If we're not ignoring errors, let this one propagate
 43    return result, os.getpid(), PARALLEL_BASE_FILTER.statistics_obj, error_message
 44
 45
 46class Parallel:
 47    """
 48    The Parallel class provides a way to apply a hojichar.Compose filter
 49    to an iterator of documents in a parallel manner using a specified
 50    number of worker processes. This class should be used as a context
 51    manager with a 'with' statement.
 52
 53    Example:
 54
 55    doc_iter = (hojichar.Document(d) for d in open("my_text.txt"))
 56    with Parallel(my_filter, num_jobs=8) as pfilter:
 57        for doc in pfilter.imap_apply(doc_iter):
 58            pass  # Process the filtered document as needed.
 59    """
 60
 61    def __init__(
 62        self, filter: hojichar.Compose, num_jobs: int | None = None, ignore_errors: bool = False
 63    ):
 64        """
 65        Initializes a new instance of the Parallel class.
 66
 67        Args:
 68            filter (hojichar.Compose): A composed filter object that specifies the
 69                processing operations to apply to each document in parallel.
 70                A copy of the filter is made within a 'with' statement. When the 'with'
 71                block terminates,the statistical information obtained through `filter.statistics`
 72                or`filter.statistics_obj` is replaced with the total value of the statistical
 73                information processed within the 'with' block.
 74
 75            num_jobs (int | None, optional): The number of worker processes to use.
 76                If None, then the number returned by os.cpu_count() is used. Defaults to None.
 77            ignore_errors (bool, optional): If set to True, any exceptions thrown during
 78                the processing of a document will be caught and logged, but will not
 79                stop the processing of further documents. If set to False, the first
 80                exception thrown will terminate the entire parallel processing operation.
 81                Defaults to False.
 82        """
 83        self.filter = filter
 84        self.num_jobs = num_jobs
 85        self.ignore_errors = ignore_errors
 86
 87        self._pool: multiprocessing.pool.Pool | None = None
 88        self._pid_stats: dict[int, StatsContainer] | None = None
 89
 90    def __enter__(self) -> Parallel:
 91        self._pool = multiprocessing.Pool(
 92            processes=self.num_jobs,
 93            initializer=_init_worker,
 94            initargs=(self.filter, self.ignore_errors),
 95        )
 96        self._pid_stats = dict()
 97        return self
 98
 99    def imap_apply(self, docs: Iterator[hojichar.Document]) -> Iterator[hojichar.Document]:
100        """
101        Takes an iterator of Documents and applies the Compose filter to
102        each Document in a parallel manner. This is a generator method
103        that yields processed Documents.
104
105        Args:
106            docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.
107
108        Raises:
109            RuntimeError: If the Parallel instance is not properly initialized. This
110                generally happens when the method is called outside of a 'with' statement.
111            Exception: If any exceptions are raised within the worker processes.
112
113        Yields:
114            Iterator[hojichar.Document]: An iterator that yields processed Documents.
115        """
116        if self._pool is None or self._pid_stats is None:
117            raise RuntimeError(
118                "Parallel instance not properly initialized. Use within a 'with' statement."
119            )
120        try:
121            for doc, pid, stats_obj, err_msg in self._pool.imap_unordered(_worker, docs):
122                self._pid_stats[pid] = stats_obj
123                if err_msg is not None:
124                    logger.error(f"Error in worker {pid}: {err_msg}")
125                yield doc
126        except Exception:
127            self.__exit__(None, None, None)
128            raise
129
130    def __exit__(self, exc_type, exc_value, traceback) -> None:  # type: ignore
131        if self._pool:
132            self._pool.terminate()
133            self._pool.join()
134        if self._pid_stats:
135            self.filter._statistics.stats = self.filter._statistics.stats + functools.reduce(
136                lambda x, y: x + y, self._pid_stats.values()
137            )
138
139    @property
140    def statistics_obj(self) -> StatsContainer:
141        """
142        Returns a statistics object of the total statistical
143        values processed within the Parallel block.
144
145        Returns:
146            StatsContainer: Statistics object
147        """
148        if self._pid_stats:
149            stats: StatsContainer = functools.reduce(lambda x, y: x + y, self._pid_stats.values())
150        else:
151            stats = copy(self.filter.statistics_obj).reset()
152        return stats
153
154    @property
155    def statistics(self) -> dict:
156        """
157        Returns a statistics dict which friendly with human of the total statistical
158        values processed within the Parallel block.
159
160        Returns:
161            dict: Human readable statistics values
162        """
163        return self.statistics_obj.get_human_readable_values()
class Parallel:
 47class Parallel:
 48    """
 49    The Parallel class provides a way to apply a hojichar.Compose filter
 50    to an iterator of documents in a parallel manner using a specified
 51    number of worker processes. This class should be used as a context
 52    manager with a 'with' statement.
 53
 54    Example:
 55
 56    doc_iter = (hojichar.Document(d) for d in open("my_text.txt"))
 57    with Parallel(my_filter, num_jobs=8) as pfilter:
 58        for doc in pfilter.imap_apply(doc_iter):
 59            pass  # Process the filtered document as needed.
 60    """
 61
 62    def __init__(
 63        self, filter: hojichar.Compose, num_jobs: int | None = None, ignore_errors: bool = False
 64    ):
 65        """
 66        Initializes a new instance of the Parallel class.
 67
 68        Args:
 69            filter (hojichar.Compose): A composed filter object that specifies the
 70                processing operations to apply to each document in parallel.
 71                A copy of the filter is made within a 'with' statement. When the 'with'
 72                block terminates,the statistical information obtained through `filter.statistics`
 73                or`filter.statistics_obj` is replaced with the total value of the statistical
 74                information processed within the 'with' block.
 75
 76            num_jobs (int | None, optional): The number of worker processes to use.
 77                If None, then the number returned by os.cpu_count() is used. Defaults to None.
 78            ignore_errors (bool, optional): If set to True, any exceptions thrown during
 79                the processing of a document will be caught and logged, but will not
 80                stop the processing of further documents. If set to False, the first
 81                exception thrown will terminate the entire parallel processing operation.
 82                Defaults to False.
 83        """
 84        self.filter = filter
 85        self.num_jobs = num_jobs
 86        self.ignore_errors = ignore_errors
 87
 88        self._pool: multiprocessing.pool.Pool | None = None
 89        self._pid_stats: dict[int, StatsContainer] | None = None
 90
 91    def __enter__(self) -> Parallel:
 92        self._pool = multiprocessing.Pool(
 93            processes=self.num_jobs,
 94            initializer=_init_worker,
 95            initargs=(self.filter, self.ignore_errors),
 96        )
 97        self._pid_stats = dict()
 98        return self
 99
100    def imap_apply(self, docs: Iterator[hojichar.Document]) -> Iterator[hojichar.Document]:
101        """
102        Takes an iterator of Documents and applies the Compose filter to
103        each Document in a parallel manner. This is a generator method
104        that yields processed Documents.
105
106        Args:
107            docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.
108
109        Raises:
110            RuntimeError: If the Parallel instance is not properly initialized. This
111                generally happens when the method is called outside of a 'with' statement.
112            Exception: If any exceptions are raised within the worker processes.
113
114        Yields:
115            Iterator[hojichar.Document]: An iterator that yields processed Documents.
116        """
117        if self._pool is None or self._pid_stats is None:
118            raise RuntimeError(
119                "Parallel instance not properly initialized. Use within a 'with' statement."
120            )
121        try:
122            for doc, pid, stats_obj, err_msg in self._pool.imap_unordered(_worker, docs):
123                self._pid_stats[pid] = stats_obj
124                if err_msg is not None:
125                    logger.error(f"Error in worker {pid}: {err_msg}")
126                yield doc
127        except Exception:
128            self.__exit__(None, None, None)
129            raise
130
131    def __exit__(self, exc_type, exc_value, traceback) -> None:  # type: ignore
132        if self._pool:
133            self._pool.terminate()
134            self._pool.join()
135        if self._pid_stats:
136            self.filter._statistics.stats = self.filter._statistics.stats + functools.reduce(
137                lambda x, y: x + y, self._pid_stats.values()
138            )
139
140    @property
141    def statistics_obj(self) -> StatsContainer:
142        """
143        Returns a statistics object of the total statistical
144        values processed within the Parallel block.
145
146        Returns:
147            StatsContainer: Statistics object
148        """
149        if self._pid_stats:
150            stats: StatsContainer = functools.reduce(lambda x, y: x + y, self._pid_stats.values())
151        else:
152            stats = copy(self.filter.statistics_obj).reset()
153        return stats
154
155    @property
156    def statistics(self) -> dict:
157        """
158        Returns a statistics dict which friendly with human of the total statistical
159        values processed within the Parallel block.
160
161        Returns:
162            dict: Human readable statistics values
163        """
164        return self.statistics_obj.get_human_readable_values()

The Parallel class provides a way to apply a hojichar.Compose filter to an iterator of documents in a parallel manner using a specified number of worker processes. This class should be used as a context manager with a 'with' statement.

Example:

    doc_iter = (hojichar.Document(d) for d in open("my_text.txt"))
    with Parallel(my_filter, num_jobs=8) as pfilter:
        for doc in pfilter.imap_apply(doc_iter):
            pass  # Process the filtered document as needed.

Parallel( filter: hojichar.core.composition.Compose, num_jobs: 'int | None' = None, ignore_errors: bool = False)
62    def __init__(
63        self, filter: hojichar.Compose, num_jobs: int | None = None, ignore_errors: bool = False
64    ):
65        """
66        Initializes a new instance of the Parallel class.
67
68        Args:
69            filter (hojichar.Compose): A composed filter object that specifies the
70                processing operations to apply to each document in parallel.
71                A copy of the filter is made within a 'with' statement. When the 'with'
72                block terminates,the statistical information obtained through `filter.statistics`
73                or`filter.statistics_obj` is replaced with the total value of the statistical
74                information processed within the 'with' block.
75
76            num_jobs (int | None, optional): The number of worker processes to use.
77                If None, then the number returned by os.cpu_count() is used. Defaults to None.
78            ignore_errors (bool, optional): If set to True, any exceptions thrown during
79                the processing of a document will be caught and logged, but will not
80                stop the processing of further documents. If set to False, the first
81                exception thrown will terminate the entire parallel processing operation.
82                Defaults to False.
83        """
84        self.filter = filter
85        self.num_jobs = num_jobs
86        self.ignore_errors = ignore_errors
87
88        self._pool: multiprocessing.pool.Pool | None = None
89        self._pid_stats: dict[int, StatsContainer] | None = None

Initializes a new instance of the Parallel class.

Args: filter (hojichar.Compose): A composed filter object that specifies the processing operations to apply to each document in parallel. A copy of the filter is made within a 'with' statement. When the 'with' block terminates, the statistical information processed within the 'with' block is added to the statistics obtained through filter.statistics or filter.statistics_obj.

num_jobs (int | None, optional): The number of worker processes to use.
    If None, then the number returned by os.cpu_count() is used. Defaults to None.
ignore_errors (bool, optional): If set to True, any exceptions thrown during
    the processing of a document will be caught and logged, but will not
    stop the processing of further documents. If set to False, the first
    exception thrown will terminate the entire parallel processing operation.
    Defaults to False.
def imap_apply( self, docs: Iterator[hojichar.core.models.Document]) -> Iterator[hojichar.core.models.Document]:
100    def imap_apply(self, docs: Iterator[hojichar.Document]) -> Iterator[hojichar.Document]:
101        """
102        Takes an iterator of Documents and applies the Compose filter to
103        each Document in a parallel manner. This is a generator method
104        that yields processed Documents.
105
106        Args:
107            docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.
108
109        Raises:
110            RuntimeError: If the Parallel instance is not properly initialized. This
111                generally happens when the method is called outside of a 'with' statement.
112            Exception: If any exceptions are raised within the worker processes.
113
114        Yields:
115            Iterator[hojichar.Document]: An iterator that yields processed Documents.
116        """
117        if self._pool is None or self._pid_stats is None:
118            raise RuntimeError(
119                "Parallel instance not properly initialized. Use within a 'with' statement."
120            )
121        try:
122            for doc, pid, stats_obj, err_msg in self._pool.imap_unordered(_worker, docs):
123                self._pid_stats[pid] = stats_obj
124                if err_msg is not None:
125                    logger.error(f"Error in worker {pid}: {err_msg}")
126                yield doc
127        except Exception:
128            self.__exit__(None, None, None)
129            raise

Takes an iterator of Documents and applies the Compose filter to each Document in a parallel manner. This is a generator method that yields processed Documents.

Args: docs (Iterator[hojichar.Document]): An iterator of Documents to be processed.

Raises: RuntimeError: If the Parallel instance is not properly initialized. This generally happens when the method is called outside of a 'with' statement. Exception: If any exceptions are raised within the worker processes.

Yields: Iterator[hojichar.Document]: An iterator that yields processed Documents.

Returns a statistics object of the total statistical values processed within the Parallel block.

Returns: StatsContainer: Statistics object

statistics: dict

Returns a human-readable statistics dict of the total statistical values processed within the Parallel block.

Returns: dict: Human readable statistics values