"Fossies" - the Fresh Open Source Software Archive

Member "cinder-13.0.7/cinder/tests/unit/volume/drivers/fusionstorage/test_dsware.py" (4 Oct 2019, 35768 Bytes) of package /linux/misc/openstack/cinder-13.0.7.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the last Fossies "Diffs" side-by-side code changes report for "test_dsware.py": 14.0.2_vs_15.0.0.

    1 # Copyright (c) 2013 - 2016 Huawei Technologies Co., Ltd.
    2 # All Rights Reserved.
    3 #
    4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 #    not use this file except in compliance with the License. You may obtain
    6 #    a copy of the License at
    7 #
    8 #         http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 #    Unless required by applicable law or agreed to in writing, software
   11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 #    License for the specific language governing permissions and limitations
   14 #    under the License.
   15 """
   16 Unit Tests for Huawei FusionStorage drivers.
   17 """
   18 
   19 import mock
   20 from oslo_config import cfg
   21 from oslo_service import loopingcall
   22 
   23 from cinder import context
   24 from cinder import exception
   25 from cinder.image import image_utils
   26 from cinder import test
   27 from cinder.tests.unit import fake_constants
   28 from cinder.tests.unit import fake_snapshot
   29 from cinder.tests.unit import fake_volume
   30 from cinder.volume import configuration as conf
   31 from cinder.volume.drivers.fusionstorage import dsware
   32 from cinder.volume.drivers.fusionstorage import fspythonapi
   33 
   34 
   35 test_volume = {'name': 'test_vol1',
   36                'size': 4,
   37                'volume_metadata': '',
   38                'host': 'host01@dsware',
   39                'instance_uuid': None,
   40                'provider_id': '127.0.0.1',
   41                'id': fake_constants.VOLUME_ID}
   42 
   43 test_src_volume = {'name': 'test_vol2',
   44                    'size': 4,
   45                    'status': 'available'}
   46 
   47 test_snapshot = {
   48     'name': 'test_snapshot1',
   49     'volume_id': fake_constants.VOLUME_ID,
   50     'volume_size': '4'}
   51 
   52 
   53 class FakeDSWAREDriver(dsware.DSWAREDriver):
   54     def __init__(self):
   55         configuration = conf.Configuration(
   56             [
   57                 cfg.StrOpt('fake'),
   58             ],
   59             None
   60         )
   61         super(FakeDSWAREDriver, self).__init__(configuration=configuration)
   62         self.dsware_client = fspythonapi.FSPythonApi()
   63         self.manage_ip = '127.0.0.1'
   64         self.pool_type = '1'
   65 
   66 
   67 class DSwareDriverTestCase(test.TestCase):
   68     def setUp(self):
   69         super(DSwareDriverTestCase, self).setUp()
   70         self.driver = FakeDSWAREDriver()
   71         self.context = context.get_admin_context()
   72         self.volume = fake_volume.fake_volume_obj(context=self.context,
   73                                                   **test_volume)
   74         self.scr_volume = fake_volume.fake_volume_obj(context=self.context,
   75                                                       **test_src_volume)
   76         self.snapshot = fake_snapshot.fake_snapshot_obj(context=self.context,
   77                                                         **test_snapshot)
   78 
   79     def test_private_get_dsware_manage_ip(self):
   80         retval = self.driver._get_dsware_manage_ip(self.volume)
   81         self.assertEqual('127.0.0.1', retval)
   82 
   83         test_volume_fail_dict = {'name': 'test_vol',
   84                                  'size': 4,
   85                                  'volume_metadata': '',
   86                                  'host': 'host01@dsware',
   87                                  'provider_id': None}
   88         test_volume_fail = fake_volume.fake_volume_obj(context=self.context,
   89                                                        **test_volume_fail_dict)
   90         self.assertRaises(exception.CinderException,
   91                           self.driver._get_dsware_manage_ip,
   92                           test_volume_fail)
   93 
   94     def test_private_get_poolid_from_host(self):
   95         retval = self.driver._get_poolid_from_host(
   96             'abc@fusionstorage_sas2copy#0')
   97         self.assertEqual('0', retval)
   98 
   99         retval = self.driver._get_poolid_from_host(
  100             'abc@fusionstorage_sas2copy@0')
  101         self.assertEqual(self.driver.pool_type, retval)
  102 
  103         retval = self.driver._get_poolid_from_host(None)
  104         self.assertEqual(self.driver.pool_type, retval)
  105 
  106     @mock.patch.object(fspythonapi.FSPythonApi, 'create_volume')
  107     @mock.patch.object(fspythonapi.FSPythonApi, 'query_dsware_version')
  108     @mock.patch.object(dsware.DSWAREDriver, '_get_poolid_from_host')
  109     def test_private_create_volume_old_version(self, mock_get_poolid,
  110                                                mock_query_dsware,
  111                                                mock_create_volume):
  112         # query_dsware_version return 1, old version
  113         mock_query_dsware.return_value = 1
  114         mock_create_volume.return_value = 0
  115         self.driver._create_volume(self.volume.name,
  116                                    self.volume.size,
  117                                    True,
  118                                    'abc@fusionstorage_sas2copy')
  119         mock_create_volume.assert_called_with(self.volume.name, 0,
  120                                               self.volume.size, 1)
  121 
  122         self.driver._create_volume(self.volume.name,
  123                                    self.volume.size,
  124                                    False,
  125                                    'abc@fusionstorage_sas2copy')
  126         mock_create_volume.assert_called_with(self.volume.name, 0,
  127                                               self.volume.size, 0)
  128 
  129     @mock.patch.object(fspythonapi.FSPythonApi, 'create_volume')
  130     @mock.patch.object(fspythonapi.FSPythonApi, 'query_dsware_version')
  131     @mock.patch.object(dsware.DSWAREDriver, '_get_poolid_from_host')
  132     def test_private_create_volume_new_version(self, mock_get_poolid,
  133                                                mock_query_dsware,
  134                                                mock_create_volume):
  135         # query_dsware_version return 0, new version
  136         mock_query_dsware.return_value = 0
  137         mock_get_poolid.return_value = 0
  138         mock_create_volume.return_value = 0
  139         self.driver._create_volume(self.volume.name,
  140                                    self.volume.size,
  141                                    True,
  142                                    'abcE@fusionstorage_sas2copy#0')
  143         mock_create_volume.assert_called_with(self.volume.name, 0,
  144                                               self.volume.size, 1)
  145 
  146         self.driver._create_volume(self.volume.name,
  147                                    self.volume.size,
  148                                    False,
  149                                    'abc@fusionstorage_sas2copy#0')
  150         mock_create_volume.assert_called_with(self.volume.name, 0,
  151                                               self.volume.size, 0)
  152 
  153         mock_query_dsware.return_value = 0
  154         mock_get_poolid.return_value = 1
  155         mock_create_volume.return_value = 0
  156         self.driver._create_volume(self.volume.name,
  157                                    self.volume.size,
  158                                    True,
  159                                    'abc@fusionstorage_sas2copy#1')
  160         mock_create_volume.assert_called_with(self.volume.name, 1,
  161                                               self.volume.size, 1)
  162 
  163         self.driver._create_volume(self.volume.name,
  164                                    self.volume.size,
  165                                    False,
  166                                    'abc@fusionstorage_sas2copy#1')
  167         mock_create_volume.assert_called_with(self.volume.name, 1,
  168                                               self.volume.size, 0)
  169 
  170     @mock.patch.object(fspythonapi.FSPythonApi, 'create_volume')
  171     @mock.patch.object(fspythonapi.FSPythonApi, 'query_dsware_version')
  172     @mock.patch.object(dsware.DSWAREDriver, '_get_poolid_from_host')
  173     def test_private_create_volume_query_version_fail(self, mock_get_poolid,
  174                                                       mock_query_dsware,
  175                                                       mock_create_volume):
  176         # query_dsware_version return 500015, query dsware version failed!
  177         mock_query_dsware.return_value = 500015
  178         self.assertRaises(exception.CinderException,
  179                           self.driver._create_volume,
  180                           self.volume.name,
  181                           self.volume.size,
  182                           True,
  183                           'abc@fusionstorage_sas2copy#0')
  184         self.assertRaises(exception.CinderException,
  185                           self.driver._create_volume,
  186                           self.volume.name,
  187                           self.volume.size,
  188                           False,
  189                           'abc@fusionstorage_sas2copy#0')
  190 
  191     @mock.patch.object(fspythonapi.FSPythonApi, 'create_volume')
  192     @mock.patch.object(fspythonapi.FSPythonApi, 'query_dsware_version')
  193     @mock.patch.object(dsware.DSWAREDriver, '_get_poolid_from_host')
  194     def test_private_create_volume_fail(self, mock_get_poolid,
  195                                         mock_query_dsware,
  196                                         mock_create_volume):
  197         mock_query_dsware.return_value = 1
  198         # create_volume return 1, create volume failed
  199         mock_create_volume.return_value = 1
  200         self.assertRaises(exception.CinderException,
  201                           self.driver._create_volume,
  202                           self.volume.name,
  203                           self.volume.size,
  204                           True,
  205                           'abc@fusionstorage_sas2copy#0')
  206         self.assertRaises(exception.CinderException,
  207                           self.driver._create_volume,
  208                           self.volume.name,
  209                           self.volume.size,
  210                           False,
  211                           'abc@fusionstorage_sas2copy#0')
  212 
  213     @mock.patch.object(dsware.DSWAREDriver, '_create_volume')
  214     @mock.patch.object(fspythonapi.FSPythonApi, 'get_manage_ip')
  215     def test_create_volume(self, mock_get_manage_ip, mock_create_volume):
  216         # success
  217         mock_get_manage_ip.return_value = self.driver.manage_ip
  218         retval = self.driver.create_volume(self.volume)
  219         self.assertEqual({"provider_id": self.driver.manage_ip},
  220                          retval)
  221 
  222         # failure
  223         mock_create_volume.side_effect = exception.CinderException(
  224             'DSWARE Create Volume failed!')
  225 
  226         self.assertRaises(exception.CinderException,
  227                           self.driver.create_volume,
  228                           self.volume)
  229 
  230     @mock.patch.object(fspythonapi.FSPythonApi, 'create_volume_from_snap')
  231     def test_private_create_volume_from_snap(self, mock_create_volume):
  232         mock_create_volume.side_effect = [0, 1]
  233         self.driver._create_volume_from_snap(self.volume.name,
  234                                              self.volume.size,
  235                                              self.snapshot.name)
  236         # failure
  237         self.assertRaises(exception.CinderException,
  238                           self.driver._create_volume_from_snap,
  239                           self.volume.name, self.volume.size,
  240                           self.snapshot.name)
  241 
  242     @mock.patch.object(fspythonapi.FSPythonApi, 'extend_volume')
  243     def test_extend_volume(self, mock_extend_volume):
  244         mock_extend_volume.return_value = 0
  245         self.driver.extend_volume(self.volume, 5)
  246 
  247         mock_extend_volume.return_value = 0
  248         self.assertRaises(exception.CinderException,
  249                           self.driver.extend_volume,
  250                           self.volume,
  251                           3)
  252 
  253         mock_extend_volume.return_value = 1
  254         self.assertRaises(exception.CinderException,
  255                           self.driver.extend_volume,
  256                           self.volume,
  257                           5)
  258 
  259     @mock.patch.object(dsware.DSWAREDriver, '_create_volume_from_snap')
  260     @mock.patch.object(fspythonapi.FSPythonApi, 'get_manage_ip')
  261     def test_create_volume_from_snap(self, mock_manage_ip, mock_create_vol):
  262         # success
  263         mock_manage_ip.return_value = self.driver.manage_ip
  264         retval = self.driver.create_volume_from_snapshot(self.volume,
  265                                                          self.snapshot)
  266         self.assertEqual({"provider_id": self.driver.manage_ip},
  267                          retval)
  268 
  269         # failure
  270         mock_create_vol.side_effect = exception.CinderException(
  271             'DSWARE:create volume from snap failed')
  272         self.assertRaises(exception.CinderException,
  273                           self.driver.create_volume_from_snapshot,
  274                           self.volume, self.snapshot)
  275 
  276     @mock.patch.object(fspythonapi.FSPythonApi, 'create_volume_from_volume')
  277     @mock.patch.object(fspythonapi.FSPythonApi, 'get_manage_ip')
  278     @mock.patch.object(dsware.DSWAREDriver,
  279                        '_wait_for_create_cloned_volume_finish_timer')
  280     def test_create_cloned_volume(self, mock_wait_finish,
  281                                   mock_get_manage_ip, mock_create_volume):
  282         # success
  283         mock_create_volume.return_value = None
  284         mock_get_manage_ip.return_value = self.driver.manage_ip
  285         mock_wait_finish.return_value = True
  286         retval = self.driver.create_cloned_volume(self.volume, self.scr_volume)
  287         self.assertEqual({"provider_id": "127.0.0.1"}, retval)
  288 
  289         # failure:create exception
  290         mock_create_volume.return_value = 500015
  291         self.assertRaises(exception.CinderException,
  292                           self.driver.create_cloned_volume,
  293                           self.volume, self.scr_volume)
  294         # failure:wait exception
  295         mock_create_volume.return_value = None
  296         mock_wait_finish.return_value = False
  297         self.assertRaises(exception.CinderException,
  298                           self.driver.create_cloned_volume,
  299                           self.volume, self.scr_volume)
  300 
  301     @mock.patch.object(fspythonapi.FSPythonApi, 'query_volume')
  302     def test_private_check_create_cloned_volume_finish(self,
  303                                                        mock_query_volume):
  304         query_result_done = {'result': 0, 'vol_name': 'vol1',
  305                              'father_name': 'vol1_father', 'status': '0',
  306                              'vol_size': '1024', 'real_size': '1024',
  307                              'pool_id': 'pool1', 'create_time': '01/01/2015'}
  308 
  309         query_result_doing = {'result': 0, 'vol_name': 'vol1',
  310                               'father_name': 'vol1_father', 'status': '6',
  311                               'vol_size': '1024', 'real_size': '1024',
  312                               'pool_id': 'pool1', 'create_time': '01/01/2015'}
  313 
  314         mock_query_volume.side_effect = [
  315             query_result_done, query_result_doing, query_result_doing]
  316 
  317         # success
  318         self.assertRaises(loopingcall.LoopingCallDone,
  319                           self.driver._check_create_cloned_volume_finish,
  320                           self.volume.name)
  321 
  322         # in the process of creating volume
  323         self.driver.count = self.driver.configuration.clone_volume_timeout - 1
  324         self.driver._check_create_cloned_volume_finish(self.volume.name)
  325         self.assertEqual(self.driver.configuration.clone_volume_timeout,
  326                          self.driver.count)
  327 
  328         # timeout
  329         self.driver.count = self.driver.configuration.clone_volume_timeout
  330         self.assertRaises(loopingcall.LoopingCallDone,
  331                           self.driver._check_create_cloned_volume_finish,
  332                           self.volume.name)
  333 
  334     @mock.patch.object(dsware.DSWAREDriver,
  335                        '_check_create_cloned_volume_finish')
  336     def test_private_wait_for_create_cloned_volume_finish_timer(self,
  337                                                                 mock_check):
  338         mock_check.side_effect = [loopingcall.LoopingCallDone(retvalue=True),
  339                                   loopingcall.LoopingCallDone(retvalue=False)]
  340         retval = self.driver._wait_for_create_cloned_volume_finish_timer(
  341             self.volume.name)
  342         self.assertTrue(retval)
  343 
  344         retval = self.driver._wait_for_create_cloned_volume_finish_timer(
  345             self.volume.name)
  346         self.assertFalse(retval)
  347 
  348     def test_private_analyse_output(self):
  349         out = 'ret_code=10\nret_desc=test\ndev_addr=/sda\n'
  350         retval = self.driver._analyse_output(out)
  351         self.assertEqual({'dev_addr': '/sda',
  352                           'ret_desc': 'test', 'ret_code': '10'},
  353                          retval)
  354 
  355         out = 'abcdefg'
  356         retval = self.driver._analyse_output(out)
  357         self.assertEqual({}, retval)
  358 
  359     def test_private_attach_volume(self):
  360         success = ['ret_code=0\nret_desc=success\ndev_addr=/dev/sdb\n', '']
  361         failure = ['ret_code=50510011\nret_desc=failed\ndev_addr=/dev/sdb\n',
  362                    '']
  363         mock_execute = self.mock_object(self.driver, '_execute')
  364         mock_execute.side_effect = [success, failure]
  365         # attached successful
  366         retval = self.driver._attach_volume(self.volume.name,
  367                                             self.driver.manage_ip)
  368         self.assertEqual({'dev_addr': '/dev/sdb',
  369                           'ret_desc': 'success', 'ret_code': '0'},
  370                          retval)
  371         # attached failure
  372         retval = self.driver._attach_volume(self.volume.name,
  373                                             self.driver.manage_ip)
  374         self.assertEqual({'dev_addr': '/dev/sdb',
  375                           'ret_desc': 'failed', 'ret_code': '50510011'},
  376                          retval)
  377 
  378     def test_private_detach_volume(self):
  379         success = ['ret_code=0\nret_desc=success\ndev_addr=/dev/sdb\n', '']
  380         failure = ['ret_code=50510011\nret_desc=failed\ndev_addr=/dev/sdb\n',
  381                    '']
  382         mock_execute = self.mock_object(self.driver, '_execute')
  383         mock_execute.side_effect = [success, failure]
  384         # detached successful
  385         retval = self.driver._detach_volume(self.volume.name,
  386                                             self.driver.manage_ip)
  387         self.assertEqual({'dev_addr': '/dev/sdb',
  388                           'ret_desc': 'success', 'ret_code': '0'},
  389                          retval)
  390         # detached failure
  391         retval = self.driver._detach_volume(self.volume.name,
  392                                             self.driver.manage_ip)
  393         self.assertEqual({'dev_addr': '/dev/sdb',
  394                           'ret_desc': 'failed',
  395                           'ret_code': '50510011'},
  396                          retval)
  397 
  398     def test_private_query_volume_attach(self):
  399         success = ['ret_code=0\nret_desc=success\ndev_addr=/dev/sdb\n', '']
  400         failure = ['ret_code=50510011\nret_desc=failed\ndev_addr=/dev/sdb\n',
  401                    '']
  402         mock_execute = self.mock_object(self.driver, '_execute')
  403         mock_execute.side_effect = [success, failure]
  404         # query successful
  405         retval = self.driver._query_volume_attach(self.volume.name,
  406                                                   self.driver.manage_ip)
  407         self.assertEqual({'dev_addr': '/dev/sdb',
  408                           'ret_desc': 'success',
  409                           'ret_code': '0'},
  410                          retval)
  411         # query failure
  412         retval = self.driver._query_volume_attach(self.volume.name,
  413                                                   self.driver.manage_ip)
  414         self.assertEqual({'dev_addr': '/dev/sdb',
  415                           'ret_desc': 'failed',
  416                           'ret_code': '50510011'},
  417                          retval)
  418 
  419     @mock.patch.object(dsware.DSWAREDriver, '_get_dsware_manage_ip')
  420     @mock.patch.object(dsware.DSWAREDriver, '_attach_volume')
  421     @mock.patch.object(image_utils, 'fetch_to_raw')
  422     @mock.patch.object(dsware.DSWAREDriver, '_detach_volume')
  423     def test_copy_image_to_volume(self, mock_detach, mock_fetch,
  424                                   mock_attach, mock_get_manage_ip):
  425         success = {'ret_code': '0',
  426                    'ret_desc': 'success',
  427                    'dev_addr': '/dev/sdb'}
  428         failure = {'ret_code': '50510011',
  429                    'ret_desc': 'failed',
  430                    'dev_addr': '/dev/sdb'}
  431         context = ''
  432         image_service = ''
  433         image_id = ''
  434         mock_get_manage_ip.return_value = '127.0.0.1'
  435         mock_attach.side_effect = [success, failure, success]
  436         mock_detach.side_effect = [success, failure, failure]
  437 
  438         # success
  439         self.driver.copy_image_to_volume(context, self.volume, image_service,
  440                                          image_id)
  441 
  442         # failure - attach failure
  443         self.assertRaises(exception.CinderException,
  444                           self.driver.copy_image_to_volume,
  445                           context, self.volume, image_service, image_id)
  446 
  447         # failure - detach failure
  448         self.assertRaises(exception.CinderException,
  449                           self.driver.copy_image_to_volume,
  450                           context, self.volume, image_service, image_id)
  451 
  452     @mock.patch.object(dsware.DSWAREDriver, '_get_dsware_manage_ip')
  453     @mock.patch.object(dsware.DSWAREDriver, '_attach_volume')
  454     @mock.patch.object(dsware.DSWAREDriver, '_query_volume_attach')
  455     @mock.patch.object(image_utils, 'upload_volume')
  456     @mock.patch.object(dsware.DSWAREDriver, '_detach_volume')
  457     def test_copy_volume_to_image_success(self, mock_detach, mock_upload,
  458                                           mock_query, mock_attach,
  459                                           mock_get_manage_ip):
  460         success = {'ret_code': '0',
  461                    'ret_desc': 'success',
  462                    'dev_addr': '/dev/sdb'}
  463         already_attached = {'ret_code': '50151401',
  464                             'ret_desc': 'already_attached',
  465                             'dev_addr': '/dev/sdb'}
  466         context = ''
  467         image_service = ''
  468         image_meta = ''
  469 
  470         mock_get_manage_ip.return_value = '127.0.0.1'
  471         mock_attach.return_value = success
  472         mock_detach.return_value = success
  473         self.driver.copy_volume_to_image(context, self.volume, image_service,
  474                                          image_meta)
  475         mock_upload.assert_called_with('', '', '', '/dev/sdb')
  476 
  477         mock_attach.return_value = already_attached
  478         mock_query.return_value = success
  479         mock_detach.return_value = success
  480         self.driver.copy_volume_to_image(context, self.volume, image_service,
  481                                          image_meta)
  482         mock_upload.assert_called_with('', '', '', '/dev/sdb')
  483 
  484     @mock.patch.object(dsware.DSWAREDriver, '_get_dsware_manage_ip')
  485     @mock.patch.object(dsware.DSWAREDriver, '_attach_volume')
  486     @mock.patch.object(dsware.DSWAREDriver, '_query_volume_attach')
  487     @mock.patch.object(image_utils, 'upload_volume')
  488     @mock.patch.object(dsware.DSWAREDriver, '_detach_volume')
  489     def test_copy_volume_to_image_attach_fail(self, mock_detach, mock_upload,
  490                                               mock_query, mock_attach,
  491                                               mock_get_manage_ip):
  492         failure = {'ret_code': '50510011',
  493                    'ret_desc': 'failed',
  494                    'dev_addr': '/dev/sdb'}
  495         context = ''
  496         image_service = ''
  497         image_meta = ''
  498 
  499         mock_get_manage_ip.return_value = '127.0.0.1'
  500         mock_attach.return_value = failure
  501         self.assertRaises(exception.CinderException,
  502                           self.driver.copy_volume_to_image,
  503                           context, self.volume, image_service, image_meta)
  504         mock_attach.return_value = None
  505         self.assertRaises(exception.CinderException,
  506                           self.driver.copy_volume_to_image,
  507                           context, self.volume, image_service, image_meta)
  508 
  509     @mock.patch.object(dsware.DSWAREDriver, '_get_dsware_manage_ip')
  510     @mock.patch.object(dsware.DSWAREDriver, '_attach_volume')
  511     @mock.patch.object(dsware.DSWAREDriver, '_query_volume_attach')
  512     @mock.patch.object(image_utils, 'upload_volume')
  513     @mock.patch.object(dsware.DSWAREDriver, '_detach_volume')
  514     def test_copy_volume_to_image_query_attach_fail(self, mock_detach,
  515                                                     mock_upload, mock_query,
  516                                                     mock_attach,
  517                                                     mock_get_manage_ip):
  518         already_attached = {'ret_code': '50151401',
  519                             'ret_desc': 'already_attached',
  520                             'dev_addr': '/dev/sdb'}
  521         failure = {'ret_code': '50510011',
  522                    'ret_desc': 'failed',
  523                    'dev_addr': '/dev/sdb'}
  524         context = ''
  525         image_service = ''
  526         image_meta = ''
  527 
  528         mock_get_manage_ip.return_value = '127.0.0.1'
  529         mock_attach.return_value = already_attached
  530         mock_query.return_value = failure
  531         self.assertRaises(exception.CinderException,
  532                           self.driver.copy_volume_to_image,
  533                           context, self.volume, image_service, image_meta)
  534 
  535         mock_query.return_value = None
  536         self.assertRaises(exception.CinderException,
  537                           self.driver.copy_volume_to_image,
  538                           context, self.volume, image_service, image_meta)
  539 
  540     @mock.patch.object(dsware.DSWAREDriver, '_get_dsware_manage_ip')
  541     @mock.patch.object(dsware.DSWAREDriver, '_attach_volume')
  542     @mock.patch.object(dsware.DSWAREDriver, '_query_volume_attach')
  543     @mock.patch.object(image_utils, 'upload_volume')
  544     @mock.patch.object(dsware.DSWAREDriver, '_detach_volume')
  545     def test_copy_volume_to_image_upload_fail(self, mock_detach, mock_upload,
  546                                               mock_query, mock_attach,
  547                                               mock_get_manage_ip):
  548         success = {'ret_code': '0',
  549                    'ret_desc': 'success',
  550                    'dev_addr': '/dev/sdb'}
  551         already_attached = {'ret_code': '50151401',
  552                             'ret_desc': 'already_attached',
  553                             'dev_addr': '/dev/sdb'}
  554         context = ''
  555         image_service = ''
  556         image_meta = ''
  557 
  558         mock_get_manage_ip.return_value = '127.0.0.1'
  559         mock_attach.return_value = already_attached
  560         mock_query.return_value = success
  561         mock_upload.side_effect = exception.CinderException(
  562             'upload_volume error')
  563         self.assertRaises(exception.CinderException,
  564                           self.driver.copy_volume_to_image,
  565                           context, self.volume, image_service, image_meta)
  566 
  567     @mock.patch.object(fspythonapi.FSPythonApi, 'query_volume')
  568     def test_private_get_volume(self, mock_query):
  569         result_success = {'result': 0}
  570         result_not_exist = {'result': "50150005\n"}
  571         result_exception = {'result': "50510006\n"}
  572 
  573         mock_query.side_effect = [
  574             result_success, result_not_exist, result_exception]
  575 
  576         retval = self.driver._get_volume(self.volume.name)
  577         self.assertTrue(retval)
  578 
  579         retval = self.driver._get_volume(self.volume.name)
  580         self.assertFalse(retval)
  581 
  582         self.assertRaises(exception.CinderException,
  583                           self.driver._get_volume,
  584                           self.volume.name)
  585 
  586     @mock.patch.object(fspythonapi.FSPythonApi, 'delete_volume')
  587     def test_private_delete_volume(self, mock_delete):
  588         result_success = 0
  589         result_not_exist = '50150005\n'
  590         result_being_deleted = '50151002\n'
  591         result_exception = '51050006\n'
  592 
  593         mock_delete.side_effect = [result_success, result_not_exist,
  594                                    result_being_deleted, result_exception]
  595 
  596         retval = self.driver._delete_volume(self.volume.name)
  597         self.assertTrue(retval)
  598 
  599         retval = self.driver._delete_volume(self.volume.name)
  600         self.assertTrue(retval)
  601 
  602         retval = self.driver._delete_volume(self.volume.name)
  603         self.assertTrue(retval)
  604 
  605         self.assertRaises(exception.CinderException,
  606                           self.driver._delete_volume, self.volume.name)
  607 
  608     @mock.patch.object(dsware.DSWAREDriver, '_get_volume')
  609     @mock.patch.object(dsware.DSWAREDriver, '_delete_volume')
  610     def test_delete_volume(self, mock_delete, mock_get):
  611         mock_get.return_value = False
  612         retval = self.driver.delete_volume(self.volume)
  613         self.assertTrue(retval)
  614 
  615         mock_get.return_value = True
  616         mock_delete.return_value = True
  617         retval = self.driver.delete_volume(self.volume)
  618         self.assertTrue(retval)
  619 
  620         mock_get.return_value = True
  621         mock_delete.side_effect = exception.CinderException(
  622             'delete volume exception')
  623         self.assertRaises(exception.CinderException,
  624                           self.driver.delete_volume,
  625                           self.volume)
  626 
  627         mock_get.side_effect = exception.CinderException(
  628             'get volume exception')
  629         self.assertRaises(exception.CinderException,
  630                           self.driver.delete_volume,
  631                           self.volume)
  632 
  633     @mock.patch.object(fspythonapi.FSPythonApi, 'query_snap')
  634     def test_private_get_snapshot(self, mock_query):
  635         result_success = {'result': 0}
  636         result_not_found = {'result': "50150006\n"}
  637         result_exception = {'result': "51050007\n"}
  638         mock_query.side_effect = [result_success, result_not_found,
  639                                   result_exception]
  640 
  641         retval = self.driver._get_snapshot(self.snapshot.name)
  642         self.assertTrue(retval)
  643 
  644         retval = self.driver._get_snapshot(self.snapshot.name)
  645         self.assertFalse(retval)
  646 
  647         self.assertRaises(exception.CinderException,
  648                           self.driver._get_snapshot,
  649                           self.snapshot.name)
  650 
  651     @mock.patch.object(fspythonapi.FSPythonApi, 'create_snapshot')
  652     def test_private_create_snapshot(self, mock_create):
  653         mock_create.side_effect = [0, 1]
  654 
  655         self.driver._create_snapshot(self.snapshot.name,
  656                                      self.volume.name)
  657 
  658         self.assertRaises(exception.CinderException,
  659                           self.driver._create_snapshot,
  660                           self.snapshot.name, self.volume.name)
  661 
  662     @mock.patch.object(fspythonapi.FSPythonApi, 'delete_snapshot')
  663     def test_private_delete_snapshot(self, mock_delete):
  664         mock_delete.side_effect = [0, 1]
  665 
  666         self.driver._delete_snapshot(self.snapshot.name)
  667 
  668         self.assertRaises(exception.CinderException,
  669                           self.driver._delete_snapshot, self.snapshot.name)
  670 
  671     @mock.patch.object(dsware.DSWAREDriver, '_get_volume')
  672     @mock.patch.object(dsware.DSWAREDriver, '_create_snapshot')
  673     def test_create_snapshot(self, mock_create, mock_get):
  674         mock_get.return_value = True
  675         self.driver.create_snapshot(self.snapshot)
  676 
  677         mock_create.side_effect = exception.CinderException(
  678             'create snapshot failed')
  679         self.assertRaises(exception.CinderException,
  680                           self.driver.create_snapshot, self.snapshot)
  681 
  682         mock_get.side_effect = [
  683             False, exception.CinderException('get volume failed')]
  684         self.assertRaises(exception.CinderException,
  685                           self.driver.create_snapshot,
  686                           self.snapshot)
  687         self.assertRaises(exception.CinderException,
  688                           self.driver.create_snapshot,
  689                           self.snapshot)
  690 
  691     @mock.patch.object(dsware.DSWAREDriver, '_get_snapshot')
  692     @mock.patch.object(dsware.DSWAREDriver, '_delete_snapshot')
  693     def test_delete_snapshot(self, mock_delete, mock_get):
  694         mock_get.side_effect = [True, False, exception.CinderException, True]
  695         self.driver.delete_snapshot(self.snapshot)
  696         self.driver.delete_snapshot(self.snapshot)
  697 
  698         self.assertRaises(exception.CinderException,
  699                           self.driver.delete_snapshot,
  700                           self.snapshot)
  701         mock_delete.side_effect = exception.CinderException(
  702             'delete snapshot exception')
  703         self.assertRaises(exception.CinderException,
  704                           self.driver.delete_snapshot,
  705                           self.snapshot)
  706 
  707     @mock.patch.object(fspythonapi.FSPythonApi, 'query_pool_info')
  708     def test_private_update_single_pool_info_status(self, mock_query):
  709         pool_info = {'result': 0,
  710                      'pool_id': 10,
  711                      'total_capacity': 10240,
  712                      'used_capacity': 5120,
  713                      'alloc_capacity': 7168}
  714         pool_info_none = {'result': 1}
  715 
  716         mock_query.side_effect = [pool_info, pool_info_none]
  717 
  718         self.driver._update_single_pool_info_status()
  719         self.assertEqual({'total_capacity_gb': 10.0,
  720                           'free_capacity_gb': 5.0,
  721                           'volume_backend_name': None,
  722                           'vendor_name': 'Open Source',
  723                           'driver_version': '1.0',
  724                           'storage_protocol': 'dsware',
  725                           'reserved_percentage': 0,
  726                           'QoS_support': False},
  727                          self.driver._stats)
  728 
  729         self.driver._update_single_pool_info_status()
  730         self.assertIsNone(self.driver._stats)
  731 
  732     @mock.patch.object(fspythonapi.FSPythonApi, 'query_pool_type')
  733     def test_private_update_multi_pool_of_same_type_status(self, mock_query):
  734         query_result = (0, [{'result': 0,
  735                              'pool_id': '0',
  736                              'total_capacity': '10240',
  737                              'used_capacity': '5120',
  738                              'alloc_capacity': '7168'}])
  739         query_result_none = (0, [])
  740 
  741         mock_query.side_effect = [query_result, query_result_none]
  742 
  743         self.driver._update_multi_pool_of_same_type_status()
  744         self.assertEqual({'volume_backend_name': None,
  745                           'vendor_name': 'Open Source',
  746                           'driver_version': '1.0',
  747                           'storage_protocol': 'dsware',
  748                           'pools': [{'pool_name': '0',
  749                                      'total_capacity_gb': 10.0,
  750                                      'allocated_capacity_gb': 5.0,
  751                                      'free_capacity_gb': 5.0,
  752                                      'QoS_support': False,
  753                                      'reserved_percentage': 0}]},
  754                          self.driver._stats)
  755 
  756         self.driver._update_multi_pool_of_same_type_status()
  757         self.assertIsNone(self.driver._stats)
  758 
  759     def test_private_calculate_pool_info(self):
  760         pool_sets = [{'pool_id': 0,
  761                       'total_capacity': 10240,
  762                       'used_capacity': 5120,
  763                       'QoS_support': False,
  764                       'reserved_percentage': 0}]
  765         retval = self.driver._calculate_pool_info(pool_sets)
  766         self.assertEqual([{'pool_name': 0,
  767                            'total_capacity_gb': 10.0,
  768                            'allocated_capacity_gb': 5.0,
  769                            'free_capacity_gb': 5.0,
  770                            'QoS_support': False,
  771                            'reserved_percentage': 0}],
  772                          retval)
  773 
  774     @mock.patch.object(dsware.DSWAREDriver, '_update_single_pool_info_status')
  775     @mock.patch.object(dsware.DSWAREDriver,
  776                        '_update_multi_pool_of_same_type_status')
  777     @mock.patch.object(fspythonapi.FSPythonApi, 'query_dsware_version')
  778     def test_get_volume_stats(self, mock_query, mock_type, mock_info):
  779         mock_query.return_value = 1
  780 
  781         self.driver.get_volume_stats(False)
  782         mock_query.assert_not_called()
  783 
  784         self.driver.get_volume_stats(True)
  785         mock_query.assert_called_once_with()