Source code for resilient_exporters.exporters.exporter_pool

import queue
import pathlib
import logging
from concurrent.futures import ThreadPoolExecutor
from sys import getsizeof
from typing import Callable, Union, Optional, List, Iterable, Text, BinaryIO, Any
from resilient_exporters.exporters import Exporter, ExportResult
from resilient_exporters.exceptions import InvalidConfigError
from resilient_exporters.utils import generate_rand_name, is_able_to_connect

logger = logging.getLogger(__name__)

[docs]class ExporterPool(Exporter): """Enables pooling of exporters for improved efficiency and performance. All the exporters will be managed by the pool, including the saving of unsent data, with only one ``send`` call to be used. It also offers a multithreading option to run the exporters' ``send`` functions in parallel, to speed up the execution of ``ExporterPool.send``. Args: exporters (Iterable[Exporter]): a list of ``resilient_exporters.exporters.Exporter``. transform (Callable): a function to be invoked at each ``send`` call on the passed data. It must return data or exceptions will be raised. num_threads (int): the number of threads to use for ``send`` calls. Must be greater than 1. If 1, multithreading is disabled. Default to 1. wait_for_result (bool): value to decide if the instance has to wait for the results of ``send`` calls or not when multithreading is enabled. manual_reexport (bool): if True, the user is responsible to call the function ``send_unsent_data`` when appropriate. If False, the instance will manage that automatically by assessing the necessity to call the function at each ``send`` call; the criterias are 1) there's unsent data, 2) there's an Internet connection. Default to False. .. admonition:: Note If using parallelism, the default behaviour is to wait for the results of all calls. One can disable this behaviour with `wait_for_result` set to ``False``, and ``ExporterPool.send`` will be then non-blocking, but will return ``None``. Raises: InvalidConfigError: if ``num_threads`` is < 1. .. admonition:: Example .. code-block:: python import resilient_exporters as rex exporter1 = rex.exporters.FileExporter("local_file.txt") exporter2 = rex.exporters.FileExporter("/path/to/network/file.txt") pool = ExporterPool([exporter1, exporter2], num_threads=2) line = "A string to be written in a file" pool.send(line) Attributes: num_threads (int): number of threads used by the instance. wait_for_result (bool): value to decide if the instance has to wait for the results of ``send`` calls or not when multithreading is enabled. """ __futures = [] __instantiated = 0 def __init__(self, exporters: Optional[Iterable[Exporter]], transform: Optional[Callable] = None, num_threads: int = 1, wait_for_result: bool = True, use_memory: bool = True, manual_reexport: bool = False, *, tmp_file: Union[Text, pathlib.Path, BinaryIO, None] = None, save_unsent_data: bool = True, name: Optional[Text] = None): super(ExporterPool, self).__init__(transform=transform, use_memory=use_memory, tmp_file=tmp_file, manual_reexport=manual_reexport, save_unsent_data=save_unsent_data) if num_threads < 1: raise InvalidConfigError(self, 'num_threads must be >= 1;' 'use num_threads=1 to disable multithreading.') self.num_threads = num_threads self.wait_for_result = wait_for_result self.__exporters = {} if exporters is not None: for exporter in exporters: exporter._replace_datastore(self._datastore) if self._run_transform: exporter._run_transform = False exporter._save_unsent_data = not self._save_unsent_data self.__exporters[exporter.name] = exporter self.__instantiated += 1 self.name = f"exporterpool_{ExporterPool.__instantiated}" @property def exporters(self) -> dict: """A dictionary of the contained exporters.""" return self.__exporters
[docs] def add_exporter(self, exporter: Exporter) -> None: """Adds an exporter to the pool. Use this function to add an exporter after the pool has been initialised. It removes the responsability to run the `transform` method from the exporter, and to save unsent data. Args: exporter (Exporter): an exporter. """ exporter.use_memory = self.use_memory if self._run_transform: exporter._run_transform = False exporter._save_unsent_data = not self.save_unsent_data self.__exporters.append(exporter)
[docs] def send(self, data: Any, **kwargs) -> List[ExportResult]: """Runs the `send` method of all its exporters. If the pool's `num_threads` attribute is > 1, it will execute all the calls in separate threads. .. admonition:: Note The key arguments passed at the call of the method will be passed down to all the exporters. Make sure they all have different keywords. Args: data (Any): the data to export. **kwargs (Any): the keyword arguments to pass down to the exporters' `send` methods. Returns: List[ExportResult]: a list of the exporters' results. """ results = [] if self.num_threads <= 1: for exporter in self: results.append(exporter.send(data, **kwargs)) else: # use multithreading futures = [] with ThreadPoolExecutor(max_workers=self.num_threads) as executor: for exporter in self: future = executor.submit(exporter.send, data, **kwargs) futures.append(future) def save_if_failed(future): if future.result() == False: self.save_unsent_data(data, kwargs, exporter.name) future.add_done_callback(save_if_failed) if self.wait_for_result: results += [f.result() for f in futures] else: self.__futures += futures return results
def _process_result(self, results: Iterable[ExportResult], data: Any, kwargs: dict) -> Union[bool, List[bool]]: summed_res = sum([r.successful for r in results]) if summed_res == len(results): # all expeditions have been successful if not self.manual_reexport \ and is_able_to_connect(self.TEST_URL) \ and self.has_unsent_data(): logger.info("Attempt to send previously unsent data.") return result + self.send_unsent_data() elif summed_res == 0 and self._save_unsent_data: # all have failed self.save_unsent_data(data, kwargs, self.name) else: #mixed results for exporter, res in zip(self, results): if not res.successful and self._save_unsent_data: self.save_unsent_data(data, kwargs, exporter.name) return results
[docs] def send_unsent_data(self) -> List[ExportResult]: """Tries to send the previously saved, unsent data. Returns: List[ExportResult]: list of the results of the export jobs. """ self.__is_sending_unsent_data = True results = [self.send(d["data"], d["exporter"], **d["kwargs"]) \ for d in self._datastore] self.__is_sending_unsent_data = False return results
def __len__(self) -> int: return len(self.__exporters) def __iter__(self): self.__iterator_count = 0 self.__exporters_as_list = list(self.exporters.values()) return self def __next__(self) -> Optional[Exporter]: if self.__iterator_count < len(self): res = self.__exporters_as_list[self.__iterator_count] self.__iterator_count += 1 return res del self.__exporters_as_list raise StopIteration def __del__(self): if not self.wait_for_result: res = [f.result() for f in self.__futures] del res del self.__futures