"Fossies" - the Fresh Open Source Software Archive

Member "monasca-events-api-2.0.0/monasca_events_api/app/controller/v1/bulk_processor.py" (14 Oct 2020, 3179 Bytes) of package /linux/misc/openstack/monasca-events-api-2.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "bulk_processor.py" see the Fossies "Dox" file reference documentation.

    1 # Copyright 2018 FUJITSU LIMITED
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #      http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 
   15 from monasca_common.rest import utils as rest_utils
   16 from monasca_events_api.app.common import events_publisher
   17 from monasca_events_api.app.model import envelope
   18 from monasca_events_api import conf
   19 from oslo_log import log
   20 from oslo_utils import encodeutils
   21 
   22 LOG = log.getLogger(__name__)
   23 CONF = conf.CONF
   24 
   25 
   26 class EventsBulkProcessor(events_publisher.EventPublisher):
   27     """BulkProcessor for effective events processing and publishing.
   28 
   29     BulkProcessor is customized version of
   30     :py:class:`monasca_events_api.app.base.event_publisher.EventPublisher`
   31     that utilizes processing of bulk request inside single loop.
   32 
   33     """
   34 
   35     def send_message(self, events, event_project_id=None):
   36         """Sends bulk package to kafka
   37 
   38         :param list events: received events
   39         :param str event_project_id: project id
   40         """
   41 
   42         num_of_msgs = len(events) if events else 0
   43         to_send_msgs = []
   44 
   45         LOG.debug('Bulk package <events=%d>',
   46                   num_of_msgs)
   47 
   48         for ev_el in events:
   49             try:
   50                 t_el = self._transform_message(ev_el, event_project_id)
   51                 if t_el:
   52                     to_send_msgs.append(t_el)
   53             except Exception as ex:
   54                 LOG.error('Failed to transform message to json. '
   55                           'message: {} Exception {}'.format(ev_el, str(ex)))
   56 
   57         sent_count = len(to_send_msgs)
   58         try:
   59             self._publish(to_send_msgs)
   60         except Exception as ex:
   61             LOG.error('Failed to send bulk package <events=%d, dimensions=%s>',
   62                       num_of_msgs)
   63             LOG.exception(ex)
   64             raise ex
   65         finally:
   66             self._check_if_all_messages_was_publish(num_of_msgs, sent_count)
   67 
   68     def _transform_message(self, event_element, event_project_id):
   69         """Transform the message
   70 
   71         :param dict event_element: original event element
   72         :param str event_project_id: project id
   73         :return: message payload
   74         """
   75         try:
   76             msg_json = rest_utils.as_json(event_element)
   77             msg_json = encodeutils.safe_encode(msg_json, 'utf-8')
   78 
   79             event_envelope = envelope.Envelope.new_envelope(
   80                 event=msg_json,
   81                 project_id=event_project_id,
   82             )
   83 
   84             msg_payload = (super(EventsBulkProcessor, self)
   85                            ._transform_message(event_envelope))
   86             return msg_payload
   87 
   88         except Exception as ex:
   89             LOG.error("Event transformation failed, rejecting event")
   90             LOG.exception(ex)
   91             return None