"Fossies" - the Fresh Open Source Software Archive

Member "monasca-api-4.0.0/monasca_api/api/core/log/log_publisher.py" (13 May 2020, 6995 Bytes) of package /linux/misc/openstack/monasca-api-4.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "log_publisher.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 3.1.0_vs_4.0.0.

    1 # Copyright 2015 kornicameister@gmail.com
    2 # Copyright 2016-2017 FUJITSU LIMITED
    3 #
    4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 # not use this file except in compliance with the License. You may obtain
    6 # a copy of the License at
    7 #
    8 #      http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 # Unless required by applicable law or agreed to in writing, software
   11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 # License for the specific language governing permissions and limitations
   14 # under the License.
   15 
   16 import time
   17 
   18 import falcon
   19 from monasca_api.common.rest import utils as rest_utils
   20 from monasca_common.kafka import client_factory
   21 from oslo_log import log
   22 from oslo_utils import encodeutils
   23 
   24 from monasca_api.api.core.log import model
   25 from monasca_api import conf
   26 
   27 LOG = log.getLogger(__name__)
   28 CONF = conf.CONF
   29 
   30 _RETRY_AFTER = 60
   31 _TIMESTAMP_KEY_SIZE = len(
   32     bytearray(str(int(time.time() * 1000)).encode('utf-8')))
   33 _TRUNCATED_PROPERTY_SIZE = len(
   34     bytearray('"truncated": true'.encode('utf-8')))
   35 _KAFKA_META_DATA_SIZE = 32
   36 _TRUNCATION_SAFE_OFFSET = 1
   37 
   38 
   39 class InvalidMessageException(Exception):
   40     pass
   41 
   42 
   43 class LogPublisher(object):
   44     """Publishes log data to Kafka
   45 
   46     LogPublisher is able to send single message to multiple configured topic.
   47     It uses following configuration written in conf file ::
   48 
   49         [log_publisher]
   50         topics = 'logs'
   51         kafka_url = 'localhost:8900'
   52 
   53     Note:
   54         Uses :py:class:`monasca_common.kafka.producer.KafkaProducer`
   55         to ship logs to kafka. For more details
   56         see `monasca_common`_ github repository.
   57 
   58     .. _monasca_common: https://github.com/openstack/monasca-common
   59 
   60     """
   61 
   62     def __init__(self):
   63         self._topics = CONF.kafka.logs_topics
   64         self.max_message_size = CONF.log_publisher.max_message_size
   65 
   66         self._kafka_publisher = client_factory.get_kafka_producer(
   67             CONF.kafka.uri, CONF.kafka.legacy_kafka_client_enabled)
   68 
   69         LOG.info('Initializing LogPublisher <%s>', self)
   70 
   71     def send_message(self, messages):
   72         """Sends message to each configured topic.
   73 
   74         Note:
   75             Falsy messages (i.e. empty) are not shipped to kafka
   76 
   77         See also
   78             * :py:class:`monasca_log_api.common.model.Envelope`
   79             * :py:meth:`._is_message_valid`
   80 
   81         :param dict|list messages: instance (or instances) of log envelope
   82         """
   83 
   84         if not messages:
   85             return
   86         if not isinstance(messages, list):
   87             messages = [messages]
   88 
   89         num_of_msgs = len(messages)
   90 
   91         LOG.debug('About to publish %d messages to %s topics',
   92                   num_of_msgs, self._topics)
   93 
   94         try:
   95             send_messages = []
   96 
   97             for message in messages:
   98                 msg = self._transform_message(message)
   99                 send_messages.append(msg)
  100                 self._publish(send_messages)
  101 
  102         except Exception as ex:
  103             LOG.exception('Failure in publishing messages to kafka')
  104             raise ex
  105 
  106     def _transform_message(self, message):
  107         """Transforms message into JSON.
  108 
  109         Method executes transformation operation for
  110         single element. Operation is set of following
  111         operations:
  112 
  113         * checking if message is valid
  114             (:py:func:`.LogPublisher._is_message_valid`)
  115         * truncating message if necessary
  116             (:py:func:`.LogPublisher._truncate`)
  117 
  118         :param model.Envelope message: instance of message
  119         :return: serialized message
  120         :rtype: str
  121         """
  122         if not self._is_message_valid(message):
  123             raise InvalidMessageException()
  124         truncated = self._truncate(message)
  125         return encodeutils.safe_encode(truncated, incoming='utf-8')
  126 
  127     def _truncate(self, envelope):
  128         """Truncates the message if needed.
  129 
  130         Each message send to kafka is verified.
  131         Method checks if message serialized to json
  132         exceeds maximum allowed size that can be posted to kafka
  133         queue. If so, method truncates message property of the log
  134         by difference between message and allowed size.
  135 
  136         :param Envelope envelope: original envelope
  137         :return: serialized message
  138         :rtype: str
  139         """
  140 
  141         msg_str = model.serialize_envelope(envelope)
  142         envelope_size = ((len(bytearray(msg_str, 'utf-8', 'replace')) +
  143                           _TIMESTAMP_KEY_SIZE +
  144                           _KAFKA_META_DATA_SIZE)
  145                          if msg_str is not None else -1)
  146 
  147         diff_size = ((envelope_size - self.max_message_size) +
  148                      _TRUNCATION_SAFE_OFFSET)
  149 
  150         if diff_size > 1:
  151             truncated_by = diff_size + _TRUNCATED_PROPERTY_SIZE
  152 
  153             LOG.warning(('Detected message that exceeds %d bytes,'
  154                          'message will be truncated by %d bytes'),
  155                         self.max_message_size,
  156                         truncated_by)
  157 
  158             log_msg = envelope['log']['message']
  159             truncated_log_msg = log_msg[:-truncated_by]
  160 
  161             envelope['log']['truncated'] = True
  162             envelope['log']['message'] = truncated_log_msg
  163 
  164             msg_str = rest_utils.as_json(envelope)
  165 
  166         return msg_str
  167 
  168     def _publish(self, messages):
  169         """Publishes messages to kafka.
  170 
  171         :param list messages: list of messages
  172         """
  173         num_of_msg = len(messages)
  174 
  175         LOG.debug('Publishing %d messages', num_of_msg)
  176 
  177         try:
  178             for topic in self._topics:
  179                 self._kafka_publisher.publish(
  180                     topic,
  181                     messages
  182                 )
  183                 LOG.debug('Sent %d messages to topic %s', num_of_msg, topic)
  184         except Exception as ex:
  185             raise falcon.HTTPServiceUnavailable('Service unavailable',
  186                                                 str(ex), 60)
  187 
  188     @staticmethod
  189     def _is_message_valid(message):
  190         """Validates message before sending.
  191 
  192         Methods checks if message is :py:class:`model.Envelope`.
  193         By being instance of this class it is ensured that all required
  194         keys are found and they will have their values.
  195 
  196         """
  197         return message and isinstance(message, model.Envelope)
  198 
  199     def _after_publish(self, send_count, to_send_count):
  200         """Executed after publishing to sent metrics.
  201 
  202         :param int send_count: how many messages have been sent
  203         :param int to_send_count: how many messages should be sent
  204 
  205         """
  206 
  207         failed_to_send = to_send_count - send_count
  208 
  209         if failed_to_send == 0:
  210             LOG.debug('Successfully published all [%d] messages',
  211                       send_count)
  212         else:
  213             error_str = ('Failed to send all messages, %d '
  214                          'messages out of %d have not been published')
  215             LOG.error(error_str, failed_to_send, to_send_count)