kafka_check.py (monasca-log-api-2.7.0) | : | kafka_check.py (monasca-log-api-2.9.0) | ||
---|---|---|---|---|
skipping to change at line 19 | skipping to change at line 19 | |||
# Unless required by applicable law or agreed to in writing, software | # Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | |||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | |||
# License for the specific language governing permissions and limitations | # License for the specific language governing permissions and limitations | |||
# under the License. | # under the License. | |||
import collections | import collections | |||
from monasca_common.kafka_lib import client | from monasca_common.kafka_lib import client | |||
from oslo_log import log | from oslo_log import log | |||
from six import PY3 | ||||
from monasca_log_api import conf | from monasca_log_api import conf | |||
LOG = log.getLogger(__name__) | LOG = log.getLogger(__name__) | |||
CONF = conf.CONF | CONF = conf.CONF | |||
CheckResult = collections.namedtuple('CheckResult', ['healthy', 'message']) | CheckResult = collections.namedtuple('CheckResult', ['healthy', 'message']) | |||
"""Result from the healthcheck, contains healthy(boolean) and message""" | """Result from the healthcheck, contains healthy(boolean) and message""" | |||
# TODO(feature) monasca-common candidate | # TODO(feature) monasca-common candidate | |||
skipping to change at line 73 | skipping to change at line 74 | |||
result = self._verify_topics(kafka_client) | result = self._verify_topics(kafka_client) | |||
self._disconnect_gracefully(kafka_client) | self._disconnect_gracefully(kafka_client) | |||
return result | return result | |||
# noinspection PyMethodMayBeStatic | # noinspection PyMethodMayBeStatic | |||
def _verify_topics(self, kafka_client): | def _verify_topics(self, kafka_client): | |||
topics = CONF.kafka_healthcheck.kafka_topics | topics = CONF.kafka_healthcheck.kafka_topics | |||
if PY3: | ||||
topics = tuple(topic.encode('utf-8') for topic in topics) | ||||
for t in topics: | for t in topics: | |||
# kafka client loads metadata for topics as fast | # kafka client loads metadata for topics as fast | |||
# as possible (happens in __init__), therefore this | # as possible (happens in __init__), therefore this | |||
# topic_partitions is sure to be filled | # topic_partitions is sure to be filled | |||
for_topic = t in kafka_client.topic_partitions | for_topic = t in kafka_client.topic_partitions | |||
if not for_topic: | if not for_topic: | |||
error_str = 'Kafka: Topic %s not found' % t | error_str = 'Kafka: Topic %s not found' % t | |||
LOG.error(error_str) | LOG.error(error_str) | |||
return CheckResult(healthy=False, message=error_str) | return CheckResult(healthy=False, message=error_str) | |||
End of changes. 2 change blocks. | ||||
0 lines changed or deleted | 4 lines changed or added |