Source code for resilient_exporters.utils
import os
import shutil
import json
import requests
import shelve
import time
import queue
import random
import string
from typing import Optional, Text
[docs]def generate_rand_name() -> str:
"""Generate a random name of the form "export_XXXXXX" where XXXXXX are
6 random characters.
Returns:
str: the generated name
"""
suf = "".join(random.choices(string.ascii_uppercase + string.digits, k=6))
return f"exporters_{suf}"
[docs]def is_able_to_connect(url: Optional[Text] = None) -> bool:
"""Runs a HTTP GET request to the url.
Args:
url (str): an URL (with its schema)
Returns:
bool: if a ConnectionError or Timeout are raised, returns `False`.
`True`, otherwise.
"""
if url is None:
url = "https://www.google.com"
try:
request = requests.get(url, timeout=.5)
return True
except (requests.ConnectionError, requests.Timeout):
return False
class _DataStore:
__instantiated = 0
__used_filenames = []
def __new__(cls, *args, **kwargs):
if "shelf_filename" in kwargs.keys():
if kwargs["shelf_filename"] in cls.__used_filenames:
raise ValueError(
f"File {kwargs['db_filename']} is already being used.")
return super(_DataStore, cls).__new__(cls)
def __init__(self,
use_memory: bool = True,
shelf_filename: Optional[Text] = None,
max_size: int = 100 * 100 * 100,
*args, **kwargs):
super().__init__(*args, **kwargs)
self.__use_memory = use_memory
self.__filename = generate_rand_name() if shelf_filename is None \
else shelf_filename
self.__used_filenames.append(self.__filename)
self.__size = 0
self.max_size = max_size
self.__queue = queue.Queue() if use_memory else None
self.__shelf = None if use_memory else shelve.open(self.__filename)
@property
def size(self):
return self.__size
@property
def use_memory(self) -> bool:
return self.__use_memory
@use_memory.setter
def use_memory(self, new_val: bool) -> bool:
if self.__use_memory and new_val == False:
self.export_queue_to_file()
self.__use_memory = new_val
self.__queue = None
elif self.__use_memory == False and new_val:
self.__queue = self.import_queue_from_file()
self.__use_memory = new_val
self.__shelf.close()
self.__shelf = None
else:
print(f"WARNING - use_memory is already set to {new_val}")
return new_val
def put(self, data):
if self.size >= self.max_size:
print("Cannot save more data.")
return None
if self.use_memory:
self.__put_in_memory(data)
else:
self.__put_in_shelf(data)
self.__size += 1
def __put_in_memory(self, data):
self.__queue.put(data)
def __put_in_shelf(self, data):
self.__shelf[str(time.time())] = data
def get(self):
if self.size <= 0:
raise Exception("No saved data left.")
if self.use_memory:
data = self.__get_from_memory()
else:
data = self.__get_from_shelf()
self.__size -=1
return data
def __get_from_memory(self):
return self.__queue.get()
def __get_from_shelf(self):
generator = iter(self.__shelf.keys())
try:
key = next(generator)
except StopIteration:
return None
else:
res = self.__shelf[key].copy()
del self.__shelf[key]
return res
def export_queue_to_shelf(self):
self.__shelf = shelve.open(self.__filename, "n")
while not self.__queue.empty():
data = self.__queue.get()
self.__shelf[str(time.time())] = data
def import_queue_from_file(self):
q = queue.Queue()
generator = iter(self.__shelf.keys())
for key in generator:
q.put(file[key])
return q
def __len__(self):
return self.size
def __iter__(self):
return self
def __next__(self):
if self.size:
return self.get()
raise StopIteration
def __del__(self):
if self.__shelf:
self.__shelf.close()