"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "manila/tests/share/test_api.py" between
manila-11.0.0.tar.gz and manila-11.0.1.tar.gz

About: OpenStack Manila provides “Shared Filesystems as a service”.
The "Victoria" series (latest release).

test_api.py  (manila-11.0.0):test_api.py  (manila-11.0.1)
skipping to change at line 397 skipping to change at line 397
shares = self.api.get_all(ctx, {'name': 'foo', 'all_tenants': 1}) shares = self.api.get_all(ctx, {'name': 'foo', 'all_tenants': 1})
share_api.policy.check_policy.assert_has_calls([ share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'), mock.call(ctx, 'share', 'get_all'),
]) ])
db_api.share_get_all.assert_called_once_with( db_api.share_get_all.assert_called_once_with(
ctx, sort_dir='desc', sort_key='created_at', filters={}) ctx, sort_dir='desc', sort_key='created_at', filters={})
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[:1], shares) self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[:1], shares)
def test_get_all_admin_filter_by_status(self): def test_get_all_admin_filter_by_status(self):
ctx = context.RequestContext('fake_uid', 'fake_pid_2', is_admin=True) ctx = context.RequestContext('fake_uid', 'fake_pid_2', is_admin=True)
self.mock_object(db_api, 'share_get_all_by_project', expected_filter = {'status': constants.STATUS_AVAILABLE}
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[1:])) self.mock_object(
db_api, 'share_get_all_by_project',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[0::2]))
shares = self.api.get_all(ctx, {'status': constants.STATUS_AVAILABLE}) shares = self.api.get_all(ctx, {'status': constants.STATUS_AVAILABLE})
share_api.policy.check_policy.assert_has_calls([ share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'), mock.call(ctx, 'share', 'get_all'),
]) ])
db_api.share_get_all_by_project.assert_called_once_with( db_api.share_get_all_by_project.assert_called_once_with(
ctx, sort_dir='desc', sort_key='created_at', ctx, sort_dir='desc', sort_key='created_at',
project_id='fake_pid_2', filters={}, is_public=False project_id='fake_pid_2', filters=expected_filter, is_public=False
) )
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[2::4], shares) self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[0::2], shares)
def test_get_all_admin_filter_by_status_and_all_tenants(self): def test_get_all_admin_filter_by_status_and_all_tenants(self):
ctx = context.RequestContext('fake_uid', 'fake_pid_2', is_admin=True) ctx = context.RequestContext('fake_uid', 'fake_pid_2', is_admin=True)
self.mock_object(db_api, 'share_get_all', self.mock_object(
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES)) db_api, 'share_get_all',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[1::2]))
expected_filter = {'status': constants.STATUS_ERROR}
shares = self.api.get_all( shares = self.api.get_all(
ctx, {'status': constants.STATUS_ERROR, 'all_tenants': 1}) ctx, {'status': constants.STATUS_ERROR, 'all_tenants': 1})
share_api.policy.check_policy.assert_has_calls([ share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'), mock.call(ctx, 'share', 'get_all'),
]) ])
db_api.share_get_all.assert_called_once_with( db_api.share_get_all.assert_called_once_with(
ctx, sort_dir='desc', sort_key='created_at', filters={}) ctx, sort_dir='desc', sort_key='created_at',
filters=expected_filter)
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[1::2], shares) self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[1::2], shares)
def test_get_all_non_admin_filter_by_all_tenants(self): def test_get_all_non_admin_filter_by_all_tenants(self):
# Expected share list only by project of non-admin user # Expected share list only by project of non-admin user
ctx = context.RequestContext('fake_uid', 'fake_pid_2', is_admin=False) ctx = context.RequestContext('fake_uid', 'fake_pid_2', is_admin=False)
self.mock_object(db_api, 'share_get_all_by_project', self.mock_object(db_api, 'share_get_all_by_project',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[1:])) mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[1:]))
shares = self.api.get_all(ctx, {'all_tenants': 1}) shares = self.api.get_all(ctx, {'all_tenants': 1})
share_api.policy.check_policy.assert_has_calls([ share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'), mock.call(ctx, 'share', 'get_all'),
skipping to change at line 446 skipping to change at line 452
def test_get_all_non_admin_with_name_and_status_filters(self): def test_get_all_non_admin_with_name_and_status_filters(self):
ctx = context.RequestContext('fake_uid', 'fake_pid_2', is_admin=False) ctx = context.RequestContext('fake_uid', 'fake_pid_2', is_admin=False)
self.mock_object(db_api, 'share_get_all_by_project', self.mock_object(db_api, 'share_get_all_by_project',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[1:])) mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[1:]))
shares = self.api.get_all( shares = self.api.get_all(
ctx, {'name': 'bar', 'status': constants.STATUS_ERROR}) ctx, {'name': 'bar', 'status': constants.STATUS_ERROR})
share_api.policy.check_policy.assert_has_calls([ share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'), mock.call(ctx, 'share', 'get_all'),
]) ])
expected_filter_1 = {'status': constants.STATUS_ERROR}
expected_filter_2 = {'status': constants.STATUS_AVAILABLE}
db_api.share_get_all_by_project.assert_called_once_with( db_api.share_get_all_by_project.assert_called_once_with(
ctx, sort_dir='desc', sort_key='created_at', ctx, sort_dir='desc', sort_key='created_at',
project_id='fake_pid_2', filters={}, is_public=False project_id='fake_pid_2', filters=expected_filter_1, is_public=False
) )
# two items expected, one filtered # two items expected, one filtered
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[1::2], shares) self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[1::2], shares)
# one item expected, two filtered # one item expected, two filtered
shares = self.api.get_all( shares = self.api.get_all(
ctx, {'name': 'foo1', 'status': constants.STATUS_AVAILABLE}) ctx, {'name': 'foo1', 'status': constants.STATUS_AVAILABLE})
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[2::4], shares) self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[2::4], shares)
share_api.policy.check_policy.assert_has_calls([ share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'), mock.call(ctx, 'share', 'get_all'),
mock.call(ctx, 'share', 'get_all'), mock.call(ctx, 'share', 'get_all'),
]) ])
db_api.share_get_all_by_project.assert_has_calls([ db_api.share_get_all_by_project.assert_has_calls([
mock.call(ctx, sort_dir='desc', sort_key='created_at', mock.call(
project_id='fake_pid_2', filters={}, is_public=False), ctx, sort_dir='desc', sort_key='created_at',
mock.call(ctx, sort_dir='desc', sort_key='created_at', project_id='fake_pid_2', filters=expected_filter_1,
project_id='fake_pid_2', filters={}, is_public=False), is_public=False),
mock.call(
ctx, sort_dir='desc', sort_key='created_at',
project_id='fake_pid_2', filters=expected_filter_2,
is_public=False),
]) ])
@ddt.data('True', 'true', '1', 'yes', 'y', 'on', 't', True) @ddt.data('True', 'true', '1', 'yes', 'y', 'on', 't', True)
def test_get_all_non_admin_public(self, is_public): def test_get_all_non_admin_public(self, is_public):
ctx = context.RequestContext('fake_uid', 'fake_pid_2', ctx = context.RequestContext('fake_uid', 'fake_pid_2',
is_admin=False) is_admin=False)
self.mock_object(db_api, 'share_get_all_by_project', mock.Mock( self.mock_object(db_api, 'share_get_all_by_project', mock.Mock(
return_value=_FAKE_LIST_OF_ALL_SHARES[1:])) return_value=_FAKE_LIST_OF_ALL_SHARES[1:]))
shares = self.api.get_all(ctx, {'is_public': is_public}) shares = self.api.get_all(ctx, {'is_public': is_public})
share_api.policy.check_policy.assert_has_calls([ share_api.policy.check_policy.assert_has_calls([
skipping to change at line 1018 skipping to change at line 1031
{'replication_type': 'dr', 'dhss': False, 'share_server_id': None}, {'replication_type': 'dr', 'dhss': False, 'share_server_id': None},
{'replication_type': 'readable', 'dhss': False, {'replication_type': 'readable', 'dhss': False,
'share_server_id': None}, 'share_server_id': None},
{'replication_type': None, 'dhss': False, 'share_server_id': None}, {'replication_type': None, 'dhss': False, 'share_server_id': None},
{'replication_type': None, 'dhss': True, 'share_server_id': 'fake'} {'replication_type': None, 'dhss': True, 'share_server_id': 'fake'}
) )
@ddt.unpack @ddt.unpack
def test_manage_new(self, replication_type, dhss, share_server_id): def test_manage_new(self, replication_type, dhss, share_server_id):
share_data = { share_data = {
'host': 'fake', 'host': 'fake',
'export_location': 'fake', 'export_location_path': 'fake',
'share_proto': 'fake', 'share_proto': 'fake',
'share_type_id': 'fake', 'share_type_id': 'fake',
} }
if dhss: if dhss:
share_data['share_server_id'] = share_server_id share_data['share_server_id'] = share_server_id
driver_options = {} driver_options = {}
date = datetime.datetime(1, 1, 1, 1, 1, 1) date = datetime.datetime(1, 1, 1, 1, 1, 1)
timeutils.utcnow.return_value = date timeutils.utcnow.return_value = date
fake_subnet = db_utils.create_share_network_subnet( fake_subnet = db_utils.create_share_network_subnet(
share_network_id='fake') share_network_id='fake')
skipping to change at line 1062 skipping to change at line 1075
mock.Mock(return_value=share)) mock.Mock(return_value=share))
self.mock_object(db_api, 'share_export_locations_update') self.mock_object(db_api, 'share_export_locations_update')
self.mock_object(db_api, 'share_get', self.mock_object(db_api, 'share_get',
mock.Mock(return_value=share)) mock.Mock(return_value=share))
self.mock_object(share_types, 'get_share_type', self.mock_object(share_types, 'get_share_type',
mock.Mock(return_value=fake_type)) mock.Mock(return_value=fake_type))
self.mock_object(db_api, 'share_server_get', self.mock_object(db_api, 'share_server_get',
mock.Mock(return_value=share_server)) mock.Mock(return_value=share_server))
self.mock_object(db_api, 'share_network_subnet_get', self.mock_object(db_api, 'share_network_subnet_get',
mock.Mock(return_value=fake_subnet)) mock.Mock(return_value=fake_subnet))
self.mock_object(self.api, 'get_all', mock.Mock(return_value=[])) self.mock_object(db_api, 'share_instances_get_all',
mock.Mock(return_value=[]))
self.api.manage(self.context, copy.deepcopy(share_data), self.api.manage(self.context, copy.deepcopy(share_data),
driver_options) driver_options)
share_data.update({ share_data.update({
'user_id': self.context.user_id, 'user_id': self.context.user_id,
'project_id': self.context.project_id, 'project_id': self.context.project_id,
'status': constants.STATUS_MANAGING, 'status': constants.STATUS_MANAGING,
'scheduled_at': date, 'scheduled_at': date,
'snapshot_support': fake_type['extra_specs']['snapshot_support'], 'snapshot_support': fake_type['extra_specs']['snapshot_support'],
skipping to change at line 1089 skipping to change at line 1103
'replication_type': replication_type, 'replication_type': replication_type,
}) })
expected_request_spec = self._get_request_spec_dict( expected_request_spec = self._get_request_spec_dict(
share, fake_type, size=0, share_proto=share_data['share_proto'], share, fake_type, size=0, share_proto=share_data['share_proto'],
host=share_data['host']) host=share_data['host'])
if dhss: if dhss:
share_data.update({ share_data.update({
'share_network_id': fake_subnet['share_network_id']}) 'share_network_id': fake_subnet['share_network_id']})
export_location = share_data.pop('export_location') export_location = share_data.pop('export_location_path')
self.api.get_all.assert_called_once_with(self.context, mock.ANY) filters = {'export_location_path': export_location,
'host': share_data['host']
}
if share_server_id:
filters['share_server_id'] = share_server_id
db_api.share_instances_get_all.assert_called_once_with(
self.context, filters=filters)
db_api.share_create.assert_called_once_with(self.context, share_data) db_api.share_create.assert_called_once_with(self.context, share_data)
db_api.share_get.assert_called_once_with(self.context, share['id']) db_api.share_get.assert_called_once_with(self.context, share['id'])
db_api.share_export_locations_update.assert_called_once_with( db_api.share_export_locations_update.assert_called_once_with(
self.context, share.instance['id'], export_location self.context, share.instance['id'], export_location)
)
self.scheduler_rpcapi.manage_share.assert_called_once_with( self.scheduler_rpcapi.manage_share.assert_called_once_with(
self.context, share['id'], driver_options, expected_request_spec) self.context, share['id'], driver_options, expected_request_spec)
if dhss: if dhss:
db_api.share_server_get.assert_called_once_with( db_api.share_server_get.assert_called_once_with(
self.context, share_data['share_server_id']) self.context, share_data['share_server_id'])
db_api.share_network_subnet_get.assert_called_once_with( db_api.share_network_subnet_get.assert_called_once_with(
self.context, share_server['share_network_subnet_id']) self.context, share_server['share_network_subnet_id'])
@ddt.data((True, exception.InvalidInput, True), @ddt.data((True, exception.InvalidInput, True),
(True, exception.InvalidInput, False), (True, exception.InvalidInput, False),
(False, exception.InvalidInput, True), (False, exception.InvalidInput, True),
(True, exception.InvalidInput, True)) (True, exception.InvalidInput, True))
@ddt.unpack @ddt.unpack
def test_manage_new_dhss_true_and_false(self, dhss, exception_type, def test_manage_new_dhss_true_and_false(self, dhss, exception_type,
has_share_server_id): has_share_server_id):
share_data = { share_data = {
'host': 'fake', 'host': 'fake',
'export_location': 'fake', 'export_location_path': 'fake',
'share_proto': 'fake', 'share_proto': 'fake',
'share_type_id': 'fake', 'share_type_id': 'fake',
} }
if has_share_server_id: if has_share_server_id:
share_data['share_server_id'] = 'fake' share_data['share_server_id'] = 'fake'
driver_options = {} driver_options = {}
date = datetime.datetime(1, 1, 1, 1, 1, 1) date = datetime.datetime(1, 1, 1, 1, 1, 1)
timeutils.utcnow.return_value = date timeutils.utcnow.return_value = date
fake_type = { fake_type = {
skipping to change at line 1136 skipping to change at line 1155
'snapshot_support': False, 'snapshot_support': False,
'create_share_from_snapshot_support': False, 'create_share_from_snapshot_support': False,
'revert_to_snapshot_support': False, 'revert_to_snapshot_support': False,
'mount_snapshot_support': False, 'mount_snapshot_support': False,
'driver_handles_share_servers': dhss, 'driver_handles_share_servers': dhss,
}, },
} }
self.mock_object(share_types, 'get_share_type', self.mock_object(share_types, 'get_share_type',
mock.Mock(return_value=fake_type)) mock.Mock(return_value=fake_type))
self.mock_object(self.api, 'get_all', mock.Mock(return_value=[])) self.mock_object(db_api, 'share_instances_get_all',
mock.Mock(return_value=[]))
self.assertRaises(exception_type, self.assertRaises(exception_type,
self.api.manage, self.api.manage,
self.context, self.context,
share_data=share_data, share_data=share_data,
driver_options=driver_options driver_options=driver_options
) )
share_types.get_share_type.assert_called_once_with( share_types.get_share_type.assert_called_once_with(
self.context, share_data['share_type_id'] self.context, share_data['share_type_id']
) )
self.api.get_all.assert_called_once_with( filters = {'export_location_path': share_data['export_location_path'],
self.context, { 'host': share_data['host']
'host': share_data['host'], }
'export_location': share_data['export_location'], if has_share_server_id:
'share_proto': share_data['share_proto'], filters['share_server_id'] = 'fake'
'share_type_id': share_data['share_type_id'] db_api.share_instances_get_all.assert_called_once_with(
} self.context, filters=filters)
)
def test_manage_new_share_server_not_found(self): def test_manage_new_share_server_not_found(self):
share_data = { share_data = {
'host': 'fake', 'host': 'fake',
'export_location': 'fake', 'export_location_path': 'fake',
'share_proto': 'fake', 'share_proto': 'fake',
'share_type_id': 'fake', 'share_type_id': 'fake',
'share_server_id': 'fake' 'share_server_id': 'fake'
} }
driver_options = {} driver_options = {}
date = datetime.datetime(1, 1, 1, 1, 1, 1) date = datetime.datetime(1, 1, 1, 1, 1, 1)
timeutils.utcnow.return_value = date timeutils.utcnow.return_value = date
fake_type = { fake_type = {
skipping to change at line 1183 skipping to change at line 1202
'replication_type': 'dr', 'replication_type': 'dr',
'create_share_from_snapshot_support': False, 'create_share_from_snapshot_support': False,
'revert_to_snapshot_support': False, 'revert_to_snapshot_support': False,
'mount_snapshot_support': False, 'mount_snapshot_support': False,
'driver_handles_share_servers': True, 'driver_handles_share_servers': True,
}, },
} }
self.mock_object(share_types, 'get_share_type', self.mock_object(share_types, 'get_share_type',
mock.Mock(return_value=fake_type)) mock.Mock(return_value=fake_type))
self.mock_object(self.api, 'get_all', mock.Mock(return_value=[])) self.mock_object(db_api, 'share_instances_get_all',
mock.Mock(return_value=[]))
self.assertRaises(exception.InvalidInput, self.assertRaises(exception.InvalidInput,
self.api.manage, self.api.manage,
self.context, self.context,
share_data=share_data, share_data=share_data,
driver_options=driver_options driver_options=driver_options
) )
share_types.get_share_type.assert_called_once_with( share_types.get_share_type.assert_called_once_with(
self.context, share_data['share_type_id'] self.context, share_data['share_type_id']
) )
self.api.get_all.assert_called_once_with( db_api.share_instances_get_all.assert_called_once_with(
self.context, { self.context, filters={
'export_location_path': share_data['export_location_path'],
'host': share_data['host'], 'host': share_data['host'],
'export_location': share_data['export_location'], 'share_server_id': share_data['share_server_id']
'share_proto': share_data['share_proto'],
'share_type_id': share_data['share_type_id']
} }
) )
def test_manage_new_share_server_not_active(self): def test_manage_new_share_server_not_active(self):
share_data = { share_data = {
'host': 'fake', 'host': 'fake',
'export_location': 'fake', 'export_location_path': 'fake',
'share_proto': 'fake', 'share_proto': 'fake',
'share_type_id': 'fake', 'share_type_id': 'fake',
'share_server_id': 'fake' 'share_server_id': 'fake'
} }
fake_share_data = { fake_share_data = {
'id': 'fakeid', 'id': 'fakeid',
'status': constants.STATUS_ERROR, 'status': constants.STATUS_ERROR,
} }
driver_options = {} driver_options = {}
skipping to change at line 1236 skipping to change at line 1255
'revert_to_snapshot_support': False, 'revert_to_snapshot_support': False,
'mount_snapshot_support': False, 'mount_snapshot_support': False,
'driver_handles_share_servers': True, 'driver_handles_share_servers': True,
}, },
} }
share = db_api.share_create(self.context, fake_share_data) share = db_api.share_create(self.context, fake_share_data)
self.mock_object(share_types, 'get_share_type', self.mock_object(share_types, 'get_share_type',
mock.Mock(return_value=fake_type)) mock.Mock(return_value=fake_type))
self.mock_object(self.api, 'get_all', mock.Mock(return_value=[])) self.mock_object(db_api, 'share_instances_get_all',
mock.Mock(return_value=[]))
self.mock_object(db_api, 'share_server_get', self.mock_object(db_api, 'share_server_get',
mock.Mock(return_value=share)) mock.Mock(return_value=share))
self.assertRaises(exception.InvalidShareServer, self.assertRaises(exception.InvalidShareServer,
self.api.manage, self.api.manage,
self.context, self.context,
share_data=share_data, share_data=share_data,
driver_options=driver_options driver_options=driver_options
) )
share_types.get_share_type.assert_called_once_with( share_types.get_share_type.assert_called_once_with(
self.context, share_data['share_type_id'] self.context, share_data['share_type_id']
) )
self.api.get_all.assert_called_once_with( db_api.share_instances_get_all.assert_called_once_with(
self.context, { self.context, filters={
'export_location_path': share_data['export_location_path'],
'host': share_data['host'], 'host': share_data['host'],
'export_location': share_data['export_location'], 'share_server_id': share_data['share_server_id']
'share_proto': share_data['share_proto'],
'share_type_id': share_data['share_type_id']
} }
) )
db_api.share_server_get.assert_called_once_with( db_api.share_server_get.assert_called_once_with(
self.context, share_data['share_server_id'] self.context, share_data['share_server_id']
) )
@ddt.data(constants.STATUS_MANAGE_ERROR, constants.STATUS_AVAILABLE) @ddt.data(constants.STATUS_MANAGE_ERROR, constants.STATUS_AVAILABLE)
def test_manage_duplicate(self, status): def test_manage_duplicate(self, status):
share_data = { share_data = {
'host': 'fake', 'host': 'fake',
'export_location': 'fake', 'export_location_path': 'fake',
'share_proto': 'fake', 'share_proto': 'fake',
'share_type_id': 'fake', 'share_type_id': 'fake',
} }
driver_options = {} driver_options = {}
fake_type = { fake_type = {
'id': 'fake_type_id', 'id': 'fake_type_id',
'extra_specs': { 'extra_specs': {
'snapshot_support': False, 'snapshot_support': False,
'create_share_from_snapshot_support': False, 'create_share_from_snapshot_support': False,
'driver_handles_share_servers': False, 'driver_handles_share_servers': False,
}, },
} }
shares = [{'id': 'fake', 'status': status}] already_managed = [{'id': 'fake', 'status': status}]
self.mock_object(self.api, 'get_all', self.mock_object(db_api, 'share_instances_get_all',
mock.Mock(return_value=shares)) mock.Mock(return_value=already_managed))
self.mock_object(share_types, 'get_share_type', self.mock_object(share_types, 'get_share_type',
mock.Mock(return_value=fake_type)) mock.Mock(return_value=fake_type))
self.assertRaises(exception.InvalidShare, self.api.manage, self.assertRaises(exception.InvalidShare, self.api.manage,
self.context, share_data, driver_options) self.context, share_data, driver_options)
def _get_request_spec_dict(self, share, share_type, **kwargs): def _get_request_spec_dict(self, share, share_type, **kwargs):
if share is None: if share is None:
share = {'instance': {}} share = {'instance': {}}
 End of changes. 25 change blocks. 
46 lines changed or deleted 65 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)