"Fossies" - the Fresh Open Source Software Archive 
Member "monasca-log-api-2.9.0/monasca_log_api/healthcheck/kafka_check.py" (1 Apr 2019, 3135 Bytes) of package /linux/misc/openstack/monasca-log-api-2.9.0.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively, you can view or download the uninterpreted source code file here.
For more information about "kafka_check.py" see the
Fossies "Dox" file reference documentation and the latest
Fossies "Diffs" side-by-side code changes report:
2.7.0_vs_2.9.0.
1 # Copyright 2015-2017 FUJITSU LIMITED
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14
15 import collections
16
17 from monasca_common.kafka_lib import client
18 from oslo_log import log
19 from six import PY3
20
21 from monasca_log_api import conf
22
# Module-level logger, named after this module.
LOG = log.getLogger(__name__)
# Configuration object exposed by monasca_log_api.conf; the healthcheck
# reads its ``kafka_healthcheck`` group.
CONF = conf.CONF
25
26
# Immutable (healthy, message) pair returned by the healthcheck:
# ``healthy`` is a boolean flag, ``message`` is a human-readable explanation.
CheckResult = collections.namedtuple('CheckResult', ['healthy', 'message'])
"""Result from the healthcheck, contains healthy(boolean) and message"""
29
30
31 # TODO(feature) monasca-common candidate
class KafkaHealthCheck(object):
    """Evaluates kafka health

    Healthcheck verifies if:

    * kafka server is up and running
    * there is a configured topic in kafka

    If the above conditions are met healthcheck returns healthy status.
    Otherwise unhealthy status is returned with explanation.

    Example of middleware configuration:

    .. code-block:: ini

        [kafka_healthcheck]
        kafka_url = localhost:8900
        kafka_topics = log

    Note:
        It is possible to specify multiple topics if necessary.
        Just separate them with ,

    """

    def healthcheck(self):
        """Check that kafka is reachable and the configured topics exist.

        Reads the broker address from ``CONF.kafka_healthcheck.kafka_url``,
        connects to it and verifies the topics listed in
        ``CONF.kafka_healthcheck.kafka_topics``.

        :return: ``CheckResult(healthy=True, message='OK')`` when the client
                 connects and every topic is found; otherwise a
                 ``CheckResult`` with ``healthy=False`` and an explanation.
        """
        url = CONF.kafka_healthcheck.kafka_url

        try:
            kafka_client = client.KafkaClient(hosts=url)
        except client.KafkaUnavailableError as ex:
            LOG.error(repr(ex))
            error_str = 'Could not connect to kafka at %s' % url
            return CheckResult(healthy=False, message=error_str)

        # The client is connected from here on; release it even if the
        # topic verification raises, so the connection is never leaked.
        try:
            return self._verify_topics(kafka_client)
        finally:
            self._disconnect_gracefully(kafka_client)

    # noinspection PyMethodMayBeStatic
    def _verify_topics(self, kafka_client):
        """Verify that every configured topic is known to the client.

        :param kafka_client: connected client exposing ``topic_partitions``
        :return: unhealthy ``CheckResult`` naming the first missing topic,
                 or a healthy ``CheckResult`` when all topics are present.
        """
        topics = CONF.kafka_healthcheck.kafka_topics

        for topic in topics:
            # On Python 3 the lookup uses the utf-8 encoded topic name
            # (presumably the client keys topic_partitions by bytes there -
            # TODO confirm against kafka_lib). The configured name is kept
            # separately so the error message does not render as b'...'.
            lookup_key = topic.encode('utf-8') if PY3 else topic

            # kafka client loads metadata for topics as fast
            # as possible (happens in __init__), therefore this
            # topic_partitions is sure to be filled
            if lookup_key not in kafka_client.topic_partitions:
                error_str = 'Kafka: Topic %s not found' % topic
                LOG.error(error_str)
                return CheckResult(healthy=False, message=error_str)

        return CheckResult(healthy=True, message='OK')

    # noinspection PyMethodMayBeStatic
    def _disconnect_gracefully(self, kafka_client):
        """Close the client, logging (not raising) any failure.

        :param kafka_client: client whose ``close()`` is called
        """
        # at this point, client is connected so it must be closed
        # regardless of topic existence
        try:
            kafka_client.close()
        except Exception as ex:
            # best effort: log that something went wrong and move on
            LOG.error(repr(ex))