Source code for resilient_exporters.exporters.exporter_elasticsearch
import os
import logging
from typing import Optional, Text, List, Iterable
from resilient_exporters.exporters import Exporter, ExportResult
from resilient_exporters.exceptions import MissingModuleError, \
MissingConfigError, \
InvalidConfigError
logger = logging.getLogger(__name__)
try:
import elasticsearch
except ModuleNotFoundError:
logger.error("""Elasticsearch not available. Install using:
pip install resilient-transmitter[elastic]""")
raise MissingModuleError
[docs]class ElasticSearchExporter(Exporter):
"""Exporter for ElasticSearch.
Args:
target_ip (str): an IP address of a ElasticSearch server.
target_port (int): the port to connect to. Default to 9300.
username (str): the username for authentication.
password (str): the password as plain text for authentication.
Use an environement variable for security.
cluster_hosts (Iterable[Text]): cluster of hosts, passed to ES's client
application.
cloud_id (str): cloud id used to connect to a Elastic Cloud server.
A username and password is most likely required to be
able to connect.
api_key (str): a base64 encoded token to authenticate to a ElasticSearch
server.
sniff_on_start (bool): see Elasticsearch documentation.
default_index (str): a default index to use when ``send`` is called. If
None, an index will have to be provided as an argument
when calling ``send``.
**kwargs : the keyword arguments to pass down to parent's class Exporter
.. admonition:: Warning
If ``target_ip`` is provided, it will supercede ``cluster_hosts``.
"""
def __init__(self,
target_ip: Text = None,
target_port: int = 9300,
username: Text = None,
password: Text = None,
cluster_hosts: Iterable[Text] = None,
cloud_id: Text = None,
api_key: Text = None, #base 64 encoded token
sniff_on_start: bool = True,
default_index: Optional[Text] = None,
use_ssl: bool = False,
ssl_certfile: Text = None,
ssl_ca_certs: Text = None,
**kwargs):
super(ElasticSearchExporter, self).__init__(**kwargs)
self.target_ip = target_ip
self.target_port = target_port
self.cluster_hosts = cluster_hosts
self.cloud_id = cloud_id
self.api_key = api_key
self.sniff_on_start = sniff_on_start
self.default_index = default_index
self.use_ssl = use_ssl
self.ssl_certfile = ssl_certfile
self.ssl_ca_certs = ssl_ca_certs
# Need to provide an address
if self.target_ip is None \
and self.cluster_hosts is None \
and self.cloud_id is None:
logger.error("No target address provided.")
raise ValueError
kwargs = {}
if username and password:
kwargs["http_auth"] = (username, password)
#kwargs["scheme"] = "https"
if self.cloud_id:
kwargs["cloud_id"] = self.cloud_id
if self.api_key:
kwargs["api_key"] = self.api_key
if self.sniff_on_start:
kwargs["sniff_on_start"] = self.sniff_on_start
if self.use_ssl:
kwargs["use_ssl"] = self.use_ssl
if self.ssl_certfile:
kwargs["client_cert"] = self.ssl_certfile
if self.ssl_ca_certs:
kwargs["ca_certs"] = self.ssl_ca_certs
hosts = None
if self.target_ip:
hosts = {"host": self.target_ip}
if self.target_port:
hosts["port"] = self.target_port
elif self.cluster_hosts:
hosts = self.cluster_hosts
if hosts:
self.__client = elasticsearch.Elasticsearch(hosts, **kwargs)
else:
self.__client = elasticsearch.Elasticsearch(**kwargs)
@property
def client(self) -> elasticsearch.Elasticsearch:
"""The Elasticsearch client. It cannot be replaced."""
return self.__client
[docs] def send(self, data: dict, index: Optional[Text] = None) -> ExportResult:
"""Indexes the data into an ElasicSearch index.
Args:
data (dict): the data, as a dict, to index.
index (str): the index name. If `None`, it uses the default value
provided at initialisation.
Returns:
ExportResult: (Object, True) if successful, (None, False) otherwise.
Raises:
MissingConfigError: if no index is found.
InvalidConfigError: if cannot send data because of a configuration
issue (authentication or other type of issues).
"""
if index is None:
index = self.default_index
if index is None:
raise MissingConfigError(self, "No index found.")
try:
res = self.__client.index(index=index, body=data)
return ExportResult(res, True)
except elasticsearch.exceptions.ConnectionError:
logger.warning("elasticsearch.exceptions.ConnectionError")
except elasticsearch.exceptions.RequestError:
logger.error("elasticsearch.exceptions.RequestError")
raise InvalidConfigError(self, "elasticsearch.exceptions.RequestError")
return ExportResult(None, False)