"Fossies" - the Fresh Open Source Software Archive

Member "cinder-13.0.7/cinder/tests/unit/volume/test_rpcapi.py" (4 Oct 2019, 30333 Bytes) of package /linux/misc/openstack/cinder-13.0.7.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_rpcapi.py": 14.0.2_vs_15.0.0.

    1 # Copyright 2012, Intel, Inc.
    2 #
    3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 #    not use this file except in compliance with the License. You may obtain
    5 #    a copy of the License at
    6 #
    7 #         http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 #    Unless required by applicable law or agreed to in writing, software
   10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 #    License for the specific language governing permissions and limitations
   13 #    under the License.
   14 
   15 """
   16 Unit Tests for cinder.volume.rpcapi
   17 """
   18 import ddt
   19 import mock
   20 
   21 from oslo_config import cfg
   22 from oslo_serialization import jsonutils
   23 
   24 from cinder.common import constants
   25 from cinder import db
   26 from cinder import exception
   27 from cinder import objects
   28 from cinder.objects import fields
   29 from cinder import test
   30 from cinder.tests.unit.backup import fake_backup
   31 from cinder.tests.unit import fake_constants as fake
   32 from cinder.tests.unit import fake_service
   33 from cinder.tests.unit import fake_volume
   34 from cinder.tests.unit import utils as tests_utils
   35 from cinder.volume import rpcapi as volume_rpcapi
   36 
   37 
   38 CONF = cfg.CONF
   39 
   40 
   41 @ddt.ddt
   42 class VolumeRPCAPITestCase(test.RPCAPITestCase):
   43 
   44     def setUp(self):
   45         super(VolumeRPCAPITestCase, self).setUp()
   46         self.rpcapi = volume_rpcapi.VolumeAPI
   47         self.base_version = '3.0'
   48         vol = {}
   49         vol['host'] = 'fake_host'
   50         vol['availability_zone'] = CONF.storage_availability_zone
   51         vol['status'] = "available"
   52         vol['attach_status'] = "detached"
   53         vol['metadata'] = {"test_key": "test_val"}
   54         vol['size'] = 1
   55         volume = db.volume_create(self.context, vol)
   56 
   57         kwargs = {
   58             'status': fields.SnapshotStatus.CREATING,
   59             'progress': '0%',
   60             'display_name': 'fake_name',
   61             'display_description': 'fake_description'}
   62         snapshot = tests_utils.create_snapshot(self.context, vol['id'],
   63                                                **kwargs)
   64 
   65         generic_group = tests_utils.create_group(
   66             self.context,
   67             availability_zone=CONF.storage_availability_zone,
   68             group_type_id='group_type1',
   69             host='fakehost@fakedrv#fakepool')
   70 
   71         group_snapshot = tests_utils.create_group_snapshot(
   72             self.context,
   73             group_id=generic_group.id,
   74             group_type_id=fake.GROUP_TYPE_ID)
   75 
   76         self.fake_volume = jsonutils.to_primitive(volume)
   77         self.fake_volume_obj = fake_volume.fake_volume_obj(self.context, **vol)
   78         self.fake_snapshot = snapshot
   79         self.fake_reservations = ["RESERVATION"]
   80         self.fake_backup_obj = fake_backup.fake_backup_obj(self.context)
   81         self.fake_group = generic_group
   82         self.fake_group_snapshot = group_snapshot
   83 
   84         self.can_send_version_mock = self.patch(
   85             'oslo_messaging.RPCClient.can_send_version', return_value=True)
   86 
   87     def tearDown(self):
   88         super(VolumeRPCAPITestCase, self).tearDown()
   89         self.fake_snapshot.destroy()
   90         self.fake_volume_obj.destroy()
   91         self.fake_group_snapshot.destroy()
   92         self.fake_group.destroy()
   93         self.fake_backup_obj.destroy()
   94 
   95     def _change_cluster_name(self, resource, cluster_name):
   96         resource.cluster_name = cluster_name
   97         resource.obj_reset_changes()
   98 
   99     def test_create_volume(self):
  100         self._test_rpc_api('create_volume',
  101                            rpc_method='cast',
  102                            server='fake_host',
  103                            volume=self.fake_volume_obj,
  104                            request_spec=objects.RequestSpec.from_primitives(
  105                                {}),
  106                            filter_properties={'availability_zone': 'fake_az'},
  107                            allow_reschedule=True)
  108 
  109     @ddt.data(None, 'my_cluster')
  110     def test_delete_volume(self, cluster_name):
  111         self._change_cluster_name(self.fake_volume_obj, cluster_name)
  112         self._test_rpc_api('delete_volume',
  113                            rpc_method='cast',
  114                            server=cluster_name or self.fake_volume_obj.host,
  115                            volume=self.fake_volume_obj,
  116                            unmanage_only=False,
  117                            cascade=False)
  118 
  119     def test_delete_volume_cascade(self):
  120         self._test_rpc_api('delete_volume',
  121                            rpc_method='cast',
  122                            server=self.fake_volume_obj.host,
  123                            volume=self.fake_volume_obj,
  124                            unmanage_only=False,
  125                            cascade=True)
  126 
  127     @ddt.data(None, 'mycluster')
  128     def test_create_snapshot(self, cluster_name):
  129         self._change_cluster_name(self.fake_volume_obj, cluster_name)
  130         self._test_rpc_api('create_snapshot',
  131                            rpc_method='cast',
  132                            server=cluster_name or self.fake_volume_obj.host,
  133                            volume=self.fake_volume_obj,
  134                            snapshot=self.fake_snapshot)
  135 
  136     @ddt.data(None, 'mycluster')
  137     def test_delete_snapshot(self, cluster_name):
  138         self._change_cluster_name(self.fake_snapshot.volume, cluster_name)
  139         self._test_rpc_api(
  140             'delete_snapshot', rpc_method='cast',
  141             server=cluster_name or self.fake_snapshot.volume.host,
  142             snapshot=self.fake_snapshot, unmanage_only=False)
  143 
  144     def test_delete_snapshot_with_unmanage_only(self):
  145         self._test_rpc_api('delete_snapshot',
  146                            rpc_method='cast',
  147                            server=self.fake_snapshot.volume.host,
  148                            snapshot=self.fake_snapshot,
  149                            unmanage_only=True)
  150 
  151     @ddt.data('3.0', '3.3')
  152     def test_attach_volume_to_instance(self, version):
  153         self.can_send_version_mock.return_value = (version == '3.3')
  154         self._test_rpc_api('attach_volume',
  155                            rpc_method='call',
  156                            server=self.fake_volume_obj.host,
  157                            volume=self.fake_volume_obj,
  158                            instance_uuid=fake.INSTANCE_ID,
  159                            host_name=None,
  160                            mountpoint='fake_mountpoint',
  161                            mode='ro',
  162                            expected_kwargs_diff={
  163                                'volume_id': self.fake_volume_obj.id},
  164                            retval=fake_volume.fake_db_volume_attachment(),
  165                            version=version)
  166 
  167     @ddt.data('3.0', '3.3')
  168     def test_attach_volume_to_host(self, version):
  169         self.can_send_version_mock.return_value = (version == '3.3')
  170         self._test_rpc_api('attach_volume',
  171                            rpc_method='call',
  172                            server=self.fake_volume_obj.host,
  173                            volume=self.fake_volume_obj,
  174                            instance_uuid=None,
  175                            host_name='fake_host',
  176                            mountpoint='fake_mountpoint',
  177                            mode='rw',
  178                            expected_kwargs_diff={
  179                                'volume_id': self.fake_volume_obj.id},
  180                            retval=fake_volume.fake_db_volume_attachment(),
  181                            version=version)
  182 
  183     @ddt.data('3.0', '3.3')
  184     def test_attach_volume_cluster(self, version):
  185         self.can_send_version_mock.return_value = (version == '3.3')
  186         self._change_cluster_name(self.fake_volume_obj, 'mycluster')
  187         self._test_rpc_api('attach_volume',
  188                            rpc_method='call',
  189                            server=self.fake_volume_obj.cluster_name,
  190                            volume=self.fake_volume_obj,
  191                            instance_uuid=None,
  192                            host_name='fake_host',
  193                            mountpoint='fake_mountpoint',
  194                            mode='rw',
  195                            expected_kwargs_diff={
  196                                'volume_id': self.fake_volume_obj.id},
  197                            retval=fake_volume.fake_db_volume_attachment(),
  198                            version=version)
  199 
  200     @ddt.data('3.0', '3.4')
  201     def test_detach_volume(self, version):
  202         self.can_send_version_mock.return_value = (version == '3.4')
  203         self._test_rpc_api('detach_volume',
  204                            rpc_method='call',
  205                            server=self.fake_volume_obj.host,
  206                            volume=self.fake_volume_obj,
  207                            attachment_id=fake.ATTACHMENT_ID,
  208                            expected_kwargs_diff={
  209                                'volume_id': self.fake_volume_obj.id},
  210                            # NOTE(dulek): Detach isn't returning anything, but
  211                            # it's a call and it is synchronous.
  212                            retval=None,
  213                            version=version)
  214 
  215     @ddt.data('3.0', '3.4')
  216     def test_detach_volume_cluster(self, version):
  217         self.can_send_version_mock.return_value = (version == '3.4')
  218         self._change_cluster_name(self.fake_volume_obj, 'mycluster')
  219         self._test_rpc_api('detach_volume',
  220                            rpc_method='call',
  221                            server=self.fake_volume_obj.cluster_name,
  222                            volume=self.fake_volume_obj,
  223                            attachment_id='fake_uuid',
  224                            expected_kwargs_diff={
  225                                'volume_id': self.fake_volume_obj.id},
  226                            # NOTE(dulek): Detach isn't returning anything, but
  227                            # it's a call and it is synchronous.
  228                            retval=None,
  229                            version=version)
  230 
  231     @ddt.data(None, 'mycluster')
  232     def test_copy_volume_to_image(self, cluster_name):
  233         self._change_cluster_name(self.fake_volume_obj, cluster_name)
  234         self._test_rpc_api('copy_volume_to_image',
  235                            rpc_method='cast',
  236                            server=cluster_name or self.fake_volume_obj.host,
  237                            volume=self.fake_volume_obj,
  238                            expected_kwargs_diff={
  239                                'volume_id': self.fake_volume_obj.id},
  240                            image_meta={'id': fake.IMAGE_ID,
  241                                        'container_format': 'fake_type',
  242                                        'disk_format': 'fake_format'})
  243 
  244     @ddt.data(None, 'mycluster')
  245     def test_initialize_connection(self, cluster_name):
  246         self._change_cluster_name(self.fake_volume_obj, cluster_name)
  247         self._test_rpc_api('initialize_connection',
  248                            rpc_method='call',
  249                            server=cluster_name or self.fake_volume_obj.host,
  250                            connector='fake_connector',
  251                            volume=self.fake_volume_obj)
  252 
  253     @ddt.data(None, 'mycluster')
  254     def test_terminate_connection(self, cluster_name):
  255         self._change_cluster_name(self.fake_volume_obj, cluster_name)
  256         self._test_rpc_api('terminate_connection',
  257                            rpc_method='call',
  258                            server=cluster_name or self.fake_volume_obj.host,
  259                            volume=self.fake_volume_obj,
  260                            connector='fake_connector',
  261                            force=False,
  262                            # NOTE(dulek): Terminate isn't returning anything,
  263                            # but it's a call and it is synchronous.
  264                            retval=None,
  265                            expected_kwargs_diff={
  266                                'volume_id': self.fake_volume_obj.id})
  267 
  268     @ddt.data(None, 'mycluster')
  269     def test_accept_transfer(self, cluster_name):
  270         self._change_cluster_name(self.fake_volume_obj, cluster_name)
  271         self._test_rpc_api('accept_transfer',
  272                            rpc_method='call',
  273                            server=cluster_name or self.fake_volume_obj.host,
  274                            volume=self.fake_volume_obj,
  275                            new_user=fake.USER_ID,
  276                            new_project=fake.PROJECT_ID,
  277                            no_snapshots=True,
  278                            expected_kwargs_diff={
  279                                'volume_id': self.fake_volume_obj.id},
  280                            version='3.16')
  281 
  282     @ddt.data(None, 'mycluster')
  283     def test_extend_volume(self, cluster_name):
  284         self._change_cluster_name(self.fake_volume_obj, cluster_name)
  285         self._test_rpc_api('extend_volume',
  286                            rpc_method='cast',
  287                            server=cluster_name or self.fake_volume_obj.host,
  288                            volume=self.fake_volume_obj,
  289                            new_size=1,
  290                            reservations=self.fake_reservations)
  291 
  292     def test_migrate_volume(self):
  293         class FakeBackend(object):
  294 
  295             def __init__(self):
  296                 self.host = 'fake_host'
  297                 self.cluster_name = 'cluster_name'
  298                 self.capabilities = {}
  299         dest_backend = FakeBackend()
  300         self._test_rpc_api('migrate_volume',
  301                            rpc_method='cast',
  302                            server=self.fake_volume_obj.host,
  303                            volume=self.fake_volume_obj,
  304                            dest_backend=dest_backend,
  305                            force_host_copy=True,
  306                            expected_kwargs_diff={
  307                                'host': {'host': 'fake_host',
  308                                         'cluster_name': 'cluster_name',
  309                                         'capabilities': {}}},
  310                            version='3.5')
  311 
  312     def test_migrate_volume_completion(self):
  313         self._test_rpc_api('migrate_volume_completion',
  314                            rpc_method='call',
  315                            server=self.fake_volume_obj.host,
  316                            volume=self.fake_volume_obj,
  317                            new_volume=self.fake_volume_obj,
  318                            error=False,
  319                            retval=fake.VOLUME_ID)
  320 
  321     def test_retype(self):
  322         class FakeBackend(object):
  323 
  324             def __init__(self):
  325                 self.host = 'fake_host'
  326                 self.cluster_name = 'cluster_name'
  327                 self.capabilities = {}
  328         dest_backend = FakeBackend()
  329         self._test_rpc_api('retype',
  330                            rpc_method='cast',
  331                            server=self.fake_volume_obj.host,
  332                            volume=self.fake_volume_obj,
  333                            new_type_id=fake.VOLUME_TYPE_ID,
  334                            dest_backend=dest_backend,
  335                            migration_policy='never',
  336                            reservations=self.fake_reservations,
  337                            old_reservations=self.fake_reservations,
  338                            expected_kwargs_diff={
  339                                'host': {'host': 'fake_host',
  340                                         'cluster_name': 'cluster_name',
  341                                         'capabilities': {}}},
  342                            version='3.5')
  343 
  344     def test_manage_existing(self):
  345         self._test_rpc_api('manage_existing',
  346                            rpc_method='cast',
  347                            server=self.fake_volume_obj.host,
  348                            volume=self.fake_volume_obj,
  349                            ref={'lv_name': 'foo'})
  350 
  351     def test_manage_existing_snapshot(self):
  352         self._test_rpc_api('manage_existing_snapshot',
  353                            rpc_method='cast',
  354                            server=self.fake_snapshot.volume.host,
  355                            snapshot=self.fake_snapshot,
  356                            ref='foo',
  357                            backend='fake_host')
  358 
  359     def test_freeze_host(self):
  360         service = fake_service.fake_service_obj(self.context,
  361                                                 host='fake_host',
  362                                                 binary=constants.VOLUME_BINARY)
  363         self._test_rpc_api('freeze_host',
  364                            rpc_method='call',
  365                            server='fake_host',
  366                            service=service,
  367                            retval=True)
  368 
  369     def test_thaw_host(self):
  370         service = fake_service.fake_service_obj(self.context,
  371                                                 host='fake_host',
  372                                                 binary=constants.VOLUME_BINARY)
  373         self._test_rpc_api('thaw_host',
  374                            rpc_method='call',
  375                            server='fake_host',
  376                            service=service,
  377                            retval=True)
  378 
  379     @ddt.data('3.0', '3.8')
  380     def test_failover(self, version):
  381         self.can_send_version_mock.side_effect = lambda x: x == version
  382         service = objects.Service(self.context, host='fake_host',
  383                                   cluster_name=None)
  384         expected_method = 'failover' if version == '3.8' else 'failover_host'
  385         self._test_rpc_api('failover', rpc_method='cast',
  386                            expected_method=expected_method, server='fake_host',
  387                            service=service,
  388                            secondary_backend_id='fake_backend',
  389                            version=version)
  390 
  391     @mock.patch('cinder.volume.rpcapi.VolumeAPI._get_cctxt')
  392     def test_failover_completed(self, cctxt_mock):
  393         service = objects.Service(self.context, host='fake_host',
  394                                   cluster_name='cluster_name')
  395         self._test_rpc_api('failover_completed', rpc_method='cast',
  396                            fanout=True, server='fake_host', service=service,
  397                            updates=mock.sentinel.updates)
  398 
  399     def test_get_capabilities(self):
  400         self._test_rpc_api('get_capabilities',
  401                            rpc_method='call',
  402                            server='fake_host',
  403                            backend_id='fake_host',
  404                            discover=True,
  405                            retval={'foo': 'bar'})
  406 
  407     def test_remove_export(self):
  408         self._test_rpc_api('remove_export',
  409                            rpc_method='cast',
  410                            server=self.fake_volume_obj.host,
  411                            volume=self.fake_volume_obj,
  412                            expected_kwargs_diff={
  413                                'volume_id': self.fake_volume_obj.id})
  414 
  415     @ddt.data(None, 'mycluster')
  416     def test_get_backup_device(self, cluster_name):
  417         self._change_cluster_name(self.fake_volume_obj, cluster_name)
  418         backup_device_dict = {'backup_device': self.fake_volume,
  419                               'is_snapshot': False,
  420                               'secure_enabled': True}
  421         backup_device_obj = objects.BackupDeviceInfo.from_primitive(
  422             backup_device_dict, self.context)
  423         self._test_rpc_api('get_backup_device',
  424                            rpc_method='call',
  425                            server=cluster_name or self.fake_volume_obj.host,
  426                            backup=self.fake_backup_obj,
  427                            volume=self.fake_volume_obj,
  428                            expected_kwargs_diff={
  429                                'want_objects': True,
  430                            },
  431                            retval=backup_device_obj,
  432                            version='3.2')
  433 
  434     @ddt.data(None, 'mycluster')
  435     def test_get_backup_device_old(self, cluster_name):
  436         self.can_send_version_mock.side_effect = (True, False, False)
  437         self._change_cluster_name(self.fake_volume_obj, cluster_name)
  438         backup_device_dict = {'backup_device': self.fake_volume,
  439                               'is_snapshot': False,
  440                               'secure_enabled': True}
  441         backup_device_obj = objects.BackupDeviceInfo.from_primitive(
  442             backup_device_dict, self.context)
  443 
  444         self._test_rpc_api('get_backup_device',
  445                            rpc_method='call',
  446                            server=cluster_name or self.fake_volume_obj.host,
  447                            backup=self.fake_backup_obj,
  448                            volume=self.fake_volume_obj,
  449                            retval=backup_device_dict,
  450                            expected_retval=backup_device_obj,
  451                            version='3.0')
  452 
  453     @ddt.data(None, 'mycluster')
  454     def test_secure_file_operations_enabled(self, cluster_name):
  455         self._change_cluster_name(self.fake_volume_obj, cluster_name)
  456         self._test_rpc_api('secure_file_operations_enabled',
  457                            rpc_method='call',
  458                            server=cluster_name or self.fake_volume_obj.host,
  459                            volume=self.fake_volume_obj,
  460                            retval=True)
  461 
  462     def test_create_group(self):
  463         self._test_rpc_api('create_group', rpc_method='cast',
  464                            server='fakehost@fakedrv', group=self.fake_group)
  465 
  466     @ddt.data(None, 'mycluster')
  467     def test_delete_group(self, cluster_name):
  468         self._change_cluster_name(self.fake_group, cluster_name)
  469         self._test_rpc_api('delete_group', rpc_method='cast',
  470                            server=cluster_name or self.fake_group.host,
  471                            group=self.fake_group)
  472 
  473     @ddt.data(None, 'mycluster')
  474     def test_update_group(self, cluster_name):
  475         self._change_cluster_name(self.fake_group, cluster_name)
  476         self._test_rpc_api('update_group', rpc_method='cast',
  477                            server=cluster_name or self.fake_group.host,
  478                            group=self.fake_group,
  479                            add_volumes=[fake.VOLUME2_ID],
  480                            remove_volumes=[fake.VOLUME3_ID])
  481 
  482     def test_create_group_from_src(self):
  483         self._test_rpc_api('create_group_from_src', rpc_method='cast',
  484                            server=self.fake_group.host, group=self.fake_group,
  485                            group_snapshot=self.fake_group_snapshot,
  486                            source_group=None)
  487 
  488     def test_create_group_snapshot(self):
  489         self._test_rpc_api('create_group_snapshot', rpc_method='cast',
  490                            server=self.fake_group_snapshot.group.host,
  491                            group_snapshot=self.fake_group_snapshot)
  492 
  493     def test_delete_group_snapshot(self):
  494         self._test_rpc_api('delete_group_snapshot', rpc_method='cast',
  495                            server=self.fake_group_snapshot.group.host,
  496                            group_snapshot=self.fake_group_snapshot)
  497 
  498     @ddt.data(('myhost', None), ('myhost', 'mycluster'))
  499     @ddt.unpack
  500     @mock.patch('cinder.volume.rpcapi.VolumeAPI._get_cctxt')
  501     def test_do_cleanup(self, host, cluster, get_cctxt_mock):
  502         cleanup_request = objects.CleanupRequest(self.context,
  503                                                  host=host,
  504                                                  cluster_name=cluster)
  505         rpcapi = volume_rpcapi.VolumeAPI()
  506         rpcapi.do_cleanup(self.context, cleanup_request)
  507         get_cctxt_mock.assert_called_once_with(
  508             cleanup_request.service_topic_queue, '3.7')
  509         get_cctxt_mock.return_value.cast.assert_called_once_with(
  510             self.context, 'do_cleanup', cleanup_request=cleanup_request)
  511 
  512     def test_do_cleanup_too_old(self):
  513         cleanup_request = objects.CleanupRequest(self.context)
  514         rpcapi = volume_rpcapi.VolumeAPI()
  515         with mock.patch.object(rpcapi.client, 'can_send_version',
  516                                return_value=False) as can_send_mock:
  517             self.assertRaises(exception.ServiceTooOld,
  518                               rpcapi.do_cleanup,
  519                               self.context,
  520                               cleanup_request)
  521             can_send_mock.assert_called_once_with('3.7')
  522 
  523     @ddt.data(('myhost', None, '3.10'), ('myhost', 'mycluster', '3.10'),
  524               ('myhost', None, '3.0'))
  525     @ddt.unpack
  526     @mock.patch('oslo_messaging.RPCClient.can_send_version')
  527     def test_get_manageable_volumes(
  528             self,
  529             host,
  530             cluster_name,
  531             version,
  532             can_send_version):
  533         can_send_version.side_effect = lambda x: x == version
  534         service = objects.Service(self.context, host=host,
  535                                   cluster_name=cluster_name)
  536         expected_kwargs_diff = {
  537             'want_objects': True} if version == '3.10' else {}
  538         self._test_rpc_api('get_manageable_volumes',
  539                            rpc_method='call',
  540                            service=service,
  541                            server=cluster_name or host,
  542                            marker=5,
  543                            limit=20,
  544                            offset=5,
  545                            sort_keys='fake_keys',
  546                            sort_dirs='fake_dirs',
  547                            expected_kwargs_diff=expected_kwargs_diff,
  548                            version=version)
  549         can_send_version.assert_has_calls([mock.call('3.10')])
  550 
  551     @ddt.data(('myhost', None, '3.10'), ('myhost', 'mycluster', '3.10'),
  552               ('myhost', None, '3.0'))
  553     @ddt.unpack
  554     @mock.patch('oslo_messaging.RPCClient.can_send_version')
  555     def test_get_manageable_snapshots(
  556             self,
  557             host,
  558             cluster_name,
  559             version,
  560             can_send_version):
  561         can_send_version.side_effect = lambda x: x == version
  562         service = objects.Service(self.context, host=host,
  563                                   cluster_name=cluster_name)
  564         expected_kwargs_diff = {
  565             'want_objects': True} if version == '3.10' else {}
  566         self._test_rpc_api('get_manageable_snapshots',
  567                            rpc_method='call',
  568                            service=service,
  569                            server=cluster_name or host,
  570                            marker=5,
  571                            limit=20,
  572                            offset=5,
  573                            sort_keys='fake_keys',
  574                            sort_dirs='fake_dirs',
  575                            expected_kwargs_diff=expected_kwargs_diff,
  576                            version=version)
  577         can_send_version.assert_has_calls([mock.call('3.10')])
  578 
  579     @mock.patch('oslo_messaging.RPCClient.can_send_version', mock.Mock())
  580     def test_set_log_levels(self):
  581         service = objects.Service(self.context, host='host1')
  582         self._test_rpc_api('set_log_levels',
  583                            rpc_method='cast',
  584                            server=service.host,
  585                            service=service,
  586                            log_request='log_request',
  587                            version='3.12')
  588 
  589     @mock.patch('oslo_messaging.RPCClient.can_send_version', mock.Mock())
  590     def test_get_log_levels(self):
  591         service = objects.Service(self.context, host='host1')
  592         self._test_rpc_api('get_log_levels',
  593                            rpc_method='call',
  594                            server=service.host,
  595                            service=service,
  596                            log_request='log_request',
  597                            version='3.12')
  598 
  599     @ddt.data(None, 'mycluster')
  600     def test_initialize_connection_snapshot(self, cluster_name):
  601         self._change_cluster_name(self.fake_snapshot.volume, cluster_name)
  602         self._test_rpc_api('initialize_connection_snapshot',
  603                            rpc_method='call',
  604                            server=(cluster_name or
  605                                    self.fake_snapshot.volume.host),
  606                            connector='fake_connector',
  607                            snapshot=self.fake_snapshot,
  608                            expected_kwargs_diff={
  609                                'snapshot_id': self.fake_snapshot.id},
  610                            version='3.13')
  611 
  612     @ddt.data(None, 'mycluster')
  613     def test_terminate_connection_snapshot(self, cluster_name):
  614         self._change_cluster_name(self.fake_snapshot.volume, cluster_name)
  615         self._test_rpc_api('terminate_connection_snapshot',
  616                            rpc_method='call',
  617                            server=(cluster_name or
  618                                    self.fake_snapshot.volume.host),
  619                            snapshot=self.fake_snapshot,
  620                            connector='fake_connector',
  621                            force=False,
  622                            retval=None,
  623                            expected_kwargs_diff={
  624                                'snapshot_id': self.fake_snapshot.id},
  625                            version='3.13')
  626 
  627     def test_remove_export_snapshot(self):
  628         self._test_rpc_api('remove_export_snapshot',
  629                            rpc_method='cast',
  630                            server=self.fake_volume_obj.host,
  631                            snapshot=self.fake_snapshot,
  632                            expected_kwargs_diff={
  633                                'snapshot_id': self.fake_snapshot.id},
  634                            version='3.13')
  635 
  636     def test_enable_replication(self):
  637         self._test_rpc_api('enable_replication', rpc_method='cast',
  638                            server=self.fake_group.host,
  639                            group=self.fake_group,
  640                            version='3.14')
  641 
  642     def test_disable_replication(self):
  643         self._test_rpc_api('disable_replication', rpc_method='cast',
  644                            server=self.fake_group.host,
  645                            group=self.fake_group,
  646                            version='3.14')
  647 
  648     def test_failover_replication(self):
  649         self._test_rpc_api('failover_replication', rpc_method='cast',
  650                            server=self.fake_group.host,
  651                            group=self.fake_group,
  652                            allow_attached_volume=False,
  653                            secondary_backend_id=None,
  654                            version='3.14')
  655 
  656     def test_list_replication_targets(self):
  657         self._test_rpc_api('list_replication_targets', rpc_method='call',
  658                            server=self.fake_group.host,
  659                            group=self.fake_group,
  660                            version='3.14')