"Fossies" - the Fresh Open Source Software Archive 
Member "neutron-14.0.3/neutron/tests/unit/db/test_rbac_db_mixin.py" (22 Oct 2019, 14899 Bytes) of package /linux/misc/openstack/neutron-14.0.3.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively, you can
view or
download the uninterpreted source code file here.
See also the latest
Fossies "Diffs" side-by-side code changes report for "test_rbac_db_mixin.py":
14.0.2_vs_14.0.3.
1 # Copyright (c) 2016 OpenStack Foundation.
2 # All Rights Reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
14 # under the License.
15
16
17 import mock
18 from neutron_lib.callbacks import events
19 from neutron_lib import constants
20 from neutron_lib import context
21 from oslo_utils import uuidutils
22 import testtools
23
24 from neutron.db.db_base_plugin_v2 import NeutronDbPluginV2 as db_plugin_v2
25 from neutron.db import rbac_db_models
26 from neutron.extensions import rbac as ext_rbac
27 from neutron.objects import network as network_obj
28 from neutron.objects.qos import policy as qos_policy_obj
29 from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin
30
31
class NetworkRbacTestcase(test_plugin.NeutronDbPluginV2TestCase):
    """Exercise network RBAC policy handling through the 'ml2' plugin."""

    def setUp(self):
        """Create the admin context and start the test case with 'ml2'."""
        self.context = context.get_admin_context()
        super(NetworkRbacTestcase, self).setUp(plugin='ml2')

    def _make_networkrbac(self, network, target, action='access_as_shared'):
        """Build an RBAC policy request body for ``network``.

        :param network: dict with a 'network' entry holding at least
                        'project_id' and 'id'.
        :param target: value stored as the policy's 'target_tenant'.
        :param action: value stored as the policy's 'action'.
        :returns: dict of the form ``{'rbac_policy': {...}}``.
        """
        policy = {
            'rbac_policy': {'project_id': network['network']['project_id'],
                            'object_id': network['network']['id'],
                            'object_type': 'network',
                            'action': action,
                            'target_tenant': target}}
        return policy

    def _setup_networkrbac_and_port(self, network, target_tenant):
        """Create a default-action policy and a port for ``target_tenant``.

        :returns: tuple ``(rbac_policy, port)`` as returned by the plugin.
        """
        policy = self._make_networkrbac(network, target_tenant)
        netrbac = self.plugin.create_rbac_policy(self.context, policy)

        test_port = {'port': {'name': 'test-port',
                              'network_id': network['network']['id'],
                              'mac_address': constants.ATTR_NOT_SPECIFIED,
                              'fixed_ips': constants.ATTR_NOT_SPECIFIED,
                              'admin_state_up': True,
                              'device_id': 'device_id',
                              'device_owner': 'device_owner',
                              'project_id': target_tenant,
                              'tenant_id': target_tenant}}

        port = self.plugin.create_port(self.context, test_port)
        return netrbac, port

    def _assert_external_net_state(self, net_id, is_external):
        """Show the network via the API and check 'router:external'."""
        req = self.new_show_request('networks', net_id)
        res = self.deserialize(self.fmt, req.get_response(self.api))
        self.assertEqual(is_external, res['network']['router:external'])

    def _assert_target_tenant_update(self, action):
        """Create a policy with ``action``, retarget it, verify the result.

        Every field of the original request, with 'target_tenant' replaced
        by the new value, must be present in the updated policy.
        """
        orig_target = 'test-tenant-2'
        new_target = 'test-tenant-3'

        with self.network() as net:
            policy = self._make_networkrbac(net, orig_target, action)
            netrbac = self.plugin.create_rbac_policy(self.context, policy)
            update_policy = {'rbac_policy': {'target_tenant': new_target}}

            netrbac2 = self.plugin.update_rbac_policy(self.context,
                                                      netrbac['id'],
                                                      update_policy)

            policy['rbac_policy']['target_tenant'] = new_target
            for k, v in policy['rbac_policy'].items():
                # Expected value first, matching the rest of this class, so
                # that a failure labels the two values correctly.
                self.assertEqual(v, netrbac2[k])

    def _assert_self_share_skips_port_check(self, event, **kwargs):
        """Validate a self-share policy change for ``event``.

        The policy targets the project that owns the (mocked) network;
        ``ensure_no_tenant_ports_on_network`` must not be called.

        :param event: callback event passed to the validator.
        :param kwargs: extra keyword arguments forwarded to the validator.
        """
        net_id = 'my-network'
        net_owner = 'my-tenant-id'
        # NOTE(ralonsoh): keep "tenant_id" for compatibility purposes in
        # NeutronDbPluginV2.validate_network_rbac_policy_change()
        net = {'network': {'id': net_id,
                           'tenant_id': net_owner,
                           'project_id': net_owner}}
        policy = self._make_networkrbac(net, net_owner)['rbac_policy']

        with mock.patch.object(db_plugin_v2, '_get_network') as get_net,\
                mock.patch.object(
                    db_plugin_v2,
                    'ensure_no_tenant_ports_on_network') as ensure:
            get_net.return_value = net['network']
            self.plugin.validate_network_rbac_policy_change(
                None, event, None,
                self.context, 'network', policy, **kwargs)
            self.assertEqual(0, ensure.call_count)

    def test_create_network_rbac_external(self):
        """A wildcard 'access_as_external' policy marks the net external."""
        with self.network() as ext_net:
            net_id = ext_net['network']['id']
            self._assert_external_net_state(net_id, is_external=False)
            policy = self._make_networkrbac(ext_net,
                                            '*',
                                            'access_as_external')
            self.plugin.create_rbac_policy(self.context, policy)
            self._assert_external_net_state(net_id, is_external=True)

    def test_create_network_rbac_shared_existing(self):
        """Re-creating an identical shared policy raises a duplicate error."""
        tenant = 'test-tenant'
        with self.network() as net:
            policy = self._make_networkrbac(net,
                                            tenant,
                                            rbac_db_models.ACCESS_SHARED)
            self.plugin.create_rbac_policy(self.context, policy)
            # Give server maximum of 10 seconds to make sure we don't hit DB
            # retry mechanism when resource already exists
            with self.assert_max_execution_time(10):
                with testtools.ExpectedException(
                        ext_rbac.DuplicateRbacPolicy):
                    self.plugin.create_rbac_policy(self.context, policy)

    def test_update_network_rbac_external_valid(self):
        """An 'access_as_external' policy can be given a new target."""
        self._assert_target_tenant_update('access_as_external')

    def test_delete_network_rbac_external(self):
        """Deleting the only external policy clears 'router:external'."""
        with self.network() as ext_net:
            net_id = ext_net['network']['id']
            self._assert_external_net_state(net_id, is_external=False)
            policy = self._make_networkrbac(ext_net,
                                            '*',
                                            'access_as_external')
            net_rbac = self.plugin.create_rbac_policy(self.context, policy)
            self._assert_external_net_state(net_id, is_external=True)
            self.plugin.delete_rbac_policy(self.context, net_rbac['id'])
            self._assert_external_net_state(net_id, is_external=False)

    def test_delete_network_rbac_external_with_multi_rbac_policy(self):
        """The net stays external while another external policy remains."""
        with self.network() as ext_net:
            net_id = ext_net['network']['id']
            self._assert_external_net_state(net_id, is_external=False)
            policy1 = self._make_networkrbac(ext_net,
                                             'test-tenant-1',
                                             'access_as_external')
            net_rbac1 = self.plugin.create_rbac_policy(self.context, policy1)
            self._assert_external_net_state(net_id, is_external=True)
            policy2 = self._make_networkrbac(ext_net,
                                             'test-tenant-2',
                                             'access_as_external')
            self.plugin.create_rbac_policy(self.context, policy2)
            self._assert_external_net_state(net_id, is_external=True)
            self.plugin.delete_rbac_policy(self.context, net_rbac1['id'])
            self._assert_external_net_state(net_id, is_external=True)

    def test_delete_external_network_shared_rbac(self):
        """Removing a shared policy leaves 'router:external' untouched."""
        with self.network() as ext_net:
            net_id = ext_net['network']['id']
            self.plugin.update_network(
                self.context, net_id,
                {'network': {'router:external': True}})
            self._assert_external_net_state(net_id, is_external=True)
            policy = self._make_networkrbac(ext_net, 'test-tenant-2')
            net_rbac = self.plugin.create_rbac_policy(self.context, policy)
            self.plugin.delete_rbac_policy(self.context, net_rbac['id'])
            # Make sure that external attribute not changed.
            self._assert_external_net_state(net_id, is_external=True)

    def test_update_networkrbac_valid(self):
        """An 'access_as_shared' policy can be given a new target."""
        self._assert_target_tenant_update('access_as_shared')

    def test_delete_networkrbac_in_use_fail(self):
        """A policy cannot be deleted while the target's port exists."""
        with self.network() as net:
            netrbac, _ = self._setup_networkrbac_and_port(
                network=net, target_tenant='test-tenant-2')

            self.assertRaises(ext_rbac.RbacPolicyInUse,
                              self.plugin.delete_rbac_policy,
                              self.context, netrbac['id'])

    def test_port_presence_prevents_network_rbac_policy_deletion(self):
        """Walk the delete/update rules while a port depends on a policy."""
        with self.network() as net:
            netrbac, port = self._setup_networkrbac_and_port(
                network=net, target_tenant='alice')
            self.assertRaises(ext_rbac.RbacPolicyInUse,
                              self.plugin.delete_rbac_policy,
                              self.context, netrbac['id'])

            # a wildcard policy should allow the specific policy to be deleted
            # since it allows the remaining port
            wild_policy = self._make_networkrbac(net, '*')
            wild_policy = self.plugin.create_rbac_policy(self.context,
                                                         wild_policy)
            self.plugin.delete_rbac_policy(self.context, netrbac['id'])

            # now that wildcard is the only remaining, it should be subjected
            # to the same restriction
            self.assertRaises(ext_rbac.RbacPolicyInUse,
                              self.plugin.delete_rbac_policy,
                              self.context, wild_policy['id'])

            # similarly, we can't update the policy to a different tenant
            update_policy = {'rbac_policy': {'target_tenant': 'bob'}}
            self.assertRaises(ext_rbac.RbacPolicyInUse,
                              self.plugin.update_rbac_policy,
                              self.context, wild_policy['id'],
                              update_policy)

            # after port anchor is gone, update and delete should pass
            self.plugin.delete_port(self.context, port['id'])
            self.plugin.update_rbac_policy(
                self.context, wild_policy['id'], update_policy)
            self.plugin.delete_rbac_policy(self.context, wild_policy['id'])

            # check that policy is indeed gone
            self.assertRaises(ext_rbac.RbacPolicyNotFound,
                              self.plugin.get_rbac_policy,
                              self.context, wild_policy['id'])

    def test_delete_networkrbac_self_share(self):
        """BEFORE_DELETE on a self-share policy skips the port check."""
        self._assert_self_share_skips_port_check(events.BEFORE_DELETE)

    def test_update_self_share_networkrbac(self):
        """BEFORE_UPDATE on a self-share policy skips the port check."""
        self._assert_self_share_skips_port_check(
            events.BEFORE_UPDATE,
            policy_update={'target_tenant': 'new-target-tenant'})

    def _create_rbac_obj(self, _class):
        """Instantiate ``_class`` with fresh UUIDs and fixed field values."""
        return _class(id=uuidutils.generate_uuid(),
                      project_id='project_id',
                      object_id=uuidutils.generate_uuid(),
                      target_tenant='target_tenant',
                      action=rbac_db_models.ACCESS_SHARED)

    @mock.patch.object(qos_policy_obj.QosPolicyRBAC, 'get_objects')
    def test_get_rbac_policies_qos_policy(self, mock_qos_get_objects):
        """Filtering on 'qos_policy' returns the mocked QoS RBAC entry."""
        qos_policy_rbac = self._create_rbac_obj(qos_policy_obj.QosPolicyRBAC)
        mock_qos_get_objects.return_value = [qos_policy_rbac]
        filters = {'object_type': ['qos_policy']}
        rbac_policies = self.plugin.get_rbac_policies(self.context, filters)
        self.assertEqual(1, len(rbac_policies))
        self.assertEqual(self.plugin._make_rbac_policy_dict(qos_policy_rbac),
                         rbac_policies[0])

    @mock.patch.object(network_obj.NetworkRBAC, 'get_objects')
    def test_get_rbac_policies_network(self, mock_net_get_objects):
        """Filtering on 'network' returns the mocked network RBAC entry."""
        net_rbac = self._create_rbac_obj(network_obj.NetworkRBAC)
        mock_net_get_objects.return_value = [net_rbac]
        filters = {'object_type': ['network']}
        rbac_policies = self.plugin.get_rbac_policies(self.context, filters)
        self.assertEqual(1, len(rbac_policies))
        self.assertEqual(self.plugin._make_rbac_policy_dict(net_rbac),
                         rbac_policies[0])

    @mock.patch.object(qos_policy_obj.QosPolicyRBAC, 'get_objects')
    @mock.patch.object(network_obj.NetworkRBAC, 'get_objects')
    def test_get_rbac_policies_all_classes(self, mock_net_get_objects,
                                           mock_qos_get_objects):
        """Without filters, both mocked RBAC entries are returned."""
        net_rbac = self._create_rbac_obj(network_obj.NetworkRBAC)
        qos_policy_rbac = self._create_rbac_obj(qos_policy_obj.QosPolicyRBAC)
        mock_net_get_objects.return_value = [net_rbac]
        mock_qos_get_objects.return_value = [qos_policy_rbac]
        rbac_policies = self.plugin.get_rbac_policies(self.context)
        self.assertEqual(2, len(rbac_policies))
        # Sort by object type so the comparison order below is stable.
        rbac_policies = sorted(rbac_policies, key=lambda k: k['object_type'])
        self.assertEqual(self.plugin._make_rbac_policy_dict(net_rbac),
                         rbac_policies[0])
        self.assertEqual(self.plugin._make_rbac_policy_dict(qos_policy_rbac),
                         rbac_policies[1])