Source code for resilient_exporters.exporters.exporter_mongodb

import os
import urllib
import logging
from typing import Union, Iterable, Optional, Text
from resilient_exporters.exporters import Exporter, ExportResult
from resilient_exporters.exceptions import MissingConfigError, \
                                           InvalidConfigError, \
                                           MissingModuleError

logger = logging.getLogger(__name__)

try:
    import pymongo
except ModuleNotFoundError:
    logger.error("""PyMongo not available. Install using:
                    pip install resilient-transmitter[mongo]""")
    raise MissingModuleError(self)

[docs]class MongoDBExporter(Exporter): """Exporter for MongoDB. Args: target_ip (str): target_port (int): username (str): password (str): default_db (str): default_collection (str): **kwargs : the keyword arguments to pass down to parent's class Exporter Raises: InvalidConfigError: if it cannot retrieve the server information, which is likely due an invalid configuration of the target. .. admonition:: Example .. code-block:: python import os from resilient_exporters.exporters import MongoDBExporter exporter = MongoDBExporter(target_ip="127.0.0.1", username=os.environ["MONGO_USERNAME"], password=os.environ["MONGO_PASSWORD"], default_db="profiles", default_db="scientists") data = {"name": "Richard Feynman", "age": 69} exporter.send(data) """ def __init__(self, target_ip: Text, target_port: int = 27017, username: Text = None, password: Text = None, default_db: Text = None, default_collection: Text = None, **kwargs): super(MongoDBExporter, self).__init__(**kwargs) self.target_ip = target_ip self.target_port = target_port self.username = username self.default_db = default_db self.default_collection = default_collection if self.target_ip is None: logger.error("No IP address provided.") raise ValueError("No IP address provided") # Parse username and password if username: username = urllib.parse.quote_plus(self.username) if password: password = urllib.parse.quote_plus(password) # Create kwargs for MongoClient if target_port: kwargs["port"] = self.target_port if username: kwargs["username"] = username if password: kwargs["password"] = password # Create MongoClient self.__client = pymongo.MongoClient(self.target_ip, **kwargs) # Test connection try: self.__client.server_info() except pymongo.errors.ServerSelectionTimeoutError as e: logging.debug(e) raise InvalidConfigError(self, "Cannot retrieve server info. Is \ the target valid?") # Keeping default database/collection if provided self.db = None if self.default_db: self.db = self.__client[self.default_db] self.collection = None if self.default_collection: self.collection = self.db[self.default_collection] @property def client(self) -> pymongo.MongoClient: """The MongoDB client. It cannot be replaced.""" return self.__client
[docs] def send(self, data: dict, db: Text = None, collection: Text = None) -> ExportResult: """Inserts data into a collection. Reuses default database and collection names, if provided at initialisation. Args: data (dict): a dict representing the document to insert into the collection. db (str): name of the target database. If `None`, will use the default value. Default is `None`. collection (str): name of the target colleciton. If `None`, will use the default value. Default is `None`. Returns: ExportResult: the result in the form (ObjectId, True) if successful, (None, False) otherwise. Raises: MissingConfigError: if it cannot find a database and/or collection in the arguments and default values. """ data = data.copy() if db is None and self.db is not None: if collection is None and self.collection is not None: try: self.collection.insert_one(data) except Exception as e: logger.error(e) return ExportResult(False, None) return ExportResult(data["_id"], True) elif collection is not None: try: self.db[collection].insert_one(data) except Exception as e: logger.error(e) return ExportResult(False, None) return ExportResult(data["_id"], True) else: raise MissingConfigError(self, "No reference to a database \ and collection found.") elif db is not None and collection is not None: try: self.__client[db][collection].insert_one(data) except Exception as e: logger.error(e) return ExportResult(None, False) return ExportResult(data["_id"], True) return ExportResult(None, False)