"Fossies" - the Fresh Open Source Software Archive 
Member "manila-11.0.1/manila/tests/api/v1/test_share_manage.py" (1 Feb 2021, 11100 Bytes) of package /linux/misc/openstack/manila-11.0.1.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively, you can view or download the uninterpreted source code file here.
See also the latest Fossies "Diffs" side-by-side code changes report for "test_share_manage.py": 11.0.0_vs_11.0.1.
# Copyright 2015 Mirantis inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
15
16 from unittest import mock
17
18 import ddt
19 import webob
20
21 from manila.api import common
22 from manila.api.v1 import share_manage
23 from manila.db import api as db_api
24 from manila import exception
25 from manila import policy
26 from manila.share import api as share_api
27 from manila.share import share_types
28 from manila import test
29 from manila.tests.api import fakes
30 from manila import utils
31
32
def get_fake_manage_body(export_path='/fake', service_host='fake@host#POOL',
                         protocol='fake', share_type='fake', **kwargs):
    """Build a fake request body for the share manage API tests.

    :param export_path: value stored under ``'export_path'``.
    :param service_host: value stored under ``'service_host'``.
    :param protocol: value stored under ``'protocol'``.
    :param share_type: value stored under ``'share_type'``.
    :param kwargs: extra items merged into the share dict after the four
        named ones.
    :returns: ``{'share': <share dict>}``. The ``'share'`` envelope is
        already included, so callers must not wrap the result again.
    """
    fake_share = {
        'export_path': export_path,
        'service_host': service_host,
        'protocol': protocol,
        'share_type': share_type,
    }
    fake_share.update(kwargs)
    return {'share': fake_share}
43
44
@ddt.ddt
class ShareManageTest(test.TestCase):
    """Unit tests for ``share_manage.ShareManageController``.

    ``setUp`` replaces ``policy.check_policy`` with a mock that returns
    True and ``common.validate_public_share_policy`` with a mock that
    returns its second positional argument unchanged.
    """

    def setUp(self):
        """Create the controller, an admin request and the shared mocks."""
        super().setUp()
        self.controller = share_manage.ShareManageController()
        self.resource_name = self.controller.resource_name
        self.request = fakes.HTTPRequest.blank(
            '/share/manage', use_admin_context=True)
        self.context = self.request.environ['manila.context']
        self.mock_policy_check = self.mock_object(
            policy, 'check_policy', mock.Mock(return_value=True))
        # Hand the second positional argument straight back instead of
        # running the real public-share policy validation.
        self.mock_object(
            common, 'validate_public_share_policy',
            mock.Mock(side_effect=lambda *args, **kwargs: args[1]))

    def _assert_create_raises(self, expected_exc, body, request=None):
        """Assert ``create`` raises and the 'manage' policy was checked once.

        :param expected_exc: exception class ``controller.create`` must
            raise.
        :param body: request body passed to ``controller.create``.
        :param request: request to use; defaults to the admin request
            built in ``setUp``.
        """
        if request is None:
            request = self.request
        self.assertRaises(
            expected_exc, self.controller.create, request, body)
        self.mock_policy_check.assert_called_once_with(
            request.environ['manila.context'], self.resource_name, 'manage')

    def _setup_manage_mocks(self, service_is_up=True):
        """Mock the service lookup, share type lookup and service state.

        :param service_is_up: value the mocked ``utils.service_is_up``
            returns.
        """
        self.mock_object(
            db_api, 'service_get_by_host_and_topic',
            mock.Mock(return_value={'host': 'fake'}))
        self.mock_object(
            share_types, 'get_share_type_by_name_or_id',
            mock.Mock(return_value={'id': 'fake'}))
        self.mock_object(
            utils, 'service_is_up', mock.Mock(return_value=service_is_up))

    def _mock_manage_raises(self, exc):
        """Make ``share_api.API.manage`` raise *exc* when called."""
        self.mock_object(share_api.API, 'manage', mock.Mock(side_effect=exc))

    @staticmethod
    def _expected_share(body, **extra):
        """Build the share dict ``share_api.API.manage`` is expected to get.

        :param body: request body as returned by ``get_fake_manage_body``.
        :param extra: additional items expected in the share dict.
        :returns: the expected share dict; a dictionary export path is
            reduced to its ``'path'`` item.
        """
        share_data = body['share']
        export_path = share_data['export_path']
        if isinstance(export_path, dict):
            export_path = export_path['path']
        expected = {
            'host': share_data['service_host'],
            'export_location_path': export_path,
            'share_proto': share_data['protocol'].upper(),
            # Presumably the 'id' of the share type returned by the mock
            # installed in _setup_manage_mocks.
            'share_type_id': 'fake',
            'display_name': 'foo',
            'display_description': 'bar',
        }
        expected.update(extra)
        return expected

    # get_fake_manage_body() already returns the {'share': ...} envelope,
    # so its result is used as the body directly rather than wrapped again.
    @ddt.data(
        {},
        {'shares': {}},
        # Share entity without export_path, service_host or protocol.
        {'share': {'share_type': 'fake'}},
        # Empty export_path, service_host and protocol.
        get_fake_manage_body('', None, None),
        # Dictionary export path that lacks the 'path' item.
        get_fake_manage_body(export_path={'not_path': '/fake'}),
    )
    def test_share_manage_invalid_body(self, body):
        """Malformed request bodies are rejected with HTTP 422."""
        self._assert_create_raises(webob.exc.HTTPUnprocessableEntity, body)

    def test_share_manage_service_not_found(self):
        """``ServiceNotFound`` from the service lookup becomes HTTP 404."""
        self.mock_object(
            db_api, 'service_get_by_host_and_topic',
            mock.Mock(side_effect=exception.ServiceNotFound(
                service_id='fake')))

        self._assert_create_raises(
            webob.exc.HTTPNotFound, get_fake_manage_body())

    def test_share_manage_share_type_not_found(self):
        """``ShareTypeNotFoundByName`` becomes HTTP 404."""
        self.mock_object(db_api, 'service_get_by_host_and_topic', mock.Mock())
        self.mock_object(utils, 'service_is_up', mock.Mock(return_value=True))
        self.mock_object(
            db_api, 'share_type_get_by_name',
            mock.Mock(side_effect=exception.ShareTypeNotFoundByName(
                share_type_name='fake')))

        self._assert_create_raises(
            webob.exc.HTTPNotFound, get_fake_manage_body())

    @ddt.data({'service_is_up': False, 'service_host': 'fake@host#POOL'},
              {'service_is_up': True, 'service_host': 'fake@host'})
    def test_share_manage_bad_request(self, settings):
        """A down service, or a host without '#POOL', yields HTTP 400."""
        # Read the settings without mutating the shared ddt data dict.
        body = get_fake_manage_body(service_host=settings['service_host'])
        self._setup_manage_mocks(service_is_up=settings['service_is_up'])

        self._assert_create_raises(webob.exc.HTTPBadRequest, body)

    def test_share_manage_duplicate_share(self):
        """``InvalidShare`` from ``API.manage`` becomes HTTP 409."""
        self._setup_manage_mocks()
        self._mock_manage_raises(exception.InvalidShare(reason="fake"))

        self._assert_create_raises(
            webob.exc.HTTPConflict, get_fake_manage_body())

    def test_share_manage_forbidden_manage(self):
        """``PolicyNotAuthorized`` from ``API.manage`` becomes HTTP 403."""
        self._setup_manage_mocks()
        self._mock_manage_raises(exception.PolicyNotAuthorized(action=''))

        self._assert_create_raises(
            webob.exc.HTTPForbidden, get_fake_manage_body())

    def test_share_manage_forbidden_validate_service_host(self):
        """``PolicyNotAuthorized`` from ``service_is_up`` becomes HTTP 403."""
        self._setup_manage_mocks()
        # Override the service_is_up mock installed just above so that it
        # raises instead of returning a value.
        self.mock_object(
            utils, 'service_is_up',
            mock.Mock(side_effect=exception.PolicyNotAuthorized(action='')))

        self._assert_create_raises(
            webob.exc.HTTPForbidden, get_fake_manage_body())

    def test_share_manage_invalid_input(self):
        """``InvalidInput`` from ``API.manage`` becomes HTTP 400."""
        self._setup_manage_mocks()
        self._mock_manage_raises(
            exception.InvalidInput(message="", reason="fake"))

        self._assert_create_raises(
            webob.exc.HTTPBadRequest, get_fake_manage_body())

    def test_share_manage_invalid_share_server(self):
        """``InvalidShareServer`` from ``API.manage`` becomes HTTP 409."""
        self._setup_manage_mocks()
        self._mock_manage_raises(exception.InvalidShareServer(reason=""))

        self._assert_create_raises(
            webob.exc.HTTPConflict, get_fake_manage_body())

    @ddt.data(
        get_fake_manage_body(name='foo', description='bar'),
        get_fake_manage_body(display_name='foo', description='bar'),
        get_fake_manage_body(name='foo', display_description='bar'),
        get_fake_manage_body(display_name='foo', display_description='bar'),
        get_fake_manage_body(display_name='foo', display_description='bar',
                             driver_options=dict(volume_id='quuz')),
        get_fake_manage_body(display_name='foo', display_description='bar',
                             export_path={'path': '/fake'}),
    )
    def test_share_manage(self, data):
        """Valid bodies are forwarded to ``share_api.API.manage``."""
        self._setup_manage_mocks()
        self.mock_object(
            share_api.API, 'manage',
            mock.Mock(return_value={'share_type_id': '', 'id': 'fake'}))
        # Build the expectation before calling the controller, which
        # receives (and may modify) the very same ``data`` dict.
        expected_share = self._expected_share(data)
        driver_options = data['share'].get('driver_options', {})
        # The expected share has no 'is_public' key, so the exact-match
        # assertion below also checks that this value is not forwarded.
        data['share']['is_public'] = 'foo'

        actual_result = self.controller.create(self.request, data)

        share_api.API.manage.assert_called_once_with(
            mock.ANY, expected_share, driver_options)
        self.assertIsNotNone(actual_result)
        self.mock_policy_check.assert_called_once_with(
            self.context, self.resource_name, 'manage')

    def test_share_manage_allow_dhss_true(self):
        """``_manage(allow_dhss_true=True)`` forwards ``share_server_id``."""
        self._setup_manage_mocks()
        data = get_fake_manage_body(name='foo', description='bar')
        self.mock_object(
            share_api.API, 'manage',
            mock.Mock(return_value={'share_type_id': '', 'id': 'fake'}))
        expected_share = self._expected_share(data, share_server_id='fake')
        data['share']['share_server_id'] = 'fake'
        driver_options = data['share'].get('driver_options', {})

        self.controller._manage(self.request, data, allow_dhss_true=True)

        share_api.API.manage.assert_called_once_with(
            self.context, expected_share, driver_options)
        self.mock_policy_check.assert_called_once_with(
            self.context, self.resource_name, 'manage')

    def test_wrong_permissions(self):
        """A request without admin context is rejected with HTTP 403."""
        fake_req = fakes.HTTPRequest.blank(
            '/share/manage', use_admin_context=False)

        self._assert_create_raises(
            webob.exc.HTTPForbidden, get_fake_manage_body(),
            request=fake_req)