Source code for resilient_exporters.exporters.exporter_file

import os
import json
import logging
from typing import Optional, Text, Any, Union, Iterable
from resilient_exporters.exporters import Exporter, ExportResult
from resilient_exporters.exceptions import ExportError

logger = logging.getLogger(__name__)

[docs]class FileExporter(Exporter): """Exporter for a text file. Args: target_file (str): the path of the file. The user must have write rights on the file. max_lines (int): the maximum number of lines in the file, incl. the lines already present. If `None`, there's no limit. append (bool): if `True`, will append the data to the file. Otherwise, it will overwrite the file. **kwargs : the keyword arguments to pass down to parent's class Exporter .. admonition:: Example .. code-block:: python import os from resilient_exporters.exporters import FileExporter exporter = FileExporter("local_file.txt", max_lines=1000, append=False, use_memory=False, # see exporters.Exporter save_unsent_data=True) data = {"name": "Richard Feynman", "age": 69} exporter.send(data) """ def __init__(self, target_file: Text, max_lines: Optional[int] = None, append: bool = True, **kwargs): super(FileExporter, self).__init__(**kwargs) #Create file if it doesn't exist if not os.path.isfile(target_file): open(target_file, "x").close() self.__filename = target_file self.__max_lines = max_lines #max lines in file self.__remaining_lines = -1 self.start() # Some logic in case there's a max lines if self.__max_lines is not None: # Count number of lines in file tmp_count = len(self.target_file.readlines()) # Alert if the maximum is already reached if tmp_count >= self.__max_lines: logger.warning(f"""there's already {tmp_count} lines in file, you asked for a maximum of {self.__max_lines}. Nothing will be added to the file.""") # Set remaining lines self.__remaining_lines = self.__max_lines - tmp_count @property def remaining_lines(self) -> int: """It is the amount of lines the file can still contain before reaching the limit passed at initialisation (cf `max_lines` keyword argument at init). Returns 0 if there's no limit or the limit has been reached.""" return max(self.__remaining_lines, 0) def write_lines(self, data: Union[Text, dict]): if not isinstance(data, list): data = [data] for piece in data: if self.__remaining_lines != 0: line = piece if not isinstance(piece, dict) else json.dumps(piece) print(line, file=self.target_file) self.__remaining_lines -= 1 else: logger.warning(f"Can't write more data in file {self.__filename}") """remove_lines: Removing by copying the file (except lines of given indices) into a new file, then replacing the old file with the new file. By default, removes the first line. """ def remove_lines(self, indices: Iterable[int] = [0]) -> bool: new_filename = self.__filename + ".new" failed = False try: new_file = open(new_filename, "w") old_file = self.target_file for count, line in enumerate(self.target_file): if count not in indices: new_file.write(line) except: failed = True finally: self.target_file.close() self.target_file = new_file if not failed: shutil.copyfile(new_filename, self.__filename) os.remove(new_filename) return not True return failed
[docs] def send(self, data: Union[Text, dict]) -> ExportResult: """Writes the data into the file. Each call adds a new line in the file. Args: data (Union[Text, dict]): a string or `dict` with the data to write into the file. If a `dict`, it will be converted into a json document. Returns: ExportResult: if the operation was successful, returns (None, True), otherwise (None, False) Raises: ExportError: if the IO stream is closed. """ if self.target_file.closed: logger.error(f"IO stream closed for FileExporter {self.name}") raise ExportError(self, "Stream closed. \ Has the FileExporter been stopped?") if self.__max_lines is None: self.write_lines(data) return ExportResult(None, True) elif self.__remaining_lines > 0: self.write_lines(data) return ExportResult(None, True) # @TODO a should new data replace old data if the file is full? #elif self.keep_new_data: # self.remove_lines() # self.write_lines(data) else: logger.warning(f"File {self.__filename} is full.") return ExportResult(None, False) return ExportResult(None, True)
[docs] def start(self, append: bool = True): """Restarts the exporter by reopening an IO stream to the file. If there were no stream yet, it will create one. Args: append (bool): if True, opens the file in 'append' mode, else in "write" mode. Default is True. """ try: if self.target_file.closed: logger.debug(f"Restarting the FileExporter {self.name}") self._create_stream_for_file(append) except AttributeError: # There's then no self.target_file and we should create it self._create_stream_for_file(append)
[docs] def stop(self): """Stops the exporter, by closing the IO stream. The exporter must be stopped for another process to be able to read the file.""" logger.debug(f"Stopping the FileExporter {self.name}") self.target_file.close()
def _create_stream_for_file(self, append: bool = True): self.target_file = open(self.__filename, "a" if append else "w") def __del__(self): self.stop()