"Fossies" - the Fresh Open Source Software Archive

Member "cinder-17.1.0/cinder/tests/unit/volume/drivers/dell_emc/powerstore/test_volume_attach_detach.py" (8 Mar 2021, 7689 Bytes) of package /linux/misc/openstack/cinder-17.1.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_volume_attach_detach.py": 17.0.1_vs_17.1.0.

    1 # Copyright (c) 2020 Dell Inc. or its subsidiaries.
    2 # All Rights Reserved.
    3 #
    4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 # not use this file except in compliance with the License. You may obtain
    6 # a copy of the License at
    7 #
    8 #      http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 # Unless required by applicable law or agreed to in writing, software
   11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 # License for the specific language governing permissions and limitations
   14 # under the License.
   15 
   16 from unittest import mock
   17 
   18 from cinder import exception
   19 from cinder.objects import fields
   20 from cinder.objects import volume_attachment
   21 from cinder.tests.unit import fake_volume
   22 from cinder.tests.unit.volume.drivers.dell_emc import powerstore
   23 from cinder.volume.drivers.dell_emc.powerstore import utils
   24 
   25 
   26 class TestVolumeAttachDetach(powerstore.TestPowerStoreDriver):
   27     @mock.patch("cinder.volume.drivers.dell_emc.powerstore.client."
   28                 "PowerStoreClient.get_chap_config")
   29     @mock.patch("cinder.volume.drivers.dell_emc.powerstore.client."
   30                 "PowerStoreClient.get_appliance_id_by_name")
   31     def setUp(self, mock_appliance, mock_chap):
   32         super(TestVolumeAttachDetach, self).setUp()
   33         mock_appliance.return_value = "A1"
   34         mock_chap.return_value = {"mode": "Single"}
   35         self.iscsi_driver.check_for_setup_error()
   36         self.fc_driver.check_for_setup_error()
   37         self.volume = fake_volume.fake_volume_obj(
   38             {},
   39             host="host@backend#test-appliance",
   40             provider_id="fake_id",
   41             size=8
   42         )
   43         self.volume.volume_attachment = (
   44             volume_attachment.VolumeAttachmentList()
   45         )
   46         self.volume.volume_attachment.objects = [
   47             volume_attachment.VolumeAttachment(
   48                 attach_status=fields.VolumeAttachStatus.ATTACHED,
   49                 attached_host=self.volume.host
   50             ),
   51             volume_attachment.VolumeAttachment(
   52                 attach_status=fields.VolumeAttachStatus.ATTACHED,
   53                 attached_host=self.volume.host
   54             )
   55         ]
   56         fake_iscsi_targets_response = [
   57             {
   58                 "address": "1.2.3.4",
   59                 "ip_port": {
   60                     "target_iqn":
   61                         "iqn.2020-07.com.dell:dellemc-powerstore-test-iqn-1"
   62                 },
   63             },
   64             {
   65                 "address": "5.6.7.8",
   66                 "ip_port": {
   67                     "target_iqn":
   68                         "iqn.2020-07.com.dell:dellemc-powerstore-test-iqn-1"
   69                 },
   70             },
   71         ]
   72         fake_fc_wwns_response = [
   73             {
   74                 "wwn": "58:cc:f0:98:49:21:07:02"
   75             },
   76             {
   77                 "wwn": "58:cc:f0:98:49:23:07:02"
   78             },
   79         ]
   80         self.fake_connector = {
   81             "host": self.volume.host,
   82             "wwpns": ["58:cc:f0:98:49:21:07:02", "58:cc:f0:98:49:23:07:02"],
   83             "initiator": "fake_initiator",
   84         }
   85         self.iscsi_targets_mock = self.mock_object(
   86             self.iscsi_driver.adapter.client,
   87             "get_ip_pool_address",
   88             return_value=fake_iscsi_targets_response
   89         )
   90         self.fc_wwns_mock = self.mock_object(
   91             self.fc_driver.adapter.client,
   92             "get_fc_port",
   93             return_value=fake_fc_wwns_response
   94         )
   95 
   96     def test_initialize_connection_chap_enabled(self):
   97         self.iscsi_driver.adapter.use_chap_auth = True
   98         with mock.patch.object(self.iscsi_driver.adapter,
   99                                "_create_host_and_attach",
  100                                return_value=(
  101                                    utils.get_chap_credentials(),
  102                                    1
  103                                )):
  104             connection_properties = self.iscsi_driver.initialize_connection(
  105                 self.volume,
  106                 self.fake_connector
  107             )
  108             self.assertIn("auth_username", connection_properties["data"])
  109             self.assertIn("auth_password", connection_properties["data"])
  110 
  111     def test_initialize_connection_chap_disabled(self):
  112         self.iscsi_driver.adapter.use_chap_auth = False
  113         with mock.patch.object(self.iscsi_driver.adapter,
  114                                "_create_host_and_attach",
  115                                return_value=(
  116                                    utils.get_chap_credentials(),
  117                                    1
  118                                )):
  119             connection_properties = self.iscsi_driver.initialize_connection(
  120                 self.volume,
  121                 self.fake_connector
  122             )
  123             self.assertNotIn("auth_username", connection_properties["data"])
  124             self.assertNotIn("auth_password", connection_properties["data"])
  125 
  126     def test_get_fc_targets(self):
  127         wwns = self.fc_driver.adapter._get_fc_targets("A1")
  128         self.assertEqual(2, len(wwns))
  129 
  130     def test_get_fc_targets_filtered(self):
  131         self.fc_driver.adapter.allowed_ports = ["58:cc:f0:98:49:23:07:02"]
  132         wwns = self.fc_driver.adapter._get_fc_targets("A1")
  133         self.assertEqual(1, len(wwns))
  134         self.assertFalse(
  135             utils.fc_wwn_to_string("58:cc:f0:98:49:21:07:02") in wwns
  136         )
  137 
  138     def test_get_fc_targets_filtered_no_matched_ports(self):
  139         self.fc_driver.adapter.allowed_ports = ["fc_wwn_1", "fc_wwn_2"]
  140         error = self.assertRaises(exception.VolumeBackendAPIException,
  141                                   self.fc_driver.adapter._get_fc_targets,
  142                                   "A1")
  143         self.assertIn("There are no accessible Fibre Channel targets on the "
  144                       "system.", error.msg)
  145 
  146     def test_get_iscsi_targets(self):
  147         iqns, portals = self.iscsi_driver.adapter._get_iscsi_targets("A1")
  148         self.assertTrue(len(iqns) == len(portals))
  149         self.assertEqual(2, len(portals))
  150 
  151     def test_get_iscsi_targets_filtered(self):
  152         self.iscsi_driver.adapter.allowed_ports = ["1.2.3.4"]
  153         iqns, portals = self.iscsi_driver.adapter._get_iscsi_targets("A1")
  154         self.assertTrue(len(iqns) == len(portals))
  155         self.assertEqual(1, len(portals))
  156         self.assertFalse(
  157             "iqn.2020-07.com.dell:dellemc-powerstore-test-iqn-2" in iqns
  158         )
  159 
  160     def test_get_iscsi_targets_filtered_no_matched_ports(self):
  161         self.iscsi_driver.adapter.allowed_ports = ["1.1.1.1", "2.2.2.2"]
  162         error = self.assertRaises(exception.VolumeBackendAPIException,
  163                                   self.iscsi_driver.adapter._get_iscsi_targets,
  164                                   "A1")
  165         self.assertIn("There are no accessible iSCSI targets on the system.",
  166                       error.msg)
  167 
  168     @mock.patch("cinder.volume.drivers.dell_emc.powerstore.adapter."
  169                 "CommonAdapter._detach_volume_from_hosts")
  170     @mock.patch("cinder.volume.drivers.dell_emc.powerstore.adapter."
  171                 "CommonAdapter._filter_hosts_by_initiators")
  172     def test_detach_multiattached_volume(self, mock_filter_hosts, mock_detach):
  173         self.iscsi_driver.terminate_connection(self.volume,
  174                                                self.fake_connector)
  175         mock_filter_hosts.assert_not_called()
  176         mock_detach.assert_not_called()
  177         self.volume.volume_attachment.objects.pop()
  178         self.iscsi_driver.terminate_connection(self.volume,
  179                                                self.fake_connector)
  180         mock_filter_hosts.assert_called_once()
  181         mock_detach.assert_called_once()