"Fossies" - the Fresh Open Source Software Archive

Member "monasca-log-api-2.9.0/monasca_log_api/healthcheck/kafka_check.py" (1 Apr 2019, 3135 Bytes) of package /linux/misc/openstack/monasca-log-api-2.9.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "kafka_check.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 2.7.0_vs_2.9.0.

    1 # Copyright 2015-2017 FUJITSU LIMITED
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #      http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 
   15 import collections
   16 
   17 from monasca_common.kafka_lib import client
   18 from oslo_log import log
   19 from six import PY3
   20 
   21 from monasca_log_api import conf
   22 
   23 LOG = log.getLogger(__name__)
   24 CONF = conf.CONF
   25 
   26 
   27 CheckResult = collections.namedtuple('CheckResult', ['healthy', 'message'])
   28 """Result from the healthcheck, contains healthy(boolean) and message"""
   29 
   30 
   31 # TODO(feature) monasca-common candidate
   32 class KafkaHealthCheck(object):
   33     """Evaluates kafka health
   34 
   35     Healthcheck verifies if:
   36 
   37     * kafka server is up and running
   38     * there is a configured topic in kafka
   39 
   40     If following conditions are met healthcheck returns healthy status.
   41     Otherwise unhealthy status is returned with explanation.
   42 
   43      Example of middleware configuration:
   44 
   45     .. code-block:: ini
   46 
   47       [kafka_healthcheck]
   48       kafka_url = localhost:8900
   49       kafka_topics = log
   50 
   51     Note:
   52         It is possible to specify multiple topics if necessary.
   53         Just separate them with ,
   54 
   55     """
   56 
   57     def healthcheck(self):
   58         url = CONF.kafka_healthcheck.kafka_url
   59 
   60         try:
   61             kafka_client = client.KafkaClient(hosts=url)
   62         except client.KafkaUnavailableError as ex:
   63             LOG.error(repr(ex))
   64             error_str = 'Could not connect to kafka at %s' % url
   65             return CheckResult(healthy=False, message=error_str)
   66 
   67         result = self._verify_topics(kafka_client)
   68         self._disconnect_gracefully(kafka_client)
   69 
   70         return result
   71 
   72     # noinspection PyMethodMayBeStatic
   73     def _verify_topics(self, kafka_client):
   74         topics = CONF.kafka_healthcheck.kafka_topics
   75 
   76         if PY3:
   77             topics = tuple(topic.encode('utf-8') for topic in topics)
   78 
   79         for t in topics:
   80             # kafka client loads metadata for topics as fast
   81             # as possible (happens in __init__), therefore this
   82             # topic_partitions is sure to be filled
   83             for_topic = t in kafka_client.topic_partitions
   84             if not for_topic:
   85                 error_str = 'Kafka: Topic %s not found' % t
   86                 LOG.error(error_str)
   87                 return CheckResult(healthy=False, message=error_str)
   88 
   89         return CheckResult(healthy=True, message='OK')
   90 
   91     # noinspection PyMethodMayBeStatic
   92     def _disconnect_gracefully(self, kafka_client):
   93         # at this point, client is connected so it must be closed
   94         # regardless of topic existence
   95         try:
   96             kafka_client.close()
   97         except Exception as ex:
   98             # log that something went wrong and move on
   99             LOG.error(repr(ex))