"Fossies" - the Fresh Open Source Software Archive

Member "monasca-api-3.1.0/monasca_api/tests/test_log_publisher.py" (27 Sep 2019, 10258 Bytes) of package /linux/misc/openstack/monasca-api-3.1.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_log_publisher.py": 3.1.0_vs_4.0.0.

    1 # Copyright 2015 kornicameister@gmail.com
    2 # Copyright 2016-2017 FUJITSU LIMITED
    3 #
    4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 # not use this file except in compliance with the License. You may obtain
    6 # a copy of the License at
    7 #
    8 #      http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 # Unless required by applicable law or agreed to in writing, software
   11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 # License for the specific language governing permissions and limitations
   14 # under the License.
   15 
   16 import copy
   17 import datetime
   18 import random
   19 
   20 import mock
   21 from oslo_config import cfg
   22 from oslo_log import log
   23 import six
   24 import ujson
   25 import unittest
   26 
   27 from monasca_api.api.core.log import log_publisher
   28 from monasca_api.api.core.log import model
   29 from monasca_api.tests import base
   30 
   31 
   32 LOG = log.getLogger(__name__)
   33 EPOCH_START = datetime.datetime(1970, 1, 1)
   34 
   35 
   36 class TestSendMessage(base.BaseTestCase):
   37 
   38     @mock.patch('monasca_api.api.core.log.log_publisher.producer'
   39                 '.KafkaProducer')
   40     def test_should_not_send_empty_message(self, _):
   41         instance = log_publisher.LogPublisher()
   42 
   43         instance._kafka_publisher = mock.Mock()
   44         instance.send_message({})
   45 
   46         self.assertFalse(instance._kafka_publisher.publish.called)
   47 
   48     @unittest.expectedFailure
   49     def test_should_not_send_message_not_dict(self):
   50         instance = log_publisher.LogPublisher()
   51         not_dict_value = 123
   52         instance.send_message(not_dict_value)
   53 
   54     @mock.patch('monasca_api.api.core.log.log_publisher.producer'
   55                 '.KafkaProducer')
   56     def test_should_not_send_message_missing_keys(self, _):
   57         # checks every combination of missing keys
   58         # test does not rely on those keys having a value or not,
   59         # it simply assumes that values are set but important
   60         # message (i.e. envelope) properties are missing entirely
   61         # that's why there are two loops instead of three
   62 
   63         instance = log_publisher.LogPublisher()
   64         keys = ['log', 'creation_time', 'meta']
   65 
   66         for key_1 in keys:
   67             diff = keys[:]
   68             diff.remove(key_1)
   69             for key_2 in diff:
   70                 message = {
   71                     key_1: random.randint(10, 20),
   72                     key_2: random.randint(30, 50)
   73                 }
   74                 self.assertRaises(log_publisher.InvalidMessageException,
   75                                   instance.send_message,
   76                                   message)
   77 
   78     @mock.patch('monasca_api.api.core.log.log_publisher.producer'
   79                 '.KafkaProducer')
   80     def test_should_not_send_message_missing_values(self, _):
   81         # original message assumes that every property has value
   82         # test modify each property one by one by removing that value
   83         # (i.e. creating false-like value)
   84         instance = log_publisher.LogPublisher()
   85         message = {
   86             'log': {
   87                 'message': '11'
   88             },
   89             'creation_time': 123456,
   90             'meta': {
   91                 'region': 'pl'
   92             }
   93         }
   94 
   95         for key in message:
   96             tmp_message = message
   97             tmp_message[key] = None
   98             self.assertRaises(log_publisher.InvalidMessageException,
   99                               instance.send_message,
  100                               tmp_message)
  101 
  102     @mock.patch('monasca_api.api.core.log.log_publisher.producer'
  103                 '.KafkaProducer')
  104     def test_should_send_message(self, kafka_producer):
  105         instance = log_publisher.LogPublisher()
  106         instance._kafka_publisher = kafka_producer
  107         instance.send_message({})
  108 
  109         creation_time = ((datetime.datetime.utcnow() - EPOCH_START)
  110                          .total_seconds())
  111         application_type = 'monasca-log-api'
  112         dimension_1_name = 'disk_usage'
  113         dimension_1_value = '50'
  114         dimension_2_name = 'cpu_time'
  115         dimension_2_value = '60'
  116 
  117         msg = model.Envelope(
  118             log={
  119                 'message': '1',
  120                 'application_type': application_type,
  121                 'dimensions': {
  122                     dimension_1_name: dimension_1_value,
  123                     dimension_2_name: dimension_2_value
  124                 }
  125             },
  126             meta={
  127                 'tenantId': '1'
  128             }
  129         )
  130         msg['creation_time'] = creation_time
  131         instance.send_message(msg)
  132 
  133         instance._kafka_publisher.publish.assert_called_once_with(
  134             cfg.CONF.kafka.logs_topics[0],
  135             [ujson.dumps(msg, ensure_ascii=False).encode('utf-8')])
  136 
  137     @mock.patch('monasca_api.api.core.log.log_publisher.producer'
  138                 '.KafkaProducer')
  139     def test_should_send_message_multiple_topics(self, _):
  140         topics = ['logs_topics', 'analyzer', 'tester']
  141         self.conf_override(logs_topics=topics,
  142                            group='kafka')
  143         self.conf_override(max_message_size=5000,
  144                            group='log_publisher')
  145 
  146         instance = log_publisher.LogPublisher()
  147         instance._kafka_publisher = mock.Mock()
  148         instance.send_message({})
  149 
  150         creation_time = ((datetime.datetime.utcnow() - EPOCH_START)
  151                          .total_seconds())
  152         dimension_1_name = 'disk_usage'
  153         dimension_1_value = '50'
  154         dimension_2_name = 'cpu_time'
  155         dimension_2_value = '60'
  156         application_type = 'monasca-log-api'
  157         msg = model.Envelope(
  158             log={
  159                 'message': '1',
  160                 'application_type': application_type,
  161                 'dimensions': {
  162                     dimension_1_name: dimension_1_value,
  163                     dimension_2_name: dimension_2_value
  164                 }
  165             },
  166             meta={
  167                 'tenantId': '1'
  168             }
  169         )
  170         msg['creation_time'] = creation_time
  171         json_msg = ujson.dumps(msg, ensure_ascii=False)
  172 
  173         instance.send_message(msg)
  174 
  175         self.assertEqual(len(topics),
  176                          instance._kafka_publisher.publish.call_count)
  177         for topic in topics:
  178             instance._kafka_publisher.publish.assert_any_call(
  179                 topic,
  180                 [json_msg.encode('utf-8')])
  181 
  182     @mock.patch('monasca_api.api.core.log.log_publisher.producer'
  183                 '.KafkaProducer')
  184     def test_should_send_unicode_message(self, kp):
  185         instance = log_publisher.LogPublisher()
  186         instance._kafka_publisher = kp
  187 
  188         for um in base.UNICODE_MESSAGES:
  189             case, msg = um.values()
  190             try:
  191                 envelope = model.Envelope(
  192                     log={
  193                         'message': msg,
  194                         'application_type': 'test',
  195                         'dimensions': {
  196                             'test': 'test_log_publisher',
  197                             'case': 'test_should_send_unicode_message'
  198                         }
  199                     },
  200                     meta={
  201                         'tenantId': 1
  202                     }
  203                 )
  204                 instance.send_message(envelope)
  205 
  206                 expected_message = ujson.dumps(envelope, ensure_ascii=False)
  207 
  208                 if six.PY3:
  209                     expected_message = expected_message.encode('utf-8')
  210 
  211                 instance._kafka_publisher.publish.assert_called_with(
  212                     cfg.CONF.kafka.logs_topics[0],
  213                     [expected_message]
  214                 )
  215             except Exception:
  216                 LOG.exception('Failed to evaluate unicode case %s', case)
  217                 raise
  218 
  219 
  220 @mock.patch(
  221     'monasca_api.api.core.log.log_publisher.producer'
  222     '.KafkaProducer')
  223 class TestTruncation(base.BaseTestCase):
  224     EXTRA_CHARS_SIZE = len(bytearray(ujson.dumps({
  225         'log': {
  226             'message': None
  227         }
  228     }), 'utf8')) - 2
  229 
  230     def test_should_not_truncate_message_if_size_is_smaller(self, _):
  231         diff_size = random.randint(1, 100)
  232         self._run_truncate_test(log_size_factor=-diff_size,
  233                                 truncate_by=0)
  234 
  235     def test_should_not_truncate_message_if_size_equal_to_max(self, _):
  236         self._run_truncate_test(log_size_factor=0,
  237                                 truncate_by=0)
  238 
  239     def test_should_truncate_too_big_message(self, _):
  240         diff_size = random.randint(1, 100)
  241         max_size = 1000
  242         truncate_by = ((max_size -
  243                         (max_size - log_publisher._TRUNCATED_PROPERTY_SIZE)) +
  244                        log_publisher._TRUNCATION_SAFE_OFFSET + diff_size)
  245         self._run_truncate_test(max_message_size=1000,
  246                                 log_size_factor=diff_size,
  247                                 truncate_by=truncate_by)
  248 
  249     def _run_truncate_test(self,
  250                            max_message_size=1000,
  251                            log_size_factor=0,
  252                            truncate_by=0,
  253                            gen_fn=base.generate_unique_message):
  254 
  255         log_size = (max_message_size -
  256                     TestTruncation.EXTRA_CHARS_SIZE -
  257                     log_publisher._KAFKA_META_DATA_SIZE -
  258                     log_publisher._TIMESTAMP_KEY_SIZE +
  259                     log_size_factor)
  260 
  261         expected_log_message_size = log_size - truncate_by
  262 
  263         self.conf_override(
  264             group='log_publisher',
  265             max_message_size=max_message_size
  266         )
  267 
  268         log_msg = gen_fn(log_size)
  269         envelope = {
  270             'log': {
  271                 'message': log_msg
  272             }
  273         }
  274 
  275         instance = log_publisher.LogPublisher()
  276 
  277         envelope_copy = copy.deepcopy(envelope)
  278         json_envelope = instance._truncate(envelope_copy)
  279 
  280         parsed_envelope = ujson.loads(json_envelope)
  281 
  282         parsed_log_message = parsed_envelope['log']['message']
  283         parsed_log_message_len = len(parsed_log_message)
  284 
  285         if truncate_by > 0:
  286             self.assertNotEqual(envelope['log']['message'],
  287                                 parsed_log_message)
  288         else:
  289             self.assertEqual(envelope['log']['message'],
  290                              parsed_log_message)
  291 
  292         self.assertEqual(expected_log_message_size, parsed_log_message_len)