"Fossies" - the Fresh Open Source Software Archive

Member "panko-8.1.0/panko/tests/functional/storage/test_impl_mongodb.py" (4 Aug 2021, 2279 Bytes) of package /linux/misc/openstack/panko-8.1.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file.

    1 #
    2 # Copyright 2012 New Dream Network, LLC (DreamHost)
    3 #
    4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 # not use this file except in compliance with the License. You may obtain
    6 # a copy of the License at
    7 #
    8 #      http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 # Unless required by applicable law or agreed to in writing, software
   11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 # License for the specific language governing permissions and limitations
   14 # under the License.
   15 """Tests for panko/storage/impl_mongodb.py
   16 
   17 .. note::
   18   In order to run the tests against another MongoDB server set the
   19   environment variable PANKO_TEST_MONGODB_URL to point to a MongoDB
   20   server before running the tests.
   21 
   22 """
   23 
   24 from panko.storage import impl_mongodb
   25 from panko.tests import base as test_base
   26 from panko.tests import db as tests_db
   27 
   28 
   29 @tests_db.run_with('mongodb')
   30 class IndexTest(tests_db.TestBase):
   31 
   32     def test_event_ttl_index_absent(self):
   33         # create a fake index and check it is deleted
   34         self.conn.clear_expired_data(-1, 0)
   35         self.assertNotIn("event_ttl",
   36                          self.conn.db.event.index_information())
   37 
   38         self.conn.clear_expired_data(456789, 0)
   39         self.assertEqual(456789,
   40                          self.conn.db.event.index_information()
   41                          ["event_ttl"]['expireAfterSeconds'])
   42 
   43     def test_event_ttl_index_present(self):
   44         self.conn.clear_expired_data(456789, 0)
   45         self.assertEqual(456789,
   46                          self.conn.db.event.index_information()
   47                          ["event_ttl"]['expireAfterSeconds'])
   48 
   49         self.conn.clear_expired_data(-1, 0)
   50         self.assertNotIn("event_ttl",
   51                          self.conn.db.event.index_information())
   52 
   53 
   54 class CapabilitiesTest(test_base.BaseTestCase):
   55     # Check the returned capabilities list, which is specific to each DB
   56     # driver
   57 
   58     def test_capabilities(self):
   59         expected_capabilities = {
   60             'events': {'query': {'simple': True}},
   61         }
   62         actual_capabilities = impl_mongodb.Connection.get_capabilities()
   63         self.assertEqual(expected_capabilities, actual_capabilities)