"Fossies" - the Fresh Open Source Software Archive

Member "cinder-14.0.2/cinder/tests/unit/group/test_groups_manager_replication.py" (4 Oct 2019, 5560 Bytes) of package /linux/misc/openstack/cinder-14.0.2.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the last Fossies "Diffs" side-by-side code changes report for "test_groups_manager_replication.py": 14.0.2_vs_15.0.0.

    1 # Copyright (C) 2017 Dell Inc. or its subsidiaries.
    2 # All Rights Reserved.
    3 #
    4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 #    not use this file except in compliance with the License. You may obtain
    6 #    a copy of the License at
    7 #
    8 #         http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 #    Unless required by applicable law or agreed to in writing, software
   11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 #    License for the specific language governing permissions and limitations
   14 #    under the License.
   15 
   16 import ddt
   17 import mock
   18 from oslo_config import cfg
   19 from oslo_utils import importutils
   20 
   21 from cinder import context
   22 from cinder import exception
   23 from cinder import objects
   24 from cinder.objects import fields
   25 from cinder import quota
   26 from cinder import test
   27 from cinder.tests.unit import fake_constants as fake
   28 from cinder.tests.unit import utils as tests_utils
   29 from cinder.volume import api as volume_api
   30 from cinder.volume import configuration as conf
   31 from cinder.volume import driver
   32 from cinder.volume import utils as volutils
   33 
   34 GROUP_QUOTAS = quota.GROUP_QUOTAS
   35 CONF = cfg.CONF
   36 
   37 
   38 @ddt.ddt
   39 class GroupManagerTestCase(test.TestCase):
   40 
   41     def setUp(self):
   42         super(GroupManagerTestCase, self).setUp()
   43         self.volume = importutils.import_object(CONF.volume_manager)
   44         self.configuration = mock.Mock(conf.Configuration)
   45         self.context = context.get_admin_context()
   46         self.context.user_id = fake.USER_ID
   47         self.project_id = fake.PROJECT3_ID
   48         self.context.project_id = self.project_id
   49         self.volume.driver.set_initialized()
   50         self.volume.stats = {'allocated_capacity_gb': 0,
   51                              'pools': {}}
   52         self.volume_api = volume_api.API()
   53 
   54     @mock.patch.object(GROUP_QUOTAS, "reserve",
   55                        return_value=["RESERVATION"])
   56     @mock.patch.object(GROUP_QUOTAS, "commit")
   57     @mock.patch.object(GROUP_QUOTAS, "rollback")
   58     @mock.patch.object(driver.VolumeDriver,
   59                        "delete_group",
   60                        return_value=({'status': (
   61                            fields.GroupStatus.DELETED)}, []))
   62     @mock.patch.object(driver.VolumeDriver,
   63                        "enable_replication",
   64                        return_value=(None, []))
   65     @mock.patch.object(driver.VolumeDriver,
   66                        "disable_replication",
   67                        return_value=(None, []))
   68     @mock.patch.object(driver.VolumeDriver,
   69                        "failover_replication",
   70                        return_value=(None, []))
   71     def test_replication_group(self, fake_failover_rep, fake_disable_rep,
   72                                fake_enable_rep, fake_delete_grp,
   73                                fake_rollback, fake_commit, fake_reserve):
   74         """Test enable, disable, and failover replication for group."""
   75 
   76         def fake_driver_create_grp(context, group):
   77             """Make sure that the pool is part of the host."""
   78             self.assertIn('host', group)
   79             host = group.host
   80             pool = volutils.extract_host(host, level='pool')
   81             self.assertEqual('fakepool', pool)
   82             return {'status': fields.GroupStatus.AVAILABLE,
   83                     'replication_status': fields.ReplicationStatus.DISABLING}
   84 
   85         self.mock_object(self.volume.driver, 'create_group',
   86                          fake_driver_create_grp)
   87 
   88         group = tests_utils.create_group(
   89             self.context,
   90             availability_zone=CONF.storage_availability_zone,
   91             volume_type_ids=[fake.VOLUME_TYPE_ID],
   92             host='fakehost@fakedrv#fakepool',
   93             group_type_id=fake.GROUP_TYPE_ID)
   94         group = objects.Group.get_by_id(self.context, group.id)
   95         self.volume.create_group(self.context, group)
   96         self.assertEqual(
   97             group.id,
   98             objects.Group.get_by_id(context.get_admin_context(),
   99                                     group.id).id)
  100 
  101         self.volume.disable_replication(self.context, group)
  102         group = objects.Group.get_by_id(
  103             context.get_admin_context(), group.id)
  104         self.assertEqual(fields.ReplicationStatus.DISABLED,
  105                          group.replication_status)
  106 
  107         group.replication_status = fields.ReplicationStatus.ENABLING
  108         group.save()
  109         self.volume.enable_replication(self.context, group)
  110         group = objects.Group.get_by_id(
  111             context.get_admin_context(), group.id)
  112         self.assertEqual(fields.ReplicationStatus.ENABLED,
  113                          group.replication_status)
  114 
  115         group.replication_status = fields.ReplicationStatus.FAILING_OVER
  116         group.save()
  117         self.volume.failover_replication(self.context, group)
  118         group = objects.Group.get_by_id(
  119             context.get_admin_context(), group.id)
  120         self.assertEqual(fields.ReplicationStatus.FAILED_OVER,
  121                          group.replication_status)
  122 
  123         targets = self.volume.list_replication_targets(self.context, group)
  124         self.assertIn('replication_targets', targets)
  125 
  126         self.volume.delete_group(self.context, group)
  127         grp = objects.Group.get_by_id(
  128             context.get_admin_context(read_deleted='yes'), group.id)
  129         self.assertEqual(fields.GroupStatus.DELETED, grp.status)
  130         self.assertRaises(exception.NotFound,
  131                           objects.Group.get_by_id,
  132                           self.context,
  133                           group.id)