"Fossies" - the Fresh Open Source Software Archive

Member "monasca-api-4.0.0/monasca_api/common/messaging/kafka_publisher.py" (13 May 2020, 2483 Bytes) of package /linux/misc/openstack/monasca-api-4.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "kafka_publisher.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 3.1.0_vs_4.0.0.

    1 # Copyright 2014,2017 Hewlett-Packard
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #      http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 
   15 from monasca_common.kafka import client_factory
   16 import monasca_common.kafka_lib.common as kafka_common
   17 from oslo_config import cfg
   18 from oslo_log import log
   19 
   20 from monasca_api.common.messaging import exceptions
   21 from monasca_api.common.messaging import publisher
   22 
   23 LOG = log.getLogger(__name__)
   24 
   25 
   26 class KafkaPublisher(publisher.Publisher):
   27     def __init__(self, topic):
   28         if not cfg.CONF.kafka.uri:
   29             raise Exception('Kafka is not configured correctly! '
   30                             'Use configuration file to specify Kafka '
   31                             'uri, for example: '
   32                             'uri=192.168.1.191:9092')
   33 
   34         self.uri = cfg.CONF.kafka.uri
   35         self.topic = topic
   36         self.group = cfg.CONF.kafka.group
   37         self.wait_time = cfg.CONF.kafka.wait_time
   38         self.is_async = cfg.CONF.kafka.is_async
   39         self.ack_time = cfg.CONF.kafka.ack_time
   40         self.max_retry = cfg.CONF.kafka.max_retry
   41         self.auto_commit = cfg.CONF.kafka.auto_commit
   42         self.compact = cfg.CONF.kafka.compact
   43         self.partitions = cfg.CONF.kafka.partitions
   44         self.drop_data = cfg.CONF.kafka.drop_data
   45 
   46         config = {'queue.buffering.max.messages':
   47                   cfg.CONF.kafka.queue_buffering_max_messages}
   48         self._producer = client_factory.get_kafka_producer(
   49             self.uri, cfg.CONF.kafka.legacy_kafka_client_enabled, **config)
   50 
   51     def close(self):
   52         pass
   53 
   54     def send_message(self, message):
   55         try:
   56             self._producer.publish(self.topic, message)
   57 
   58         except (kafka_common.KafkaUnavailableError,
   59                 kafka_common.LeaderNotAvailableError):
   60             LOG.exception('Error occurred while posting data to Kafka.')
   61             raise exceptions.MessageQueueException()
   62         except Exception:
   63             LOG.exception('Unknown error.')
   64             raise exceptions.MessageQueueException()