"Fossies" - the Fresh Open Source Software Archive

Member "manila-8.1.4/manila/tests/share/drivers/test_helpers.py" (19 Nov 2020, 34944 Bytes) of package /linux/misc/openstack/manila-8.1.4.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_helpers.py": 8.1.3_vs_8.1.4.

    1 # Copyright 2015 Mirantis Inc.
    2 # All Rights Reserved.
    3 #
    4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 # not use this file except in compliance with the License. You may obtain
    6 # a copy of the License at
    7 #
    8 # http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 # Unless required by applicable law or agreed to in writing, software
   11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 # License for the specific language governing permissions and limitations
   14 # under the License.
   15 
   16 import os
   17 
   18 import ddt
   19 import mock
   20 from oslo_config import cfg
   21 
   22 from manila.common import constants as const
   23 from manila import exception
   24 import manila.share.configuration
   25 from manila.share.drivers import helpers
   26 from manila import test
   27 from manila.tests import fake_compute
   28 from manila.tests import fake_utils
   29 from manila.tests.share.drivers import test_generic
   30 
   31 
# Global oslo.config object; the tests below read ``share_mount_path`` from it.
CONF = cfg.CONF
   33 
   34 
   35 @ddt.ddt
   36 class NFSHelperTestCase(test.TestCase):
   37     """Test case for NFS helper."""
   38 
   39     def setUp(self):
   40         super(NFSHelperTestCase, self).setUp()
   41         fake_utils.stub_out_utils_execute(self)
   42         self.fake_conf = manila.share.configuration.Configuration(None)
   43         self._ssh_exec = mock.Mock(return_value=('', ''))
   44         self._execute = mock.Mock(return_value=('', ''))
   45         self._helper = helpers.NFSHelper(self._execute, self._ssh_exec,
   46                                          self.fake_conf)
   47         ip = '10.254.0.3'
   48         self.server = fake_compute.FakeServer(
   49             ip=ip, public_address=ip, instance_id='fake_instance_id')
   50         self.share_name = 'fake_share_name'
   51 
   52     def test_init_helper(self):
   53 
   54         # mocks
   55         self.mock_object(
   56             self._helper, '_ssh_exec',
   57             mock.Mock(side_effect=exception.ProcessExecutionError(
   58                 stderr='command not found')))
   59 
   60         # run
   61         self.assertRaises(exception.ManilaException,
   62                           self._helper.init_helper, self.server)
   63 
   64         # asserts
   65         self._helper._ssh_exec.assert_called_once_with(
   66             self.server, ['sudo', 'exportfs'])
   67 
   68     def test_init_helper_log(self):
   69 
   70         # mocks
   71         self.mock_object(
   72             self._helper, '_ssh_exec',
   73             mock.Mock(side_effect=exception.ProcessExecutionError(
   74                 stderr='fake')))
   75 
   76         # run
   77         self._helper.init_helper(self.server)
   78 
   79         # asserts
   80         self._helper._ssh_exec.assert_called_once_with(
   81             self.server, ['sudo', 'exportfs'])
   82 
   83     @ddt.data(
   84         {"server": {"public_address": "1.2.3.4"}, "version": 4},
   85         {"server": {"public_address": "1001::1002"}, "version": 6},
   86         {"server": {"public_address": "1.2.3.4", "admin_ip": "5.6.7.8"},
   87          "version": 4},
   88         {"server": {"public_address": "1.2.3.4", "ip": "9.10.11.12"},
   89          "version": 4},
   90         {"server": {"public_address": "1001::1001", "ip": "1001::1002"},
   91          "version": 6},
   92         {"server": {"public_address": "1001::1002", "admin_ip": "1001::1002"},
   93          "version": 6},
   94         {"server": {"public_addresses": ["1001::1002"]}, "version": 6},
   95         {"server": {"public_addresses": ["1.2.3.4", "1001::1002"]},
   96          "version": {"1.2.3.4": 4, "1001::1002": 6}},
   97     )
   98     @ddt.unpack
   99     def test_create_exports(self, server, version):
  100         result = self._helper.create_exports(server, self.share_name)
  101 
  102         expected_export_locations = []
  103         path = os.path.join(CONF.share_mount_path, self.share_name)
  104         service_address = server.get("admin_ip", server.get("ip"))
  105         version_copy = version
  106 
  107         def convert_address(address, version):
  108             if version == 4:
  109                 return address
  110             return "[%s]" % address
  111 
  112         if 'public_addresses' in server:
  113             pairs = list(map(lambda addr: (addr, False),
  114                              server['public_addresses']))
  115         else:
  116             pairs = [(server['public_address'], False)]
  117 
  118         service_address = server.get("admin_ip", server.get("ip"))
  119         if service_address:
  120             pairs.append((service_address, True))
  121 
  122         for ip, is_admin in pairs:
  123             if isinstance(version_copy, dict):
  124                 version = version_copy.get(ip)
  125 
  126             expected_export_locations.append({
  127                 "path": "%s:%s" % (convert_address(ip, version), path),
  128                 "is_admin_only": is_admin,
  129                 "metadata": {
  130                     "export_location_metadata_example": "example",
  131                 },
  132             })
  133         self.assertEqual(expected_export_locations, result)
  134 
  135     @ddt.data(const.ACCESS_LEVEL_RW, const.ACCESS_LEVEL_RO)
  136     def test_update_access(self, access_level):
  137         expected_mount_options = '%s,no_subtree_check,no_root_squash'
  138         self.mock_object(self._helper, '_sync_nfs_temp_and_perm_files')
  139         local_path = os.path.join(CONF.share_mount_path, self.share_name)
  140         exec_result = ' '.join([local_path, '2.2.2.3'])
  141         self.mock_object(self._helper, '_ssh_exec',
  142                          mock.Mock(return_value=(exec_result, '')))
  143         access_rules = [
  144             test_generic.get_fake_access_rule('1.1.1.1', access_level),
  145             test_generic.get_fake_access_rule('2.2.2.2', access_level),
  146             test_generic.get_fake_access_rule('2.2.2.3', access_level)]
  147         add_rules = [
  148             test_generic.get_fake_access_rule('2.2.2.2', access_level),
  149             test_generic.get_fake_access_rule('2.2.2.3', access_level),
  150             test_generic.get_fake_access_rule('5.5.5.0/24', access_level)]
  151         delete_rules = [
  152             test_generic.get_fake_access_rule('3.3.3.3', access_level),
  153             test_generic.get_fake_access_rule('4.4.4.4', access_level, 'user'),
  154             test_generic.get_fake_access_rule('0.0.0.0/0', access_level)]
  155         self._helper.update_access(self.server, self.share_name, access_rules,
  156                                    add_rules=add_rules,
  157                                    delete_rules=delete_rules)
  158         local_path = os.path.join(CONF.share_mount_path, self.share_name)
  159         self._helper._ssh_exec.assert_has_calls([
  160             mock.call(self.server, ['sudo', 'exportfs']),
  161             mock.call(self.server, ['sudo', 'exportfs', '-u',
  162                                     ':'.join(['3.3.3.3', local_path])]),
  163             mock.call(self.server, ['sudo', 'exportfs', '-u',
  164                                     ':'.join(['*',
  165                                               local_path])]),
  166             mock.call(self.server, ['sudo', 'exportfs', '-o',
  167                                     expected_mount_options % access_level,
  168                                     ':'.join(['2.2.2.2', local_path])]),
  169             mock.call(self.server, ['sudo', 'exportfs', '-o',
  170                                     expected_mount_options % access_level,
  171                                     ':'.join(['5.5.5.0/24',
  172                                               local_path])]),
  173         ])
  174         self._helper._sync_nfs_temp_and_perm_files.assert_has_calls([
  175             mock.call(self.server), mock.call(self.server)])
  176 
  177     @ddt.data({'access': '10.0.0.1', 'result': '10.0.0.1'},
  178               {'access': '10.0.0.1/32', 'result': '10.0.0.1'},
  179               {'access': '10.0.0.0/24', 'result': '10.0.0.0/24'},
  180               {'access': '1001::1001', 'result': '[1001::1001]'},
  181               {'access': '1001::1000/128', 'result': '[1001::1000]'},
  182               {'access': '1001::1000/124', 'result': '[1001::1000]/124'})
  183     @ddt.unpack
  184     def test__get_parsed_address_or_cidr(self, access, result):
  185         self.assertEqual(result,
  186                          self._helper._get_parsed_address_or_cidr(access))
  187 
  188     @ddt.data('10.0.0.265', '10.0.0.1/33', '1001::10069', '1001::1000/129')
  189     def test__get_parsed_address_or_cidr_with_invalid_access(self, access):
  190         self.assertRaises(ValueError,
  191                           self._helper._get_parsed_address_or_cidr,
  192                           access)
  193 
  194     def test_update_access_invalid_type(self):
  195         access_rules = [test_generic.get_fake_access_rule(
  196             '2.2.2.2', const.ACCESS_LEVEL_RW, access_type='fake'), ]
  197         self.assertRaises(
  198             exception.InvalidShareAccess,
  199             self._helper.update_access,
  200             self.server,
  201             self.share_name,
  202             access_rules,
  203             [],
  204             [])
  205 
  206     def test_update_access_invalid_level(self):
  207         access_rules = [test_generic.get_fake_access_rule(
  208             '2.2.2.2', 'fake_level', access_type='ip'), ]
  209         self.assertRaises(
  210             exception.InvalidShareAccessLevel,
  211             self._helper.update_access,
  212             self.server,
  213             self.share_name,
  214             access_rules,
  215             [],
  216             [])
  217 
  218     @ddt.data({'access_to': 'lala', 'access_type': 'user'},
  219               {'access_to': '203.0.113.29'},
  220               {'access_to': '2001:0DB8:7d18:c63e:5f0a:871f:83b8:d244',
  221                'access_level': 'ro'})
  222     @ddt.unpack
  223     def test_update_access_delete_invalid_rule(
  224             self, access_to, access_level='rw', access_type='ip'):
  225         mount_path = '%s:/shares/%s' % (access_to, self.share_name)
  226         if access_type == 'ip':
  227             self._helper._get_parsed_address_or_cidr = mock.Mock(
  228                 return_value=access_to)
  229             not_found_msg = (
  230                 "exportfs: Could not find '%s' to unexport.\n" % mount_path
  231             )
  232             exc = exception.ProcessExecutionError
  233             self.mock_object(
  234                 self._helper,
  235                 '_ssh_exec',
  236                 mock.Mock(side_effect=[(0, 0), exc(stderr=not_found_msg)]))
  237 
  238         delete_rules = [
  239             test_generic.get_fake_access_rule(access_to,
  240                                               access_level,
  241                                               access_type),
  242         ]
  243         self.mock_object(self._helper, '_sync_nfs_temp_and_perm_files')
  244 
  245         self._helper.update_access(self.server, self.share_name, [],
  246                                    [], delete_rules)
  247 
  248         if access_type == 'ip':
  249             self._helper._ssh_exec.assert_has_calls([
  250                 mock.call(self.server, ['sudo', 'exportfs']),
  251                 mock.call(self.server,
  252                           ['sudo', 'exportfs', '-u', mount_path])])
  253         self._helper._sync_nfs_temp_and_perm_files.assert_called_with(
  254             self.server)
  255 
  256     def test_get_host_list(self):
  257         fake_exportfs = ('/shares/share-1\n\t\t20.0.0.3\n'
  258                          '/shares/share-1\n\t\t20.0.0.6\n'
  259                          '/shares/share-2\n\t\t10.0.0.2\n'
  260                          '/shares/share-2\n\t\t10.0.0.5\n'
  261                          '/shares/share-3\n\t\t30.0.0.4\n'
  262                          '/shares/share-3\n\t\t30.0.0.7\n')
  263         expected = ['20.0.0.3', '20.0.0.6']
  264         result = self._helper.get_host_list(fake_exportfs, '/shares/share-1')
  265         self.assertEqual(expected, result)
  266 
  267     @ddt.data({"level": const.ACCESS_LEVEL_RW, "ip": "1.1.1.1",
  268                "expected": "1.1.1.1"},
  269               {"level": const.ACCESS_LEVEL_RO, "ip": "1.1.1.1",
  270                "expected": "1.1.1.1"},
  271               {"level": const.ACCESS_LEVEL_RW, "ip": "fd12:abcd::10",
  272                "expected": "[fd12:abcd::10]"},
  273               {"level": const.ACCESS_LEVEL_RO, "ip": "fd12:abcd::10",
  274                "expected": "[fd12:abcd::10]"})
  275     @ddt.unpack
  276     def test_update_access_recovery_mode(self, level, ip, expected):
  277         expected_mount_options = '%s,no_subtree_check,no_root_squash'
  278         access_rules = [test_generic.get_fake_access_rule(
  279             ip, level), ]
  280         self.mock_object(self._helper, '_sync_nfs_temp_and_perm_files')
  281         self.mock_object(self._helper, 'get_host_list',
  282                          mock.Mock(return_value=[ip]))
  283         self._helper.update_access(self.server, self.share_name, access_rules,
  284                                    [], [])
  285         local_path = os.path.join(CONF.share_mount_path, self.share_name)
  286         self._ssh_exec.assert_has_calls([
  287             mock.call(self.server, ['sudo', 'exportfs']),
  288             mock.call(
  289                 self.server, ['sudo', 'exportfs', '-u',
  290                               ':'.join([expected,
  291                                         local_path])]),
  292             mock.call(self.server, ['sudo', 'exportfs', '-o',
  293                                     expected_mount_options % level,
  294                                     ':'.join([expected, local_path])]),
  295         ])
  296         self._helper._sync_nfs_temp_and_perm_files.assert_called_with(
  297             self.server)
  298 
  299     def test_sync_nfs_temp_and_perm_files(self):
  300         self._helper._sync_nfs_temp_and_perm_files(self.server)
  301         self._helper._ssh_exec.assert_has_calls(
  302             [mock.call(self.server, mock.ANY) for i in range(1)])
  303 
  304     @ddt.data('/foo/bar', '5.6.7.8:/bar/quuz', '5.6.7.9:/foo/quuz',
  305               '[1001::1001]:/foo/bar', '[1001::1000]/:124:/foo/bar')
  306     def test_get_exports_for_share_single_ip(self, export_location):
  307         server = dict(public_address='1.2.3.4')
  308 
  309         result = self._helper.get_exports_for_share(server, export_location)
  310 
  311         path = export_location.split(':')[-1]
  312         expected_export_locations = [
  313             {"is_admin_only": False,
  314              "path": "%s:%s" % (server["public_address"], path),
  315              "metadata": {"export_location_metadata_example": "example"}}
  316         ]
  317         self.assertEqual(expected_export_locations, result)
  318 
  319     @ddt.data('/foo/bar', '5.6.7.8:/bar/quuz', '5.6.7.9:/foo/quuz')
  320     def test_get_exports_for_share_multi_ip(self, export_location):
  321         server = dict(public_addresses=['1.2.3.4', '1.2.3.5'])
  322 
  323         result = self._helper.get_exports_for_share(server, export_location)
  324 
  325         path = export_location.split(':')[-1]
  326         expected_export_locations = list(map(
  327             lambda addr: {
  328                 "is_admin_only": False,
  329                 "path": "%s:%s" % (addr, path),
  330                 "metadata": {"export_location_metadata_example": "example"}
  331             },
  332             server['public_addresses'])
  333         )
  334         self.assertEqual(expected_export_locations, result)
  335 
  336     @ddt.data(
  337         {'public_address_with_suffix': 'foo'},
  338         {'with_prefix_public_address': 'bar'},
  339         {'with_prefix_public_address_and_with_suffix': 'quuz'}, {})
  340     def test_get_exports_for_share_with_error(self, server):
  341         export_location = '1.2.3.4:/foo/bar'
  342 
  343         self.assertRaises(
  344             exception.ManilaException,
  345             self._helper.get_exports_for_share, server, export_location)
  346 
  347     @ddt.data('/foo/bar', '5.6.7.8:/foo/bar', '5.6.7.88:fake:/foo/bar',
  348               '[1001::1002]:/foo/bar', '[1001::1000]/124:/foo/bar')
  349     def test_get_share_path_by_export_location(self, export_location):
  350         result = self._helper.get_share_path_by_export_location(
  351             dict(), export_location)
  352 
  353         self.assertEqual('/foo/bar', result)
  354 
  355     @ddt.data(
  356         ('/shares/fake_share1\n\t\t1.1.1.10\n'
  357          '/shares/fake_share2\n\t\t1.1.1.16\n'
  358          '/mnt/fake_share1 1.1.1.11', False),
  359         ('/shares/fake_share_name\n\t\t1.1.1.10\n'
  360          '/shares/fake_share_name\n\t\t1.1.1.16\n'
  361          '/mnt/fake_share1\n\t\t1.1.1.11', True),
  362         ('/mnt/fake_share_name\n\t\t1.1.1.11\n'
  363          '/shares/fake_share_name\n\t\t1.1.1.10\n'
  364          '/shares/fake_share_name\n\t\t1.1.1.16\n', True))
  365     @ddt.unpack
  366     def test_disable_access_for_maintenance(self, output, hosts_match):
  367         fake_maintenance_path = "fake.path"
  368         self._helper.configuration.share_mount_path = '/shares'
  369         local_path = os.path.join(self._helper.configuration.share_mount_path,
  370                                   self.share_name)
  371 
  372         def fake_ssh_exec(*args, **kwargs):
  373             if 'exportfs' in args[1] and '-u' not in args[1]:
  374                 return output, ''
  375             else:
  376                 return '', ''
  377 
  378         self.mock_object(self._helper, '_ssh_exec',
  379                          mock.Mock(side_effect=fake_ssh_exec))
  380 
  381         self.mock_object(self._helper, '_sync_nfs_temp_and_perm_files')
  382         self.mock_object(self._helper, '_get_maintenance_file_path',
  383                          mock.Mock(return_value=fake_maintenance_path))
  384 
  385         self._helper.disable_access_for_maintenance(
  386             self.server, self.share_name)
  387 
  388         self._helper._ssh_exec.assert_any_call(
  389             self.server,
  390             ['cat', const.NFS_EXPORTS_FILE,
  391              '|', 'grep', self.share_name,
  392              '|', 'sudo', 'tee', fake_maintenance_path]
  393         )
  394         self._helper._ssh_exec.assert_has_calls([
  395             mock.call(self.server, ['sudo', 'exportfs']),
  396         ])
  397 
  398         if hosts_match:
  399             self._helper._ssh_exec.assert_has_calls([
  400                 mock.call(self.server, ['sudo', 'exportfs', '-u',
  401                                         ':'.join(['1.1.1.10', local_path])]),
  402                 mock.call(self.server, ['sudo', 'exportfs', '-u',
  403                                         ':'.join(['1.1.1.16', local_path])]),
  404             ])
  405 
  406         self._helper._sync_nfs_temp_and_perm_files.assert_called_once_with(
  407             self.server
  408         )
  409 
  410     def test_restore_access_after_maintenance(self):
  411         fake_maintenance_path = "fake.path"
  412         self.mock_object(self._helper, '_get_maintenance_file_path',
  413                          mock.Mock(return_value=fake_maintenance_path))
  414         self.mock_object(self._helper, '_ssh_exec')
  415 
  416         self._helper.restore_access_after_maintenance(
  417             self.server, self.share_name)
  418 
  419         self._helper._ssh_exec.assert_called_once_with(
  420             self.server,
  421             ['cat', fake_maintenance_path,
  422              '|', 'sudo', 'tee', '-a', const.NFS_EXPORTS_FILE,
  423              '&&', 'sudo', 'exportfs', '-r', '&&', 'sudo', 'rm', '-f',
  424              fake_maintenance_path]
  425         )
  426 
  427 
  428 @ddt.ddt
  429 class CIFSHelperIPAccessTestCase(test.TestCase):
  430     """Test case for CIFS helper with IP access."""
  431 
  432     def setUp(self):
  433         super(CIFSHelperIPAccessTestCase, self).setUp()
  434         self.server_details = {'instance_id': 'fake',
  435                                'public_address': '1.2.3.4', }
  436         self.share_name = 'fake_share_name'
  437         self.fake_conf = manila.share.configuration.Configuration(None)
  438         self._ssh_exec = mock.Mock(return_value=('', ''))
  439         self._execute = mock.Mock(return_value=('', ''))
  440         self._helper = helpers.CIFSHelperIPAccess(self._execute,
  441                                                   self._ssh_exec,
  442                                                   self.fake_conf)
  443         self.access = dict(
  444             access_level=const.ACCESS_LEVEL_RW,
  445             access_type='ip',
  446             access_to='1.1.1.1')
  447 
  448     def test_init_helper(self):
  449         self._helper.init_helper(self.server_details)
  450         self._helper._ssh_exec.assert_called_once_with(
  451             self.server_details,
  452             ['sudo', 'net', 'conf', 'list'],
  453         )
  454 
  455     def test_create_export_share_does_not_exist(self):
  456         def fake_ssh_exec(*args, **kwargs):
  457             if 'showshare' in args[1]:
  458                 raise exception.ProcessExecutionError()
  459             else:
  460                 return '', ''
  461 
  462         self.mock_object(self._helper, '_ssh_exec',
  463                          mock.Mock(side_effect=fake_ssh_exec))
  464 
  465         ret = self._helper.create_exports(self.server_details, self.share_name)
  466 
  467         expected_location = [{
  468             "is_admin_only": False,
  469             "path": "\\\\%s\\%s" % (
  470                 self.server_details['public_address'], self.share_name),
  471             "metadata": {"export_location_metadata_example": "example"}
  472         }]
  473         self.assertEqual(expected_location, ret)
  474         share_path = os.path.join(
  475             self._helper.configuration.share_mount_path,
  476             self.share_name)
  477         self._helper._ssh_exec.assert_has_calls([
  478             mock.call(
  479                 self.server_details,
  480                 ['sudo', 'net', 'conf', 'showshare', self.share_name, ]
  481             ),
  482             mock.call(
  483                 self.server_details,
  484                 [
  485                     'sudo', 'net', 'conf', 'addshare', self.share_name,
  486                     share_path, 'writeable=y', 'guest_ok=y',
  487                 ]
  488             ),
  489             mock.call(self.server_details, mock.ANY),
  490         ])
  491 
  492     def test_create_export_share_does_not_exist_exception(self):
  493 
  494         self.mock_object(self._helper, '_ssh_exec',
  495                          mock.Mock(
  496                              side_effect=[exception.ProcessExecutionError(),
  497                                           Exception('')]
  498                          ))
  499 
  500         self.assertRaises(
  501             exception.ManilaException, self._helper.create_exports,
  502             self.server_details, self.share_name)
  503 
  504     def test_create_exports_share_exist_recreate_true(self):
  505         ret = self._helper.create_exports(
  506             self.server_details, self.share_name, recreate=True)
  507 
  508         expected_location = [{
  509             "is_admin_only": False,
  510             "path": "\\\\%s\\%s" % (
  511                 self.server_details['public_address'], self.share_name),
  512             "metadata": {"export_location_metadata_example": "example"}
  513         }]
  514         self.assertEqual(expected_location, ret)
  515         share_path = os.path.join(
  516             self._helper.configuration.share_mount_path,
  517             self.share_name)
  518         self._helper._ssh_exec.assert_has_calls([
  519             mock.call(
  520                 self.server_details,
  521                 ['sudo', 'net', 'conf', 'showshare', self.share_name, ]
  522             ),
  523             mock.call(
  524                 self.server_details,
  525                 ['sudo', 'net', 'conf', 'delshare', self.share_name, ]
  526             ),
  527             mock.call(
  528                 self.server_details,
  529                 [
  530                     'sudo', 'net', 'conf', 'addshare', self.share_name,
  531                     share_path, 'writeable=y', 'guest_ok=y',
  532                 ]
  533             ),
  534             mock.call(self.server_details, mock.ANY),
  535         ])
  536 
  537     def test_create_export_share_exist_recreate_false(self):
  538         self.assertRaises(
  539             exception.ShareBackendException,
  540             self._helper.create_exports,
  541             self.server_details,
  542             self.share_name,
  543             recreate=False,
  544         )
  545         self._helper._ssh_exec.assert_has_calls([
  546             mock.call(
  547                 self.server_details,
  548                 ['sudo', 'net', 'conf', 'showshare', self.share_name, ]
  549             ),
  550         ])
  551 
  552     def test_remove_exports(self):
  553         self._helper.remove_exports(self.server_details, self.share_name)
  554 
  555         self._helper._ssh_exec.assert_called_once_with(
  556             self.server_details,
  557             ['sudo', 'net', 'conf', 'delshare', self.share_name],
  558         )
  559 
  560     def test_remove_export_forcibly(self):
  561         delshare_command = ['sudo', 'net', 'conf', 'delshare', self.share_name]
  562 
  563         def fake_ssh_exec(*args, **kwargs):
  564             if delshare_command == args[1]:
  565                 raise exception.ProcessExecutionError()
  566             else:
  567                 return ('', '')
  568 
  569         self.mock_object(self._helper, '_ssh_exec',
  570                          mock.Mock(side_effect=fake_ssh_exec))
  571 
  572         self._helper.remove_exports(self.server_details, self.share_name)
  573 
  574         self._helper._ssh_exec.assert_has_calls([
  575             mock.call(
  576                 self.server_details,
  577                 ['sudo', 'net', 'conf', 'delshare', self.share_name],
  578             ),
  579             mock.call(
  580                 self.server_details,
  581                 ['sudo', 'smbcontrol', 'all', 'close-share', self.share_name],
  582             ),
  583         ])
  584 
  585     def test_update_access_wrong_access_level(self):
  586         access_rules = [test_generic.get_fake_access_rule(
  587             '2.2.2.2', const.ACCESS_LEVEL_RO), ]
  588         self.assertRaises(
  589             exception.InvalidShareAccessLevel,
  590             self._helper.update_access,
  591             self.server_details,
  592             self.share_name,
  593             access_rules,
  594             [],
  595             [])
  596 
  597     def test_update_access_wrong_access_type(self):
  598         access_rules = [test_generic.get_fake_access_rule(
  599             '2.2.2.2', const.ACCESS_LEVEL_RW, access_type='fake'), ]
  600         self.assertRaises(
  601             exception.InvalidShareAccess,
  602             self._helper.update_access,
  603             self.server_details,
  604             self.share_name,
  605             access_rules,
  606             [],
  607             [])
  608 
  609     def test_update_access(self):
  610         access_rules = [test_generic.get_fake_access_rule(
  611             '1.1.1.1', const.ACCESS_LEVEL_RW), ]
  612 
  613         self._helper.update_access(self.server_details, self.share_name,
  614                                    access_rules, [], [])
  615         self._helper._ssh_exec.assert_called_once_with(
  616             self.server_details, ['sudo', 'net', 'conf', 'setparm',
  617                                   self.share_name, 'hosts allow',
  618                                   '1.1.1.1'])
  619 
  620     def test_get_allow_hosts(self):
  621         self.mock_object(self._helper, '_ssh_exec',
  622                          mock.Mock(
  623                              return_value=('1.1.1.1 2.2.2.2 3.3.3.3', '')))
  624         expected = ['1.1.1.1', '2.2.2.2', '3.3.3.3']
  625         result = self._helper._get_allow_hosts(
  626             self.server_details, self.share_name)
  627         self.assertEqual(expected, result)
  628         cmd = ['sudo', 'net', 'conf', 'getparm', self.share_name,
  629                'hosts allow']
  630         self._helper._ssh_exec.assert_called_once_with(
  631             self.server_details, cmd)
  632 
  633     @ddt.data(
  634         '', '1.2.3.4:/nfs/like/export', '/1.2.3.4/foo', '\\1.2.3.4\\foo',
  635         '//1.2.3.4\\mixed_slashes_and_backslashes_one',
  636         '\\\\1.2.3.4/mixed_slashes_and_backslashes_two')
  637     def test__get_share_group_name_from_export_location(self, export_location):
  638         self.assertRaises(
  639             exception.InvalidShare,
  640             self._helper._get_share_group_name_from_export_location,
  641             export_location)
  642 
  643     @ddt.data('//5.6.7.8/foo', '\\\\5.6.7.8\\foo')
  644     def test_get_exports_for_share(self, export_location):
  645         server = dict(public_address='1.2.3.4')
  646         self.mock_object(
  647             self._helper, '_get_share_group_name_from_export_location',
  648             mock.Mock(side_effect=(
  649                 self._helper._get_share_group_name_from_export_location)))
  650 
  651         result = self._helper.get_exports_for_share(server, export_location)
  652 
  653         expected_export_location = [{
  654             "is_admin_only": False,
  655             "path": "\\\\%s\\foo" % server['public_address'],
  656             "metadata": {"export_location_metadata_example": "example"}
  657         }]
  658         self.assertEqual(expected_export_location, result)
  659         (self._helper._get_share_group_name_from_export_location.
  660             assert_called_once_with(export_location))
  661 
  662     @ddt.data(
  663         {'public_address_with_suffix': 'foo'},
  664         {'with_prefix_public_address': 'bar'},
  665         {'with_prefix_public_address_and_with_suffix': 'quuz'}, {})
  666     def test_get_exports_for_share_with_exception(self, server):
  667         export_location = '1.2.3.4:/foo/bar'
  668 
  669         self.assertRaises(
  670             exception.ManilaException,
  671             self._helper.get_exports_for_share, server, export_location)
  672 
  673     @ddt.data('//5.6.7.8/foo', '\\\\5.6.7.8\\foo')
  674     def test_get_share_path_by_export_location(self, export_location):
  675         fake_path = ' /bar/quuz\n '
  676         fake_server = dict()
  677         self.mock_object(
  678             self._helper, '_ssh_exec',
  679             mock.Mock(return_value=(fake_path, 'fake')))
  680         self.mock_object(
  681             self._helper, '_get_share_group_name_from_export_location',
  682             mock.Mock(side_effect=(
  683                 self._helper._get_share_group_name_from_export_location)))
  684 
  685         result = self._helper.get_share_path_by_export_location(
  686             fake_server, export_location)
  687 
  688         self.assertEqual('/bar/quuz', result)
  689         self._helper._ssh_exec.assert_called_once_with(
  690             fake_server, ['sudo', 'net', 'conf', 'getparm', 'foo', 'path'])
  691         (self._helper._get_share_group_name_from_export_location.
  692             assert_called_once_with(export_location))
  693 
  694     def test_disable_access_for_maintenance(self):
  695         allowed_hosts = ['test', 'test2']
  696         maintenance_path = os.path.join(
  697             self._helper.configuration.share_mount_path,
  698             "%s.maintenance" % self.share_name)
  699         self.mock_object(self._helper, '_set_allow_hosts')
  700         self.mock_object(self._helper, '_get_allow_hosts',
  701                          mock.Mock(return_value=allowed_hosts))
  702 
  703         self._helper.disable_access_for_maintenance(
  704             self.server_details, self.share_name)
  705 
  706         self._helper._get_allow_hosts.assert_called_once_with(
  707             self.server_details, self.share_name)
  708         self._helper._set_allow_hosts.assert_called_once_with(
  709             self.server_details, [], self.share_name)
  710         kickoff_user_cmd = ['sudo', 'smbstatus', '-S']
  711         self._helper._ssh_exec.assert_any_call(
  712             self.server_details, kickoff_user_cmd)
  713         valid_cmd = ['echo', "'test test2'", '|', 'sudo', 'tee',
  714                      maintenance_path]
  715         self._helper._ssh_exec.assert_any_call(
  716             self.server_details, valid_cmd)
  717 
  718     def test__kick_out_users_success(self):
  719         smbstatus_return = """Service      pid     machine       Connected at
  720 -------------------------------------------------------
  721 fake_share_name   1001   fake_machine1    Thu Sep 14 14:59:07 2017
  722 fake_share_name   1002   fake_machine2    Thu Sep 14 14:59:07 2017
  723 """
  724         self.mock_object(self._helper, '_ssh_exec', mock.Mock(
  725             side_effect=[(smbstatus_return, "fake_stderr"), ("fake", "fake")]))
  726         self._helper._kick_out_users(self.server_details, self.share_name)
  727         self._helper._ssh_exec.assert_any_call(
  728             self.server_details, ['sudo', 'smbstatus', '-S'])
  729         self._helper._ssh_exec.assert_any_call(
  730             self.server_details, ["sudo", "kill", "-15", "1001", "1002"])
  731 
  732     def test__kick_out_users_failed(self):
  733         smbstatus_return = """Service      pid     machine       Connected at
  734 -------------------------------------------------------
  735 fake line
  736 """
  737         self.mock_object(self._helper, '_ssh_exec', mock.Mock(
  738             return_value=(smbstatus_return, "fake_stderr")))
  739         self.assertRaises(exception.ShareBackendException,
  740                           self._helper._kick_out_users, self.server_details,
  741                           self.share_name)
  742 
  743     def test_restore_access_after_maintenance(self):
  744         fake_maintenance_path = "test.path"
  745         self.mock_object(self._helper, '_set_allow_hosts')
  746         self.mock_object(self._helper, '_get_maintenance_file_path',
  747                          mock.Mock(return_value=fake_maintenance_path))
  748         self.mock_object(self._helper, '_ssh_exec',
  749                          mock.Mock(side_effect=[("fake fake2", 0), "fake"]))
  750 
  751         self._helper.restore_access_after_maintenance(
  752             self.server_details, self.share_name)
  753 
  754         self._helper._set_allow_hosts.assert_called_once_with(
  755             self.server_details, ['fake', 'fake2'], self.share_name)
  756         self._helper._ssh_exec.assert_any_call(
  757             self.server_details, ['cat', fake_maintenance_path])
  758         self._helper._ssh_exec.assert_any_call(
  759             self.server_details, ['sudo', 'rm', '-f', fake_maintenance_path])
  760 
  761 
  762 @ddt.ddt
  763 class CIFSHelperUserAccessTestCase(test.TestCase):
  764     """Test case for CIFS helper with user access."""
  765     access_rw = dict(
  766         access_level=const.ACCESS_LEVEL_RW,
  767         access_type='user',
  768         access_to='manila-user')
  769     access_ro = dict(
  770         access_level=const.ACCESS_LEVEL_RO,
  771         access_type='user',
  772         access_to='manila-user')
  773 
  774     def setUp(self):
  775         super(CIFSHelperUserAccessTestCase, self).setUp()
  776         self.server_details = {'instance_id': 'fake',
  777                                'public_address': '1.2.3.4', }
  778         self.share_name = 'fake_share_name'
  779         self.fake_conf = manila.share.configuration.Configuration(None)
  780         self._ssh_exec = mock.Mock(return_value=('', ''))
  781         self._execute = mock.Mock(return_value=('', ''))
  782         self._helper = helpers.CIFSHelperUserAccess(
  783             self._execute, self._ssh_exec, self.fake_conf)
  784 
  785     def test_update_access_exception_type(self):
  786         access_rules = [test_generic.get_fake_access_rule(
  787             'user1', const.ACCESS_LEVEL_RW, access_type='ip')]
  788         self.assertRaises(exception.InvalidShareAccess,
  789                           self._helper.update_access, self.server_details,
  790                           self.share_name, access_rules, [], [])
  791 
  792     def test_update_access(self):
  793         access_list = [test_generic.get_fake_access_rule(
  794             'user1', const.ACCESS_LEVEL_RW, access_type='user'),
  795             test_generic.get_fake_access_rule(
  796                 'user2', const.ACCESS_LEVEL_RO, access_type='user')]
  797         self._helper.update_access(self.server_details, self.share_name,
  798                                    access_list, [], [])
  799 
  800         self._helper._ssh_exec.assert_has_calls([
  801             mock.call(self.server_details,
  802                       ['sudo', 'net', 'conf', 'setparm', self.share_name,
  803                        'valid users', 'user1']),
  804             mock.call(self.server_details,
  805                       ['sudo', 'net', 'conf', 'setparm', self.share_name,
  806                        'read list', 'user2'])
  807         ])
  808 
  809     def test_update_access_exception_level(self):
  810         access_rules = [test_generic.get_fake_access_rule(
  811             'user1', 'fake_level', access_type='user'), ]
  812         self.assertRaises(
  813             exception.InvalidShareAccessLevel,
  814             self._helper.update_access,
  815             self.server_details,
  816             self.share_name,
  817             access_rules,
  818             [],
  819             [])
  820 
  821 
  822 @ddt.ddt
  823 class NFSSynchronizedTestCase(test.TestCase):
  824 
  825     @helpers.nfs_synchronized
  826     def wrapped_method(self, server, share_name):
  827         return server['instance_id'] + share_name
  828 
  829     @ddt.data(
  830         ({'lock_name': 'FOO', 'instance_id': 'QUUZ'}, 'nfs-FOO'),
  831         ({'instance_id': 'QUUZ'}, 'nfs-QUUZ'),
  832     )
  833     @ddt.unpack
  834     def test_with_lock_name(self, server, expected_lock_name):
  835         share_name = 'fake_share_name'
  836         self.mock_object(
  837             helpers.utils, 'synchronized',
  838             mock.Mock(side_effect=helpers.utils.synchronized))
  839 
  840         result = self.wrapped_method(server, share_name)
  841 
  842         self.assertEqual(server['instance_id'] + share_name, result)
  843         helpers.utils.synchronized.assert_called_once_with(
  844             expected_lock_name, external=True)