"Fossies" - the Fresh Open Source Software Archive

Member "manila-8.1.3/manila/tests/share/drivers/test_helpers.py" (20 Jul 2020, 33758 Bytes) of package /linux/misc/openstack/manila-8.1.3.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "test_helpers.py": 8.1.2_vs_8.1.3.

    1 # Copyright 2015 Mirantis Inc.
    2 # All Rights Reserved.
    3 #
    4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 # not use this file except in compliance with the License. You may obtain
    6 # a copy of the License at
    7 #
    8 # http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 # Unless required by applicable law or agreed to in writing, software
   11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 # License for the specific language governing permissions and limitations
   14 # under the License.
   15 
   16 import os
   17 
   18 import ddt
   19 import mock
   20 from oslo_config import cfg
   21 
   22 from manila.common import constants as const
   23 from manila import exception
   24 import manila.share.configuration
   25 from manila.share.drivers import helpers
   26 from manila import test
   27 from manila.tests import fake_compute
   28 from manila.tests import fake_utils
   29 from manila.tests.share.drivers import test_generic
   30 
   31 
   32 CONF = cfg.CONF
   33 
   34 
   35 @ddt.ddt
   36 class NFSHelperTestCase(test.TestCase):
   37     """Test case for NFS helper."""
   38 
   39     def setUp(self):
   40         super(NFSHelperTestCase, self).setUp()
   41         fake_utils.stub_out_utils_execute(self)
   42         self.fake_conf = manila.share.configuration.Configuration(None)
   43         self._ssh_exec = mock.Mock(return_value=('', ''))
   44         self._execute = mock.Mock(return_value=('', ''))
   45         self._helper = helpers.NFSHelper(self._execute, self._ssh_exec,
   46                                          self.fake_conf)
   47         ip = '10.254.0.3'
   48         self.server = fake_compute.FakeServer(
   49             ip=ip, public_address=ip, instance_id='fake_instance_id')
   50         self.share_name = 'fake_share_name'
   51 
   52     def test_init_helper(self):
   53 
   54         # mocks
   55         self.mock_object(
   56             self._helper, '_ssh_exec',
   57             mock.Mock(side_effect=exception.ProcessExecutionError(
   58                 stderr='command not found')))
   59 
   60         # run
   61         self.assertRaises(exception.ManilaException,
   62                           self._helper.init_helper, self.server)
   63 
   64         # asserts
   65         self._helper._ssh_exec.assert_called_once_with(
   66             self.server, ['sudo', 'exportfs'])
   67 
   68     def test_init_helper_log(self):
   69 
   70         # mocks
   71         self.mock_object(
   72             self._helper, '_ssh_exec',
   73             mock.Mock(side_effect=exception.ProcessExecutionError(
   74                 stderr='fake')))
   75 
   76         # run
   77         self._helper.init_helper(self.server)
   78 
   79         # asserts
   80         self._helper._ssh_exec.assert_called_once_with(
   81             self.server, ['sudo', 'exportfs'])
   82 
   83     @ddt.data(
   84         {"server": {"public_address": "1.2.3.4"}, "version": 4},
   85         {"server": {"public_address": "1001::1002"}, "version": 6},
   86         {"server": {"public_address": "1.2.3.4", "admin_ip": "5.6.7.8"},
   87          "version": 4},
   88         {"server": {"public_address": "1.2.3.4", "ip": "9.10.11.12"},
   89          "version": 4},
   90         {"server": {"public_address": "1001::1001", "ip": "1001::1002"},
   91          "version": 6},
   92         {"server": {"public_address": "1001::1002", "admin_ip": "1001::1002"},
   93          "version": 6},
   94         {"server": {"public_addresses": ["1001::1002"]}, "version": 6},
   95         {"server": {"public_addresses": ["1.2.3.4", "1001::1002"]},
   96          "version": {"1.2.3.4": 4, "1001::1002": 6}},
   97     )
   98     @ddt.unpack
   99     def test_create_exports(self, server, version):
  100         result = self._helper.create_exports(server, self.share_name)
  101 
  102         expected_export_locations = []
  103         path = os.path.join(CONF.share_mount_path, self.share_name)
  104         service_address = server.get("admin_ip", server.get("ip"))
  105         version_copy = version
  106 
  107         def convert_address(address, version):
  108             if version == 4:
  109                 return address
  110             return "[%s]" % address
  111 
  112         if 'public_addresses' in server:
  113             pairs = list(map(lambda addr: (addr, False),
  114                              server['public_addresses']))
  115         else:
  116             pairs = [(server['public_address'], False)]
  117 
  118         service_address = server.get("admin_ip", server.get("ip"))
  119         if service_address:
  120             pairs.append((service_address, True))
  121 
  122         for ip, is_admin in pairs:
  123             if isinstance(version_copy, dict):
  124                 version = version_copy.get(ip)
  125 
  126             expected_export_locations.append({
  127                 "path": "%s:%s" % (convert_address(ip, version), path),
  128                 "is_admin_only": is_admin,
  129                 "metadata": {
  130                     "export_location_metadata_example": "example",
  131                 },
  132             })
  133         self.assertEqual(expected_export_locations, result)
  134 
  135     @ddt.data(const.ACCESS_LEVEL_RW, const.ACCESS_LEVEL_RO)
  136     def test_update_access(self, access_level):
  137         expected_mount_options = '%s,no_subtree_check,no_root_squash'
  138         self.mock_object(self._helper, '_sync_nfs_temp_and_perm_files')
  139         local_path = os.path.join(CONF.share_mount_path, self.share_name)
  140         exec_result = ' '.join([local_path, '2.2.2.3'])
  141         self.mock_object(self._helper, '_ssh_exec',
  142                          mock.Mock(return_value=(exec_result, '')))
  143         access_rules = [
  144             test_generic.get_fake_access_rule('1.1.1.1', access_level),
  145             test_generic.get_fake_access_rule('2.2.2.2', access_level),
  146             test_generic.get_fake_access_rule('2.2.2.3', access_level)]
  147         add_rules = [
  148             test_generic.get_fake_access_rule('2.2.2.2', access_level),
  149             test_generic.get_fake_access_rule('2.2.2.3', access_level),
  150             test_generic.get_fake_access_rule('5.5.5.0/24', access_level)]
  151         delete_rules = [
  152             test_generic.get_fake_access_rule('3.3.3.3', access_level),
  153             test_generic.get_fake_access_rule('4.4.4.4', access_level, 'user'),
  154             test_generic.get_fake_access_rule('0.0.0.0/0', access_level)]
  155         self._helper.update_access(self.server, self.share_name, access_rules,
  156                                    add_rules=add_rules,
  157                                    delete_rules=delete_rules)
  158         local_path = os.path.join(CONF.share_mount_path, self.share_name)
  159         self._helper._ssh_exec.assert_has_calls([
  160             mock.call(self.server, ['sudo', 'exportfs']),
  161             mock.call(self.server, ['sudo', 'exportfs', '-u',
  162                                     ':'.join(['3.3.3.3', local_path])]),
  163             mock.call(self.server, ['sudo', 'exportfs', '-u',
  164                                     ':'.join(['*',
  165                                               local_path])]),
  166             mock.call(self.server, ['sudo', 'exportfs', '-o',
  167                                     expected_mount_options % access_level,
  168                                     ':'.join(['2.2.2.2', local_path])]),
  169             mock.call(self.server, ['sudo', 'exportfs', '-o',
  170                                     expected_mount_options % access_level,
  171                                     ':'.join(['5.5.5.0/24',
  172                                               local_path])]),
  173         ])
  174         self._helper._sync_nfs_temp_and_perm_files.assert_has_calls([
  175             mock.call(self.server), mock.call(self.server)])
  176 
  177     @ddt.data({'access': '10.0.0.1', 'result': '10.0.0.1'},
  178               {'access': '10.0.0.1/32', 'result': '10.0.0.1'},
  179               {'access': '10.0.0.0/24', 'result': '10.0.0.0/24'},
  180               {'access': '1001::1001', 'result': '[1001::1001]'},
  181               {'access': '1001::1000/128', 'result': '[1001::1000]'},
  182               {'access': '1001::1000/124', 'result': '[1001::1000]/124'})
  183     @ddt.unpack
  184     def test__get_parsed_address_or_cidr(self, access, result):
  185         self.assertEqual(result,
  186                          self._helper._get_parsed_address_or_cidr(access))
  187 
  188     @ddt.data('10.0.0.265', '10.0.0.1/33', '1001::10069', '1001::1000/129')
  189     def test__get_parsed_address_or_cidr_with_invalid_access(self, access):
  190         self.assertRaises(ValueError,
  191                           self._helper._get_parsed_address_or_cidr,
  192                           access)
  193 
  194     def test_update_access_invalid_type(self):
  195         access_rules = [test_generic.get_fake_access_rule(
  196             '2.2.2.2', const.ACCESS_LEVEL_RW, access_type='fake'), ]
  197         self.assertRaises(
  198             exception.InvalidShareAccess,
  199             self._helper.update_access,
  200             self.server,
  201             self.share_name,
  202             access_rules,
  203             [],
  204             [])
  205 
  206     def test_update_access_invalid_level(self):
  207         access_rules = [test_generic.get_fake_access_rule(
  208             '2.2.2.2', 'fake_level', access_type='ip'), ]
  209         self.assertRaises(
  210             exception.InvalidShareAccessLevel,
  211             self._helper.update_access,
  212             self.server,
  213             self.share_name,
  214             access_rules,
  215             [],
  216             [])
  217 
  218     def test_update_access_delete_invalid_rule(self):
  219         delete_rules = [test_generic.get_fake_access_rule(
  220             'lala', 'fake_level', access_type='user'), ]
  221         self.mock_object(self._helper, '_sync_nfs_temp_and_perm_files')
  222         self._helper.update_access(self.server, self.share_name, [],
  223                                    [], delete_rules)
  224         self._helper._sync_nfs_temp_and_perm_files.assert_called_with(
  225             self.server)
  226 
  227     def test_get_host_list(self):
  228         fake_exportfs = ('/shares/share-1\n\t\t20.0.0.3\n'
  229                          '/shares/share-1\n\t\t20.0.0.6\n'
  230                          '/shares/share-2\n\t\t10.0.0.2\n'
  231                          '/shares/share-2\n\t\t10.0.0.5\n'
  232                          '/shares/share-3\n\t\t30.0.0.4\n'
  233                          '/shares/share-3\n\t\t30.0.0.7\n')
  234         expected = ['20.0.0.3', '20.0.0.6']
  235         result = self._helper.get_host_list(fake_exportfs, '/shares/share-1')
  236         self.assertEqual(expected, result)
  237 
  238     @ddt.data({"level": const.ACCESS_LEVEL_RW, "ip": "1.1.1.1",
  239                "expected": "1.1.1.1"},
  240               {"level": const.ACCESS_LEVEL_RO, "ip": "1.1.1.1",
  241                "expected": "1.1.1.1"},
  242               {"level": const.ACCESS_LEVEL_RW, "ip": "fd12:abcd::10",
  243                "expected": "[fd12:abcd::10]"},
  244               {"level": const.ACCESS_LEVEL_RO, "ip": "fd12:abcd::10",
  245                "expected": "[fd12:abcd::10]"})
  246     @ddt.unpack
  247     def test_update_access_recovery_mode(self, level, ip, expected):
  248         expected_mount_options = '%s,no_subtree_check,no_root_squash'
  249         access_rules = [test_generic.get_fake_access_rule(
  250             ip, level), ]
  251         self.mock_object(self._helper, '_sync_nfs_temp_and_perm_files')
  252         self.mock_object(self._helper, 'get_host_list',
  253                          mock.Mock(return_value=[ip]))
  254         self._helper.update_access(self.server, self.share_name, access_rules,
  255                                    [], [])
  256         local_path = os.path.join(CONF.share_mount_path, self.share_name)
  257         self._ssh_exec.assert_has_calls([
  258             mock.call(self.server, ['sudo', 'exportfs']),
  259             mock.call(
  260                 self.server, ['sudo', 'exportfs', '-u',
  261                               ':'.join([expected,
  262                                         local_path])]),
  263             mock.call(self.server, ['sudo', 'exportfs', '-o',
  264                                     expected_mount_options % level,
  265                                     ':'.join([expected, local_path])]),
  266         ])
  267         self._helper._sync_nfs_temp_and_perm_files.assert_called_with(
  268             self.server)
  269 
  270     def test_sync_nfs_temp_and_perm_files(self):
  271         self._helper._sync_nfs_temp_and_perm_files(self.server)
  272         self._helper._ssh_exec.assert_has_calls(
  273             [mock.call(self.server, mock.ANY) for i in range(1)])
  274 
  275     @ddt.data('/foo/bar', '5.6.7.8:/bar/quuz', '5.6.7.9:/foo/quuz',
  276               '[1001::1001]:/foo/bar', '[1001::1000]/:124:/foo/bar')
  277     def test_get_exports_for_share_single_ip(self, export_location):
  278         server = dict(public_address='1.2.3.4')
  279 
  280         result = self._helper.get_exports_for_share(server, export_location)
  281 
  282         path = export_location.split(':')[-1]
  283         expected_export_locations = [
  284             {"is_admin_only": False,
  285              "path": "%s:%s" % (server["public_address"], path),
  286              "metadata": {"export_location_metadata_example": "example"}}
  287         ]
  288         self.assertEqual(expected_export_locations, result)
  289 
  290     @ddt.data('/foo/bar', '5.6.7.8:/bar/quuz', '5.6.7.9:/foo/quuz')
  291     def test_get_exports_for_share_multi_ip(self, export_location):
  292         server = dict(public_addresses=['1.2.3.4', '1.2.3.5'])
  293 
  294         result = self._helper.get_exports_for_share(server, export_location)
  295 
  296         path = export_location.split(':')[-1]
  297         expected_export_locations = list(map(
  298             lambda addr: {
  299                 "is_admin_only": False,
  300                 "path": "%s:%s" % (addr, path),
  301                 "metadata": {"export_location_metadata_example": "example"}
  302             },
  303             server['public_addresses'])
  304         )
  305         self.assertEqual(expected_export_locations, result)
  306 
  307     @ddt.data(
  308         {'public_address_with_suffix': 'foo'},
  309         {'with_prefix_public_address': 'bar'},
  310         {'with_prefix_public_address_and_with_suffix': 'quuz'}, {})
  311     def test_get_exports_for_share_with_error(self, server):
  312         export_location = '1.2.3.4:/foo/bar'
  313 
  314         self.assertRaises(
  315             exception.ManilaException,
  316             self._helper.get_exports_for_share, server, export_location)
  317 
  318     @ddt.data('/foo/bar', '5.6.7.8:/foo/bar', '5.6.7.88:fake:/foo/bar',
  319               '[1001::1002]:/foo/bar', '[1001::1000]/124:/foo/bar')
  320     def test_get_share_path_by_export_location(self, export_location):
  321         result = self._helper.get_share_path_by_export_location(
  322             dict(), export_location)
  323 
  324         self.assertEqual('/foo/bar', result)
  325 
  326     @ddt.data(
  327         ('/shares/fake_share1\n\t\t1.1.1.10\n'
  328          '/shares/fake_share2\n\t\t1.1.1.16\n'
  329          '/mnt/fake_share1 1.1.1.11', False),
  330         ('/shares/fake_share_name\n\t\t1.1.1.10\n'
  331          '/shares/fake_share_name\n\t\t1.1.1.16\n'
  332          '/mnt/fake_share1\n\t\t1.1.1.11', True),
  333         ('/mnt/fake_share_name\n\t\t1.1.1.11\n'
  334          '/shares/fake_share_name\n\t\t1.1.1.10\n'
  335          '/shares/fake_share_name\n\t\t1.1.1.16\n', True))
  336     @ddt.unpack
  337     def test_disable_access_for_maintenance(self, output, hosts_match):
  338         fake_maintenance_path = "fake.path"
  339         self._helper.configuration.share_mount_path = '/shares'
  340         local_path = os.path.join(self._helper.configuration.share_mount_path,
  341                                   self.share_name)
  342 
  343         def fake_ssh_exec(*args, **kwargs):
  344             if 'exportfs' in args[1] and '-u' not in args[1]:
  345                 return output, ''
  346             else:
  347                 return '', ''
  348 
  349         self.mock_object(self._helper, '_ssh_exec',
  350                          mock.Mock(side_effect=fake_ssh_exec))
  351 
  352         self.mock_object(self._helper, '_sync_nfs_temp_and_perm_files')
  353         self.mock_object(self._helper, '_get_maintenance_file_path',
  354                          mock.Mock(return_value=fake_maintenance_path))
  355 
  356         self._helper.disable_access_for_maintenance(
  357             self.server, self.share_name)
  358 
  359         self._helper._ssh_exec.assert_any_call(
  360             self.server,
  361             ['cat', const.NFS_EXPORTS_FILE,
  362              '|', 'grep', self.share_name,
  363              '|', 'sudo', 'tee', fake_maintenance_path]
  364         )
  365         self._helper._ssh_exec.assert_has_calls([
  366             mock.call(self.server, ['sudo', 'exportfs']),
  367         ])
  368 
  369         if hosts_match:
  370             self._helper._ssh_exec.assert_has_calls([
  371                 mock.call(self.server, ['sudo', 'exportfs', '-u',
  372                                         ':'.join(['1.1.1.10', local_path])]),
  373                 mock.call(self.server, ['sudo', 'exportfs', '-u',
  374                                         ':'.join(['1.1.1.16', local_path])]),
  375             ])
  376 
  377         self._helper._sync_nfs_temp_and_perm_files.assert_called_once_with(
  378             self.server
  379         )
  380 
  381     def test_restore_access_after_maintenance(self):
  382         fake_maintenance_path = "fake.path"
  383         self.mock_object(self._helper, '_get_maintenance_file_path',
  384                          mock.Mock(return_value=fake_maintenance_path))
  385         self.mock_object(self._helper, '_ssh_exec')
  386 
  387         self._helper.restore_access_after_maintenance(
  388             self.server, self.share_name)
  389 
  390         self._helper._ssh_exec.assert_called_once_with(
  391             self.server,
  392             ['cat', fake_maintenance_path,
  393              '|', 'sudo', 'tee', '-a', const.NFS_EXPORTS_FILE,
  394              '&&', 'sudo', 'exportfs', '-r', '&&', 'sudo', 'rm', '-f',
  395              fake_maintenance_path]
  396         )
  397 
  398 
  399 @ddt.ddt
  400 class CIFSHelperIPAccessTestCase(test.TestCase):
  401     """Test case for CIFS helper with IP access."""
  402 
  403     def setUp(self):
  404         super(CIFSHelperIPAccessTestCase, self).setUp()
  405         self.server_details = {'instance_id': 'fake',
  406                                'public_address': '1.2.3.4', }
  407         self.share_name = 'fake_share_name'
  408         self.fake_conf = manila.share.configuration.Configuration(None)
  409         self._ssh_exec = mock.Mock(return_value=('', ''))
  410         self._execute = mock.Mock(return_value=('', ''))
  411         self._helper = helpers.CIFSHelperIPAccess(self._execute,
  412                                                   self._ssh_exec,
  413                                                   self.fake_conf)
  414         self.access = dict(
  415             access_level=const.ACCESS_LEVEL_RW,
  416             access_type='ip',
  417             access_to='1.1.1.1')
  418 
  419     def test_init_helper(self):
  420         self._helper.init_helper(self.server_details)
  421         self._helper._ssh_exec.assert_called_once_with(
  422             self.server_details,
  423             ['sudo', 'net', 'conf', 'list'],
  424         )
  425 
  426     def test_create_export_share_does_not_exist(self):
  427         def fake_ssh_exec(*args, **kwargs):
  428             if 'showshare' in args[1]:
  429                 raise exception.ProcessExecutionError()
  430             else:
  431                 return '', ''
  432 
  433         self.mock_object(self._helper, '_ssh_exec',
  434                          mock.Mock(side_effect=fake_ssh_exec))
  435 
  436         ret = self._helper.create_exports(self.server_details, self.share_name)
  437 
  438         expected_location = [{
  439             "is_admin_only": False,
  440             "path": "\\\\%s\\%s" % (
  441                 self.server_details['public_address'], self.share_name),
  442             "metadata": {"export_location_metadata_example": "example"}
  443         }]
  444         self.assertEqual(expected_location, ret)
  445         share_path = os.path.join(
  446             self._helper.configuration.share_mount_path,
  447             self.share_name)
  448         self._helper._ssh_exec.assert_has_calls([
  449             mock.call(
  450                 self.server_details,
  451                 ['sudo', 'net', 'conf', 'showshare', self.share_name, ]
  452             ),
  453             mock.call(
  454                 self.server_details,
  455                 [
  456                     'sudo', 'net', 'conf', 'addshare', self.share_name,
  457                     share_path, 'writeable=y', 'guest_ok=y',
  458                 ]
  459             ),
  460             mock.call(self.server_details, mock.ANY),
  461         ])
  462 
  463     def test_create_export_share_does_not_exist_exception(self):
  464 
  465         self.mock_object(self._helper, '_ssh_exec',
  466                          mock.Mock(
  467                              side_effect=[exception.ProcessExecutionError(),
  468                                           Exception('')]
  469                          ))
  470 
  471         self.assertRaises(
  472             exception.ManilaException, self._helper.create_exports,
  473             self.server_details, self.share_name)
  474 
  475     def test_create_exports_share_exist_recreate_true(self):
  476         ret = self._helper.create_exports(
  477             self.server_details, self.share_name, recreate=True)
  478 
  479         expected_location = [{
  480             "is_admin_only": False,
  481             "path": "\\\\%s\\%s" % (
  482                 self.server_details['public_address'], self.share_name),
  483             "metadata": {"export_location_metadata_example": "example"}
  484         }]
  485         self.assertEqual(expected_location, ret)
  486         share_path = os.path.join(
  487             self._helper.configuration.share_mount_path,
  488             self.share_name)
  489         self._helper._ssh_exec.assert_has_calls([
  490             mock.call(
  491                 self.server_details,
  492                 ['sudo', 'net', 'conf', 'showshare', self.share_name, ]
  493             ),
  494             mock.call(
  495                 self.server_details,
  496                 ['sudo', 'net', 'conf', 'delshare', self.share_name, ]
  497             ),
  498             mock.call(
  499                 self.server_details,
  500                 [
  501                     'sudo', 'net', 'conf', 'addshare', self.share_name,
  502                     share_path, 'writeable=y', 'guest_ok=y',
  503                 ]
  504             ),
  505             mock.call(self.server_details, mock.ANY),
  506         ])
  507 
  508     def test_create_export_share_exist_recreate_false(self):
  509         self.assertRaises(
  510             exception.ShareBackendException,
  511             self._helper.create_exports,
  512             self.server_details,
  513             self.share_name,
  514             recreate=False,
  515         )
  516         self._helper._ssh_exec.assert_has_calls([
  517             mock.call(
  518                 self.server_details,
  519                 ['sudo', 'net', 'conf', 'showshare', self.share_name, ]
  520             ),
  521         ])
  522 
  523     def test_remove_exports(self):
  524         self._helper.remove_exports(self.server_details, self.share_name)
  525 
  526         self._helper._ssh_exec.assert_called_once_with(
  527             self.server_details,
  528             ['sudo', 'net', 'conf', 'delshare', self.share_name],
  529         )
  530 
  531     def test_remove_export_forcibly(self):
  532         delshare_command = ['sudo', 'net', 'conf', 'delshare', self.share_name]
  533 
  534         def fake_ssh_exec(*args, **kwargs):
  535             if delshare_command == args[1]:
  536                 raise exception.ProcessExecutionError()
  537             else:
  538                 return ('', '')
  539 
  540         self.mock_object(self._helper, '_ssh_exec',
  541                          mock.Mock(side_effect=fake_ssh_exec))
  542 
  543         self._helper.remove_exports(self.server_details, self.share_name)
  544 
  545         self._helper._ssh_exec.assert_has_calls([
  546             mock.call(
  547                 self.server_details,
  548                 ['sudo', 'net', 'conf', 'delshare', self.share_name],
  549             ),
  550             mock.call(
  551                 self.server_details,
  552                 ['sudo', 'smbcontrol', 'all', 'close-share', self.share_name],
  553             ),
  554         ])
  555 
  556     def test_update_access_wrong_access_level(self):
  557         access_rules = [test_generic.get_fake_access_rule(
  558             '2.2.2.2', const.ACCESS_LEVEL_RO), ]
  559         self.assertRaises(
  560             exception.InvalidShareAccessLevel,
  561             self._helper.update_access,
  562             self.server_details,
  563             self.share_name,
  564             access_rules,
  565             [],
  566             [])
  567 
  568     def test_update_access_wrong_access_type(self):
  569         access_rules = [test_generic.get_fake_access_rule(
  570             '2.2.2.2', const.ACCESS_LEVEL_RW, access_type='fake'), ]
  571         self.assertRaises(
  572             exception.InvalidShareAccess,
  573             self._helper.update_access,
  574             self.server_details,
  575             self.share_name,
  576             access_rules,
  577             [],
  578             [])
  579 
  580     def test_update_access(self):
  581         access_rules = [test_generic.get_fake_access_rule(
  582             '1.1.1.1', const.ACCESS_LEVEL_RW), ]
  583 
  584         self._helper.update_access(self.server_details, self.share_name,
  585                                    access_rules, [], [])
  586         self._helper._ssh_exec.assert_called_once_with(
  587             self.server_details, ['sudo', 'net', 'conf', 'setparm',
  588                                   self.share_name, 'hosts allow',
  589                                   '1.1.1.1'])
  590 
  591     def test_get_allow_hosts(self):
  592         self.mock_object(self._helper, '_ssh_exec',
  593                          mock.Mock(
  594                              return_value=('1.1.1.1 2.2.2.2 3.3.3.3', '')))
  595         expected = ['1.1.1.1', '2.2.2.2', '3.3.3.3']
  596         result = self._helper._get_allow_hosts(
  597             self.server_details, self.share_name)
  598         self.assertEqual(expected, result)
  599         cmd = ['sudo', 'net', 'conf', 'getparm', self.share_name,
  600                'hosts allow']
  601         self._helper._ssh_exec.assert_called_once_with(
  602             self.server_details, cmd)
  603 
  604     @ddt.data(
  605         '', '1.2.3.4:/nfs/like/export', '/1.2.3.4/foo', '\\1.2.3.4\\foo',
  606         '//1.2.3.4\\mixed_slashes_and_backslashes_one',
  607         '\\\\1.2.3.4/mixed_slashes_and_backslashes_two')
  608     def test__get_share_group_name_from_export_location(self, export_location):
  609         self.assertRaises(
  610             exception.InvalidShare,
  611             self._helper._get_share_group_name_from_export_location,
  612             export_location)
  613 
  614     @ddt.data('//5.6.7.8/foo', '\\\\5.6.7.8\\foo')
  615     def test_get_exports_for_share(self, export_location):
  616         server = dict(public_address='1.2.3.4')
  617         self.mock_object(
  618             self._helper, '_get_share_group_name_from_export_location',
  619             mock.Mock(side_effect=(
  620                 self._helper._get_share_group_name_from_export_location)))
  621 
  622         result = self._helper.get_exports_for_share(server, export_location)
  623 
  624         expected_export_location = [{
  625             "is_admin_only": False,
  626             "path": "\\\\%s\\foo" % server['public_address'],
  627             "metadata": {"export_location_metadata_example": "example"}
  628         }]
  629         self.assertEqual(expected_export_location, result)
  630         (self._helper._get_share_group_name_from_export_location.
  631             assert_called_once_with(export_location))
  632 
  633     @ddt.data(
  634         {'public_address_with_suffix': 'foo'},
  635         {'with_prefix_public_address': 'bar'},
  636         {'with_prefix_public_address_and_with_suffix': 'quuz'}, {})
  637     def test_get_exports_for_share_with_exception(self, server):
  638         export_location = '1.2.3.4:/foo/bar'
  639 
  640         self.assertRaises(
  641             exception.ManilaException,
  642             self._helper.get_exports_for_share, server, export_location)
  643 
  644     @ddt.data('//5.6.7.8/foo', '\\\\5.6.7.8\\foo')
  645     def test_get_share_path_by_export_location(self, export_location):
  646         fake_path = ' /bar/quuz\n '
  647         fake_server = dict()
  648         self.mock_object(
  649             self._helper, '_ssh_exec',
  650             mock.Mock(return_value=(fake_path, 'fake')))
  651         self.mock_object(
  652             self._helper, '_get_share_group_name_from_export_location',
  653             mock.Mock(side_effect=(
  654                 self._helper._get_share_group_name_from_export_location)))
  655 
  656         result = self._helper.get_share_path_by_export_location(
  657             fake_server, export_location)
  658 
  659         self.assertEqual('/bar/quuz', result)
  660         self._helper._ssh_exec.assert_called_once_with(
  661             fake_server, ['sudo', 'net', 'conf', 'getparm', 'foo', 'path'])
  662         (self._helper._get_share_group_name_from_export_location.
  663             assert_called_once_with(export_location))
  664 
  665     def test_disable_access_for_maintenance(self):
  666         allowed_hosts = ['test', 'test2']
  667         maintenance_path = os.path.join(
  668             self._helper.configuration.share_mount_path,
  669             "%s.maintenance" % self.share_name)
  670         self.mock_object(self._helper, '_set_allow_hosts')
  671         self.mock_object(self._helper, '_get_allow_hosts',
  672                          mock.Mock(return_value=allowed_hosts))
  673 
  674         self._helper.disable_access_for_maintenance(
  675             self.server_details, self.share_name)
  676 
  677         self._helper._get_allow_hosts.assert_called_once_with(
  678             self.server_details, self.share_name)
  679         self._helper._set_allow_hosts.assert_called_once_with(
  680             self.server_details, [], self.share_name)
  681         kickoff_user_cmd = ['sudo', 'smbstatus', '-S']
  682         self._helper._ssh_exec.assert_any_call(
  683             self.server_details, kickoff_user_cmd)
  684         valid_cmd = ['echo', "'test test2'", '|', 'sudo', 'tee',
  685                      maintenance_path]
  686         self._helper._ssh_exec.assert_any_call(
  687             self.server_details, valid_cmd)
  688 
  689     def test__kick_out_users_success(self):
  690         smbstatus_return = """Service      pid     machine       Connected at
  691 -------------------------------------------------------
  692 fake_share_name   1001   fake_machine1    Thu Sep 14 14:59:07 2017
  693 fake_share_name   1002   fake_machine2    Thu Sep 14 14:59:07 2017
  694 """
  695         self.mock_object(self._helper, '_ssh_exec', mock.Mock(
  696             side_effect=[(smbstatus_return, "fake_stderr"), ("fake", "fake")]))
  697         self._helper._kick_out_users(self.server_details, self.share_name)
  698         self._helper._ssh_exec.assert_any_call(
  699             self.server_details, ['sudo', 'smbstatus', '-S'])
  700         self._helper._ssh_exec.assert_any_call(
  701             self.server_details, ["sudo", "kill", "-15", "1001", "1002"])
  702 
  703     def test__kick_out_users_failed(self):
  704         smbstatus_return = """Service      pid     machine       Connected at
  705 -------------------------------------------------------
  706 fake line
  707 """
  708         self.mock_object(self._helper, '_ssh_exec', mock.Mock(
  709             return_value=(smbstatus_return, "fake_stderr")))
  710         self.assertRaises(exception.ShareBackendException,
  711                           self._helper._kick_out_users, self.server_details,
  712                           self.share_name)
  713 
  714     def test_restore_access_after_maintenance(self):
  715         fake_maintenance_path = "test.path"
  716         self.mock_object(self._helper, '_set_allow_hosts')
  717         self.mock_object(self._helper, '_get_maintenance_file_path',
  718                          mock.Mock(return_value=fake_maintenance_path))
  719         self.mock_object(self._helper, '_ssh_exec',
  720                          mock.Mock(side_effect=[("fake fake2", 0), "fake"]))
  721 
  722         self._helper.restore_access_after_maintenance(
  723             self.server_details, self.share_name)
  724 
  725         self._helper._set_allow_hosts.assert_called_once_with(
  726             self.server_details, ['fake', 'fake2'], self.share_name)
  727         self._helper._ssh_exec.assert_any_call(
  728             self.server_details, ['cat', fake_maintenance_path])
  729         self._helper._ssh_exec.assert_any_call(
  730             self.server_details, ['sudo', 'rm', '-f', fake_maintenance_path])
  731 
  732 
  733 @ddt.ddt
  734 class CIFSHelperUserAccessTestCase(test.TestCase):
  735     """Test case for CIFS helper with user access."""
  736     access_rw = dict(
  737         access_level=const.ACCESS_LEVEL_RW,
  738         access_type='user',
  739         access_to='manila-user')
  740     access_ro = dict(
  741         access_level=const.ACCESS_LEVEL_RO,
  742         access_type='user',
  743         access_to='manila-user')
  744 
  745     def setUp(self):
  746         super(CIFSHelperUserAccessTestCase, self).setUp()
  747         self.server_details = {'instance_id': 'fake',
  748                                'public_address': '1.2.3.4', }
  749         self.share_name = 'fake_share_name'
  750         self.fake_conf = manila.share.configuration.Configuration(None)
  751         self._ssh_exec = mock.Mock(return_value=('', ''))
  752         self._execute = mock.Mock(return_value=('', ''))
  753         self._helper = helpers.CIFSHelperUserAccess(
  754             self._execute, self._ssh_exec, self.fake_conf)
  755 
  756     def test_update_access_exception_type(self):
  757         access_rules = [test_generic.get_fake_access_rule(
  758             'user1', const.ACCESS_LEVEL_RW, access_type='ip')]
  759         self.assertRaises(exception.InvalidShareAccess,
  760                           self._helper.update_access, self.server_details,
  761                           self.share_name, access_rules, [], [])
  762 
  763     def test_update_access(self):
  764         access_list = [test_generic.get_fake_access_rule(
  765             'user1', const.ACCESS_LEVEL_RW, access_type='user'),
  766             test_generic.get_fake_access_rule(
  767                 'user2', const.ACCESS_LEVEL_RO, access_type='user')]
  768         self._helper.update_access(self.server_details, self.share_name,
  769                                    access_list, [], [])
  770 
  771         self._helper._ssh_exec.assert_has_calls([
  772             mock.call(self.server_details,
  773                       ['sudo', 'net', 'conf', 'setparm', self.share_name,
  774                        'valid users', 'user1']),
  775             mock.call(self.server_details,
  776                       ['sudo', 'net', 'conf', 'setparm', self.share_name,
  777                        'read list', 'user2'])
  778         ])
  779 
  780     def test_update_access_exception_level(self):
  781         access_rules = [test_generic.get_fake_access_rule(
  782             'user1', 'fake_level', access_type='user'), ]
  783         self.assertRaises(
  784             exception.InvalidShareAccessLevel,
  785             self._helper.update_access,
  786             self.server_details,
  787             self.share_name,
  788             access_rules,
  789             [],
  790             [])
  791 
  792 
  793 @ddt.ddt
  794 class NFSSynchronizedTestCase(test.TestCase):
  795 
  796     @helpers.nfs_synchronized
  797     def wrapped_method(self, server, share_name):
  798         return server['instance_id'] + share_name
  799 
  800     @ddt.data(
  801         ({'lock_name': 'FOO', 'instance_id': 'QUUZ'}, 'nfs-FOO'),
  802         ({'instance_id': 'QUUZ'}, 'nfs-QUUZ'),
  803     )
  804     @ddt.unpack
  805     def test_with_lock_name(self, server, expected_lock_name):
  806         share_name = 'fake_share_name'
  807         self.mock_object(
  808             helpers.utils, 'synchronized',
  809             mock.Mock(side_effect=helpers.utils.synchronized))
  810 
  811         result = self.wrapped_method(server, share_name)
  812 
  813         self.assertEqual(server['instance_id'] + share_name, result)
  814         helpers.utils.synchronized.assert_called_once_with(
  815             expected_lock_name, external=True)