"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "manila/tests/share/drivers/netapp/dataontap/cluster_mode/test_lib_base.py" between
manila-8.1.2.tar.gz and manila-8.1.3.tar.gz

About: OpenStack Manila provides “Shared Filesystems as a service”.
The "Stein" series (maintained release).

test_lib_base.py  (manila-8.1.2):test_lib_base.py  (manila-8.1.3)
skipping to change at line 119 skipping to change at line 119
self.assertIsNotNone(self.library._app_version) self.assertIsNotNone(self.library._app_version)
def test_do_setup(self): def test_do_setup(self):
mock_get_api_client = self.mock_object(self.library, '_get_api_client') mock_get_api_client = self.mock_object(self.library, '_get_api_client')
self.mock_object( self.mock_object(
performance, 'PerformanceLibrary', performance, 'PerformanceLibrary',
mock.Mock(return_value='fake_perf_library')) mock.Mock(return_value='fake_perf_library'))
self.mock_object( self.mock_object(
self.library._client, 'check_for_cluster_credentials', self.library._client, 'check_for_cluster_credentials',
mock.Mock(return_value=True)) mock.Mock(return_value=True))
self.mock_object(
self.library, '_check_snaprestore_license',
mock.Mock(return_value=True))
self.mock_object(
self.library,
'_get_licenses',
mock.Mock(return_value=fake.LICENSES))
self.library.do_setup(self.context) self.library.do_setup(self.context)
self.assertEqual(fake.LICENSES, self.library._licenses)
mock_get_api_client.assert_called_once_with() mock_get_api_client.assert_called_once_with()
(self.library._client.check_for_cluster_credentials. (self.library._client.check_for_cluster_credentials.
assert_called_once_with()) assert_called_once_with())
self.assertEqual('fake_perf_library', self.library._perf_library) self.assertEqual('fake_perf_library', self.library._perf_library)
self.mock_object(self.library._client, self.mock_object(self.library._client,
'check_for_cluster_credentials', 'check_for_cluster_credentials',
mock.Mock(return_value=True)) mock.Mock(return_value=True))
self.mock_object(
self.library, '_check_snaprestore_license',
mock.Mock(return_value=True))
mock_set_cluster_info = self.mock_object( mock_set_cluster_info = self.mock_object(
self.library, '_set_cluster_info') self.library, '_set_cluster_info')
self.library.do_setup(self.context) self.library.do_setup(self.context)
mock_set_cluster_info.assert_called_once() mock_set_cluster_info.assert_called_once()
def test_set_cluster_info(self): def test_set_cluster_info(self):
self.library._set_cluster_info() self.library._set_cluster_info()
self.assertTrue(self.library._cluster_info['nve_support'], self.assertTrue(self.library._cluster_info['nve_support'],
fake.CLUSTER_NODES) fake.CLUSTER_NODES)
def test_check_for_setup_error(self): def test_check_for_setup_error(self):
self.library._licenses = []
self.mock_object(self.library,
'_get_licenses',
mock.Mock(return_value=['fake_license']))
mock_start_periodic_tasks = self.mock_object(self.library, mock_start_periodic_tasks = self.mock_object(self.library,
'_start_periodic_tasks') '_start_periodic_tasks')
self.library.check_for_setup_error() self.library.check_for_setup_error()
self.assertEqual(['fake_license'], self.library._licenses)
mock_start_periodic_tasks.assert_called_once_with() mock_start_periodic_tasks.assert_called_once_with()
def test_get_vserver(self): def test_get_vserver(self):
self.assertRaises(NotImplementedError, self.library._get_vserver) self.assertRaises(NotImplementedError, self.library._get_vserver)
def test_get_api_client(self): def test_get_api_client(self):
client_kwargs = fake.CLIENT_KWARGS.copy() client_kwargs = fake.CLIENT_KWARGS.copy()
# First call should proceed normally. # First call should proceed normally.
skipping to change at line 334 skipping to change at line 338
self.mock_object(self.library._client, self.mock_object(self.library._client,
'get_vserver_aggregate_capacities', 'get_vserver_aggregate_capacities',
mock.Mock(return_value=fake.AGGREGATE_CAPACITIES)) mock.Mock(return_value=fake.AGGREGATE_CAPACITIES))
result = self.library._get_aggregate_space() result = self.library._get_aggregate_space()
(self.library._client.get_vserver_aggregate_capacities. (self.library._client.get_vserver_aggregate_capacities.
assert_called_once_with(fake.AGGREGATES)) assert_called_once_with(fake.AGGREGATES))
self.assertDictEqual(fake.AGGREGATE_CAPACITIES, result) self.assertDictEqual(fake.AGGREGATE_CAPACITIES, result)
def test_check_snaprestore_license_notfound(self): def test_check_snaprestore_license_admin_notfound(self):
self.library._have_cluster_creds = True
licenses = list(fake.LICENSES) licenses = list(fake.LICENSES)
licenses.remove('snaprestore') licenses.remove('snaprestore')
self.mock_object(self.client, self.mock_object(self.client,
'get_licenses', 'get_licenses',
mock.Mock(return_value=licenses)) mock.Mock(return_value=licenses))
result = self.library._check_snaprestore_license() result = self.library._check_snaprestore_license()
self.assertIs(False, result) self.assertIs(False, result)
def test_check_snaprestore_license_found(self): def test_check_snaprestore_license_admin_found(self):
self.mock_object(self.client, self.library._have_cluster_creds = True
'get_licenses', self.library._licenses = fake.LICENSES
mock.Mock(return_value=fake.LICENSES))
result = self.library._check_snaprestore_license() result = self.library._check_snaprestore_license()
self.assertIs(True, result) self.assertIs(True, result)
def test_check_snaprestore_license_svm_scoped_notfound(self):
self.library._have_cluster_creds = False
self.mock_object(self.library._client,
'restore_snapshot',
mock.Mock(side_effect=netapp_api.NaApiError(
code=netapp_api.EAPIERROR,
message=fake.NO_SNAPRESTORE_LICENSE)))
result = self.library._check_snaprestore_license()
self.assertIs(False, result)
def test_check_snaprestore_license_svm_scoped_found(self):
self.library._have_cluster_creds = False
self.mock_object(self.library._client,
'restore_snapshot',
mock.Mock(side_effect=netapp_api.NaApiError(
code=netapp_api.EAPIERROR,
message='Other error')))
result = self.library._check_snaprestore_license()
self.assertIs(True, result)
def test_check_snaprestore_license_svm_scoped_found_exception(self):
self.mock_object(lib_base.LOG, 'exception')
self.library._have_cluster_creds = False
self.mock_object(self.library._client,
'restore_snapshot',
mock.Mock(return_value=None))
self.assertRaises(
exception.NetAppException,
self.library._check_snaprestore_license)
lib_base.LOG.exception.assert_called_once()
def test_get_aggregate_node_cluster_creds(self): def test_get_aggregate_node_cluster_creds(self):
self.library._have_cluster_creds = True self.library._have_cluster_creds = True
self.mock_object(self.library._client, self.mock_object(self.library._client,
'get_node_for_aggregate', 'get_node_for_aggregate',
mock.Mock(return_value=fake.CLUSTER_NODE)) mock.Mock(return_value=fake.CLUSTER_NODE))
result = self.library._get_aggregate_node(fake.AGGREGATE) result = self.library._get_aggregate_node(fake.AGGREGATE)
(self.library._client.get_node_for_aggregate. (self.library._client.get_node_for_aggregate.
skipping to change at line 450 skipping to change at line 486
result = self.library.get_share_server_pools(fake.SHARE_SERVER) result = self.library.get_share_server_pools(fake.SHARE_SERVER)
self.assertListEqual(fake.POOLS, result) self.assertListEqual(fake.POOLS, result)
def test_get_pools(self): def test_get_pools(self):
self.mock_object( self.mock_object(
self.library, '_get_aggregate_space', self.library, '_get_aggregate_space',
mock.Mock(return_value=fake.AGGREGATE_CAPACITIES)) mock.Mock(return_value=fake.AGGREGATE_CAPACITIES))
self.library._have_cluster_creds = True self.library._have_cluster_creds = True
self.library._revert_to_snapshot_support = True
self.library._cluster_info = fake.CLUSTER_INFO self.library._cluster_info = fake.CLUSTER_INFO
self.library._ssc_stats = fake.SSC_INFO self.library._ssc_stats = fake.SSC_INFO
self.library._perf_library.get_node_utilization_for_pool = ( self.library._perf_library.get_node_utilization_for_pool = (
mock.Mock(side_effect=[30.0, 42.0])) mock.Mock(side_effect=[30.0, 42.0]))
self.mock_object(self.library,
'_check_snaprestore_license',
mock.Mock(return_value=True))
result = self.library._get_pools(filter_function='filter', result = self.library._get_pools(filter_function='filter',
goodness_function='goodness') goodness_function='goodness')
self.assertListEqual(fake.POOLS, result) self.assertListEqual(fake.POOLS, result)
def test_get_pools_vserver_creds(self): def test_get_pools_vserver_creds(self):
self.mock_object( self.mock_object(
self.library, '_get_aggregate_space', self.library, '_get_aggregate_space',
mock.Mock(return_value=fake.AGGREGATE_CAPACITIES_VSERVER_CREDS)) mock.Mock(return_value=fake.AGGREGATE_CAPACITIES_VSERVER_CREDS))
self.library._have_cluster_creds = False self.library._have_cluster_creds = False
self.library._revert_to_snapshot_support = True
self.library._cluster_info = fake.CLUSTER_INFO self.library._cluster_info = fake.CLUSTER_INFO
self.library._ssc_stats = fake.SSC_INFO_VSERVER_CREDS self.library._ssc_stats = fake.SSC_INFO_VSERVER_CREDS
self.library._perf_library.get_node_utilization_for_pool = ( self.library._perf_library.get_node_utilization_for_pool = (
mock.Mock(side_effect=[50.0, 50.0])) mock.Mock(side_effect=[50.0, 50.0]))
self.mock_object(self.library,
'_check_snaprestore_license',
mock.Mock(return_value=True))
result = self.library._get_pools() result = self.library._get_pools()
self.assertListEqual(fake.POOLS_VSERVER_CREDS, result) self.assertListEqual(fake.POOLS_VSERVER_CREDS, result)
def test_handle_ems_logging(self): def test_handle_ems_logging(self):
self.mock_object(self.library, self.mock_object(self.library,
'_build_ems_log_message_0', '_build_ems_log_message_0',
mock.Mock(return_value=fake.EMS_MESSAGE_0)) mock.Mock(return_value=fake.EMS_MESSAGE_0))
 End of changes. 13 change blocks. 
18 lines changed or deleted 50 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)