"Fossies" - the Fresh Open Source Software Archive

Member "manila-8.1.3/manila/tests/share/drivers/dell_emc/plugins/unity/test_client.py" (20 Jul 2020, 10005 Bytes) of package /linux/misc/openstack/manila-8.1.3.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_client.py": 8.1.2_vs_8.1.3.

    1 # Copyright (c) 2016 EMC Corporation.
    2 # All Rights Reserved.
    3 #
    4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 #    not use this file except in compliance with the License. You may obtain
    6 #    a copy of the License at
    7 #
    8 #         http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 #    Unless required by applicable law or agreed to in writing, software
   11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 #    License for the specific language governing permissions and limitations
   14 #    under the License.
   15 
   16 import ddt
   17 import mock
   18 from oslo_utils import units
   19 
   20 from manila import exception
   21 from manila import test
   22 from manila.tests.share.drivers.dell_emc.plugins.unity import fake_exceptions
   23 from manila.tests.share.drivers.dell_emc.plugins.unity import res_mock
   24 
   25 
   26 @ddt.ddt
   27 class TestClient(test.TestCase):
   28     @res_mock.mock_client_input
   29     @res_mock.patch_client
   30     def test_create_cifs_share__existed_expt(self, client, mocked_input):
   31         resource = mocked_input['filesystem']
   32         share = mocked_input['cifs_share']
   33 
   34         new_share = client.create_cifs_share(resource, share.name)
   35 
   36         self.assertEqual(share.name, new_share.name)
   37 
   38     @res_mock.mock_client_input
   39     @res_mock.patch_client
   40     def test_create_nfs_share__existed_expt(self, client, mocked_input):
   41         resource = mocked_input['filesystem']
   42         share = mocked_input['nfs_share']
   43         new_share = client.create_nfs_share(resource, share.name)
   44 
   45         self.assertEqual(share.name, new_share.name)
   46 
   47     @res_mock.mock_client_input
   48     @res_mock.patch_client
   49     def test_create_nfs_filesystem_and_share(self, client, mocked_input):
   50         pool = mocked_input['pool']
   51         nas_server = mocked_input['nas_server']
   52         share = mocked_input['nfs_share']
   53 
   54         client.create_nfs_filesystem_and_share(
   55             pool, nas_server, share.name,
   56             share.size)
   57 
   58     @res_mock.mock_client_input
   59     @res_mock.patch_client
   60     def test_get_share_with_invalid_proto(self, client, mocked_input):
   61         share = mocked_input['share']
   62 
   63         self.assertRaises(exception.BadConfigurationException,
   64                           client.get_share,
   65                           share.name,
   66                           'fake_proto')
   67 
   68     @res_mock.mock_client_input
   69     @res_mock.patch_client
   70     def test_create_filesystem__existed_expt(self, client, mocked_input):
   71         pool = mocked_input['pool']
   72         nas_server = mocked_input['nas_server']
   73         filesystem = mocked_input['filesystem']
   74 
   75         new_filesystem = client.create_filesystem(pool,
   76                                                   nas_server,
   77                                                   filesystem.name,
   78                                                   filesystem.size,
   79                                                   filesystem.proto)
   80 
   81         self.assertEqual(filesystem.name, new_filesystem.name)
   82 
   83     @res_mock.mock_client_input
   84     @res_mock.patch_client
   85     def test_delete_filesystem__nonexistent_expt(self, client, mocked_input):
   86         filesystem = mocked_input['filesystem']
   87 
   88         client.delete_filesystem(filesystem)
   89 
   90     @res_mock.mock_client_input
   91     @res_mock.patch_client
   92     def test_create_nas_server__existed_expt(self, client, mocked_input):
   93         sp = mocked_input['sp']
   94         pool = mocked_input['pool']
   95         nas_server = mocked_input['nas_server']
   96 
   97         new_nas_server = client.create_nas_server(nas_server.name, sp, pool)
   98 
   99         self.assertEqual(nas_server.name, new_nas_server.name)
  100 
  101     @res_mock.mock_client_input
  102     @res_mock.patch_client
  103     def test_delete_nas_server__nonexistent_expt(self, client, mocked_input):
  104         nas_server = mocked_input['nas_server']
  105 
  106         client.delete_nas_server(nas_server.name)
  107 
  108     @res_mock.mock_client_input
  109     @res_mock.patch_client
  110     def test_create_dns_server__existed_expt(self, client, mocked_input):
  111         nas_server = mocked_input['nas_server']
  112 
  113         client.create_dns_server(nas_server, 'fake_domain', 'fake_dns_ip')
  114 
  115     @res_mock.mock_client_input
  116     @res_mock.patch_client
  117     def test_create_interface__existed_expt(self, client, mocked_input):
  118         nas_server = mocked_input['nas_server']
  119         self.assertRaises(exception.IPAddressInUse, client.create_interface,
  120                           nas_server, 'fake_ip_addr', 'fake_mask',
  121                           'fake_gateway', port_id='fake_port_id')
  122 
  123     @res_mock.mock_client_input
  124     @res_mock.patch_client
  125     def test_enable_cifs_service__existed_expt(self, client, mocked_input):
  126         nas_server = mocked_input['nas_server']
  127 
  128         client.enable_cifs_service(
  129             nas_server, 'domain_name', 'fake_user', 'fake_passwd')
  130 
  131     @res_mock.mock_client_input
  132     @res_mock.patch_client
  133     def test_enable_nfs_service__existed_expt(self, client, mocked_input):
  134         nas_server = mocked_input['nas_server']
  135 
  136         client.enable_nfs_service(nas_server)
  137 
  138     @res_mock.mock_client_input
  139     @res_mock.patch_client
  140     def test_create_snapshot__existed_expt(self, client, mocked_input):
  141         nas_server = mocked_input['filesystem']
  142         exp_snap = mocked_input['snapshot']
  143 
  144         client.create_snapshot(nas_server, exp_snap.name)
  145 
  146     @res_mock.mock_client_input
  147     @res_mock.patch_client
  148     def test_create_snap_of_snap__existed_expt(self, client, mocked_input):
  149         snapshot = mocked_input['src_snapshot']
  150         dest_snap = mocked_input['dest_snapshot']
  151 
  152         new_snap = client.create_snap_of_snap(snapshot, dest_snap.name)
  153 
  154         self.assertEqual(dest_snap.name, new_snap.name)
  155 
  156     @res_mock.mock_client_input
  157     @res_mock.patch_client
  158     def test_delete_snapshot__nonexistent_expt(self, client, mocked_input):
  159         snapshot = mocked_input['snapshot']
  160 
  161         client.delete_snapshot(snapshot)
  162 
  163     @res_mock.patch_client
  164     def test_cifs_deny_access__nonexistentuser_expt(self, client):
  165         try:
  166             client.cifs_deny_access('fake_share_name', 'fake_username')
  167         except fake_exceptions.UnityAclUserNotFoundError:
  168             self.fail("UnityAclUserNotFoundError raised unexpectedly!")
  169 
  170     @res_mock.patch_client
  171     def test_nfs_deny_access__nonexistent_expt(self, client):
  172         client.nfs_deny_access('fake_share_name', 'fake_ip_addr')
  173 
  174     @res_mock.patch_client
  175     def test_get_storage_processor(self, client):
  176         sp = client.get_storage_processor(sp_id='SPA')
  177 
  178         self.assertEqual('SPA', sp.name)
  179 
  180     @res_mock.mock_client_input
  181     @res_mock.patch_client
  182     def test_extend_filesystem(self, client, mocked_input):
  183         fs = mocked_input['fs']
  184 
  185         size = client.extend_filesystem(fs, 5)
  186 
  187         self.assertEqual(5 * units.Gi, size)
  188 
  189     @res_mock.mock_client_input
  190     @res_mock.patch_client
  191     def test_shrink_filesystem(self, client, mocked_input):
  192         fs = mocked_input['fs']
  193 
  194         size = client.shrink_filesystem('fake_share_id_1', fs, 4)
  195 
  196         self.assertEqual(4 * units.Gi, size)
  197 
  198     @res_mock.mock_client_input
  199     @res_mock.patch_client
  200     def test_shrink_filesystem_size_too_small(self, client, mocked_input):
  201         fs = mocked_input['fs']
  202 
  203         self.assertRaises(exception.ShareShrinkingPossibleDataLoss,
  204                           client.shrink_filesystem, 'fake_share_id_2', fs, 4)
  205 
  206     @res_mock.patch_client
  207     def test_get_file_ports(self, client):
  208         ports = client.get_file_ports()
  209         self.assertEqual(2, len(ports))
  210 
  211     @res_mock.patch_client
  212     def test_get_tenant(self, client):
  213         tenant = client.get_tenant('test', 5)
  214         self.assertEqual('tenant_1', tenant.id)
  215 
  216     @res_mock.patch_client
  217     def test_get_tenant_preexist(self, client):
  218         tenant = client.get_tenant('test', 6)
  219         self.assertEqual('tenant_1', tenant.id)
  220 
  221     @res_mock.patch_client
  222     def test_get_tenant_name_inuse_but_vlan_not_used(self, client):
  223         self.assertRaises(fake_exceptions.UnityTenantNameInUseError,
  224                           client.get_tenant, 'test', 7)
  225 
  226     @res_mock.patch_client
  227     def test_get_tenant_for_vlan_0(self, client):
  228         tenant = client.get_tenant('tenant', 0)
  229         self.assertIsNone(tenant)
  230 
  231     @res_mock.patch_client
  232     def test_get_tenant_for_vlan_already_has_interfaces(self, client):
  233         tenant = client.get_tenant('tenant', 3)
  234         self.assertEqual('tenant_1', tenant.id)
  235 
  236     @res_mock.mock_client_input
  237     @res_mock.patch_client
  238     def test_create_file_interface_ipv6(self, client, mocked_input):
  239         mock_nas_server = mock.Mock()
  240         mock_nas_server.create_file_interface = mock.Mock(return_value=None)
  241         mock_file_interface = mocked_input['file_interface']
  242         mock_port_id = mock.Mock()
  243         client.create_interface(mock_nas_server,
  244                                 mock_file_interface.ip_addr,
  245                                 netmask=None,
  246                                 gateway=mock_file_interface.gateway,
  247                                 port_id=mock_port_id,
  248                                 vlan_id=mock_file_interface.vlan_id,
  249                                 prefix_length=mock_file_interface.prefix_length
  250                                 )
  251         mock_nas_server.create_file_interface.assert_called_once_with(
  252             mock_port_id,
  253             mock_file_interface.ip_addr,
  254             netmask=None,
  255             v6_prefix_length=mock_file_interface.prefix_length,
  256             gateway=mock_file_interface.gateway,
  257             vlan_id=mock_file_interface.vlan_id)
  258 
  259     @res_mock.patch_client
  260     def test_get_snapshot(self, client):
  261         snapshot = client.get_snapshot('Snapshot_1')
  262         self.assertEqual('snapshot_1', snapshot.id)
  263 
  264     @res_mock.patch_client
  265     def test_restore_snapshot(self, client):
  266         snapshot = client.get_snapshot('Snapshot_1')
  267         rst = client.restore_snapshot(snapshot.name)
  268         self.assertIs(True, rst)
  269         snapshot.restore.assert_called_once_with(delete_backup=True)