"Fossies" - the Fresh Open Source Software Archive

Member "panko-8.1.0/panko/tests/functional/test_bin.py" (4 Aug 2021, 2933 Bytes) of package /linux/misc/openstack/panko-8.1.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_bin.py": 8.0.0_vs_8.1.0.

    1 # Copyright 2012 eNovance <licensing@enovance.com>
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #      http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 
   15 import os
   16 import subprocess
   17 
   18 from oslo_utils import fileutils
   19 import six
   20 
   21 from panko.tests import base
   22 
   23 
   24 class BinTestCase(base.BaseTestCase):
   25     def setUp(self):
   26         super(BinTestCase, self).setUp()
   27         content = ("[database]\n"
   28                    "connection=log://localhost\n")
   29         if six.PY3:
   30             content = content.encode('utf-8')
   31         self.tempfile = fileutils.write_to_tempfile(content=content,
   32                                                     prefix='panko',
   33                                                     suffix='.conf')
   34 
   35     def tearDown(self):
   36         super(BinTestCase, self).tearDown()
   37         os.remove(self.tempfile)
   38 
   39     def test_dbsync_run(self):
   40         subp = subprocess.Popen(['panko-dbsync',
   41                                  "--config-file=%s" % self.tempfile])
   42         self.assertEqual(0, subp.wait())
   43 
   44     def test_run_expirer_ttl_disabled(self):
   45         subp = subprocess.Popen(['panko-expirer',
   46                                  '-d',
   47                                  "--config-file=%s" % self.tempfile],
   48                                 stdout=subprocess.PIPE)
   49         out, __ = subp.communicate()
   50         self.assertEqual(0, subp.poll())
   51         self.assertIn(b"Nothing to clean, database event "
   52                       b"time to live is disabled", out)
   53 
   54     def _test_run_expirer_ttl_enabled(self, ttl_name, data_name):
   55         content = ("[database]\n"
   56                    "%s=1\n"
   57                    "connection=log://localhost\n" % ttl_name)
   58         if six.PY3:
   59             content = content.encode('utf-8')
   60         self.tempfile = fileutils.write_to_tempfile(content=content,
   61                                                     prefix='panko',
   62                                                     suffix='.conf')
   63         subp = subprocess.Popen(['panko-expirer',
   64                                  '-d',
   65                                  "--config-file=%s" % self.tempfile],
   66                                 stdout=subprocess.PIPE)
   67         out, __ = subp.communicate()
   68         self.assertEqual(0, subp.poll())
   69         msg = "Dropping 100 %ss data with TTL 1" % data_name
   70         if six.PY3:
   71             msg = msg.encode('utf-8')
   72         self.assertIn(msg, out)
   73 
   74     def test_run_expirer_ttl_enabled(self):
   75         self._test_run_expirer_ttl_enabled('event_time_to_live', 'event')