"Fossies" - the Fresh Open Source Software Archive

Member "monasca-log-api-2.9.0/monasca_log_api/app/base/log_publisher.py" (1 Apr 2019, 8414 Bytes) of package /linux/misc/openstack/monasca-log-api-2.9.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "log_publisher.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 2.7.0_vs_2.9.0.

    1 # Copyright 2015 kornicameister@gmail.com
    2 # Copyright 2016-2017 FUJITSU LIMITED
    3 #
    4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 # not use this file except in compliance with the License. You may obtain
    6 # a copy of the License at
    7 #
    8 #      http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 # Unless required by applicable law or agreed to in writing, software
   11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 # License for the specific language governing permissions and limitations
   14 # under the License.
   15 
   16 import time
   17 
   18 import falcon
   19 from monasca_common.kafka import producer
   20 from monasca_common.rest import utils as rest_utils
   21 from oslo_log import log
   22 from oslo_utils import encodeutils
   23 
   24 from monasca_log_api.app.base import model
   25 from monasca_log_api import conf
   26 from monasca_log_api.monitoring import client
   27 from monasca_log_api.monitoring import metrics
   28 
   29 LOG = log.getLogger(__name__)
   30 CONF = conf.CONF
   31 
   32 _RETRY_AFTER = 60
   33 _TIMESTAMP_KEY_SIZE = len(
   34     bytearray(str(int(time.time() * 1000)).encode('utf-8')))
   35 _TRUNCATED_PROPERTY_SIZE = len(
   36     bytearray('"truncated": true'.encode('utf-8')))
   37 _KAFKA_META_DATA_SIZE = 32
   38 _TRUNCATION_SAFE_OFFSET = 1
   39 
   40 
   41 class InvalidMessageException(Exception):
   42     pass
   43 
   44 
   45 class LogPublisher(object):
   46     """Publishes log data to Kafka
   47 
   48     LogPublisher is able to send single message to multiple configured topic.
   49     It uses following configuration written in conf file ::
   50 
   51         [log_publisher]
   52         topics = 'logs'
   53         kafka_url = 'localhost:8900'
   54 
   55     Note:
   56         Uses :py:class:`monasca_common.kafka.producer.KafkaProducer`
   57         to ship logs to kafka. For more details
   58         see `monasca_common`_ github repository.
   59 
   60     .. _monasca_common: https://github.com/openstack/monasca-common
   61 
   62     """
   63 
   64     def __init__(self):
   65         self._topics = CONF.log_publisher.topics
   66         self.max_message_size = CONF.log_publisher.max_message_size
   67 
   68         self._kafka_publisher = producer.KafkaProducer(
   69             url=CONF.log_publisher.kafka_url
   70         )
   71         if CONF.monitoring.enable:
   72             self._statsd = client.get_client()
   73 
   74             # setup counter, gauges etc
   75             self._logs_published_counter = self._statsd.get_counter(
   76                 metrics.LOGS_PUBLISHED_METRIC
   77             )
   78             self._publish_time_ms = self._statsd.get_timer(
   79                 metrics.LOGS_PUBLISH_TIME_METRIC
   80             )
   81             self._logs_lost_counter = self._statsd.get_counter(
   82                 metrics.LOGS_PUBLISHED_LOST_METRIC
   83             )
   84             self._logs_truncated_gauge = self._statsd.get_gauge(
   85                 metrics.LOGS_TRUNCATED_METRIC
   86             )
   87 
   88         LOG.info('Initializing LogPublisher <%s>', self)
   89 
   90     def send_message(self, messages):
   91         """Sends message to each configured topic.
   92 
   93         Note:
   94             Falsy messages (i.e. empty) are not shipped to kafka
   95 
   96         See also
   97             * :py:class:`monasca_log_api.common.model.Envelope`
   98             * :py:meth:`._is_message_valid`
   99 
  100         :param dict|list messages: instance (or instances) of log envelope
  101         """
  102 
  103         if not messages:
  104             return
  105         if not isinstance(messages, list):
  106             messages = [messages]
  107 
  108         sent_counter = 0
  109         num_of_msgs = len(messages)
  110 
  111         LOG.debug('About to publish %d messages to %s topics',
  112                   num_of_msgs, self._topics)
  113 
  114         try:
  115             send_messages = []
  116 
  117             for message in messages:
  118                 msg = self._transform_message(message)
  119                 send_messages.append(msg)
  120             if CONF.monitoring.enable:
  121                 with self._publish_time_ms.time(name=None):
  122                     self._publish(send_messages)
  123             else:
  124                 self._publish(send_messages)
  125 
  126             sent_counter = len(send_messages)
  127         except Exception as ex:
  128             LOG.exception('Failure in publishing messages to kafka')
  129             raise ex
  130         finally:
  131             self._after_publish(sent_counter, num_of_msgs)
  132 
  133     def _transform_message(self, message):
  134         """Transforms message into JSON.
  135 
  136         Method executes transformation operation for
  137         single element. Operation is set of following
  138         operations:
  139 
  140         * checking if message is valid
  141             (:py:func:`.LogPublisher._is_message_valid`)
  142         * truncating message if necessary
  143             (:py:func:`.LogPublisher._truncate`)
  144 
  145         :param model.Envelope message: instance of message
  146         :return: serialized message
  147         :rtype: str
  148         """
  149         if not self._is_message_valid(message):
  150             raise InvalidMessageException()
  151         truncated = self._truncate(message)
  152         return encodeutils.safe_encode(truncated, incoming='utf-8')
  153 
  154     def _truncate(self, envelope):
  155         """Truncates the message if needed.
  156 
  157         Each message send to kafka is verified.
  158         Method checks if message serialized to json
  159         exceeds maximum allowed size that can be posted to kafka
  160         queue. If so, method truncates message property of the log
  161         by difference between message and allowed size.
  162 
  163         :param Envelope envelope: original envelope
  164         :return: serialized message
  165         :rtype: str
  166         """
  167 
  168         msg_str = model.serialize_envelope(envelope)
  169         envelope_size = ((len(bytearray(msg_str, 'utf-8', 'replace')) +
  170                           _TIMESTAMP_KEY_SIZE +
  171                           _KAFKA_META_DATA_SIZE)
  172                          if msg_str is not None else -1)
  173 
  174         diff_size = ((envelope_size - self.max_message_size) +
  175                      _TRUNCATION_SAFE_OFFSET)
  176 
  177         if diff_size > 1:
  178             truncated_by = diff_size + _TRUNCATED_PROPERTY_SIZE
  179 
  180             LOG.warning(('Detected message that exceeds %d bytes,'
  181                          'message will be truncated by %d bytes'),
  182                         self.max_message_size,
  183                         truncated_by)
  184 
  185             log_msg = envelope['log']['message']
  186             truncated_log_msg = log_msg[:-truncated_by]
  187 
  188             envelope['log']['truncated'] = True
  189             envelope['log']['message'] = truncated_log_msg
  190             if CONF.monitoring.enable:
  191                 self._logs_truncated_gauge.send(name=None, value=truncated_by)
  192 
  193             msg_str = rest_utils.as_json(envelope)
  194         else:
  195             if CONF.monitoring.enable:
  196                 self._logs_truncated_gauge.send(name=None, value=0)
  197 
  198         return msg_str
  199 
  200     def _publish(self, messages):
  201         """Publishes messages to kafka.
  202 
  203         :param list messages: list of messages
  204         """
  205         num_of_msg = len(messages)
  206 
  207         LOG.debug('Publishing %d messages', num_of_msg)
  208 
  209         try:
  210             for topic in self._topics:
  211                 self._kafka_publisher.publish(
  212                     topic,
  213                     messages
  214                 )
  215                 LOG.debug('Sent %d messages to topic %s', num_of_msg, topic)
  216         except Exception as ex:
  217             raise falcon.HTTPServiceUnavailable('Service unavailable',
  218                                                 str(ex), 60)
  219 
  220     @staticmethod
  221     def _is_message_valid(message):
  222         """Validates message before sending.
  223 
  224         Methods checks if message is :py:class:`model.Envelope`.
  225         By being instance of this class it is ensured that all required
  226         keys are found and they will have their values.
  227 
  228         """
  229         return message and isinstance(message, model.Envelope)
  230 
  231     def _after_publish(self, send_count, to_send_count):
  232         """Executed after publishing to sent metrics.
  233 
  234         :param int send_count: how many messages have been sent
  235         :param int to_send_count: how many messages should be sent
  236 
  237         """
  238 
  239         failed_to_send = to_send_count - send_count
  240 
  241         if failed_to_send == 0:
  242             LOG.debug('Successfully published all [%d] messages',
  243                       send_count)
  244         else:
  245             error_str = ('Failed to send all messages, %d '
  246                          'messages out of %d have not been published')
  247             LOG.error(error_str, failed_to_send, to_send_count)
  248         if CONF.monitoring.enable:
  249             self._logs_published_counter.increment(value=send_count)
  250             self._logs_lost_counter.increment(value=failed_to_send)