"Fossies" - the Fresh Open Source Software Archive

Member "neutron-14.0.3/neutron/tests/unit/db/test_rbac_db_mixin.py" (22 Oct 2019, 14899 Bytes) of package /linux/misc/openstack/neutron-14.0.3.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "test_rbac_db_mixin.py": 14.0.2_vs_14.0.3.

    1 # Copyright (c) 2016 OpenStack Foundation.
    2 # All Rights Reserved.
    3 #
    4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 #    not use this file except in compliance with the License. You may obtain
    6 #    a copy of the License at
    7 #
    8 #         http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 #    Unless required by applicable law or agreed to in writing, software
   11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 #    License for the specific language governing permissions and limitations
   14 #    under the License.
   15 
   16 
   17 import mock
   18 from neutron_lib.callbacks import events
   19 from neutron_lib import constants
   20 from neutron_lib import context
   21 from oslo_utils import uuidutils
   22 import testtools
   23 
   24 from neutron.db.db_base_plugin_v2 import NeutronDbPluginV2 as db_plugin_v2
   25 from neutron.db import rbac_db_models
   26 from neutron.extensions import rbac as ext_rbac
   27 from neutron.objects import network as network_obj
   28 from neutron.objects.qos import policy as qos_policy_obj
   29 from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin
   30 
   31 
   32 class NetworkRbacTestcase(test_plugin.NeutronDbPluginV2TestCase):
   33     def setUp(self):
   34         self.context = context.get_admin_context()
   35         super(NetworkRbacTestcase, self).setUp(plugin='ml2')
   36 
   37     def _make_networkrbac(self, network, target, action='access_as_shared'):
   38         policy = {
   39             'rbac_policy': {'project_id': network['network']['project_id'],
   40                             'object_id': network['network']['id'],
   41                             'object_type': 'network',
   42                             'action': action,
   43                             'target_tenant': target}}
   44         return policy
   45 
   46     def _setup_networkrbac_and_port(self, network, target_tenant):
   47         policy = self._make_networkrbac(network, target_tenant)
   48         netrbac = self.plugin.create_rbac_policy(self.context, policy)
   49 
   50         test_port = {'port': {'name': 'test-port',
   51                               'network_id': network['network']['id'],
   52                               'mac_address': constants.ATTR_NOT_SPECIFIED,
   53                               'fixed_ips': constants.ATTR_NOT_SPECIFIED,
   54                               'admin_state_up': True,
   55                               'device_id': 'device_id',
   56                               'device_owner': 'device_owner',
   57                               'project_id': target_tenant,
   58                               'tenant_id': target_tenant}}
   59 
   60         port = self.plugin.create_port(self.context, test_port)
   61         return netrbac, port
   62 
   63     def _assert_external_net_state(self, net_id, is_external):
   64         req = self.new_show_request('networks', net_id)
   65         res = self.deserialize(self.fmt, req.get_response(self.api))
   66         self.assertEqual(is_external, res['network']['router:external'])
   67 
   68     def test_create_network_rbac_external(self):
   69         with self.network() as ext_net:
   70             net_id = ext_net['network']['id']
   71             self._assert_external_net_state(net_id, is_external=False)
   72             policy = self._make_networkrbac(ext_net,
   73                                             '*',
   74                                             'access_as_external')
   75             self.plugin.create_rbac_policy(self.context, policy)
   76             self._assert_external_net_state(net_id, is_external=True)
   77 
   78     def test_create_network_rbac_shared_existing(self):
   79         tenant = 'test-tenant'
   80         with self.network() as net:
   81             policy = self._make_networkrbac(net,
   82                                             tenant,
   83                                             rbac_db_models.ACCESS_SHARED)
   84             self.plugin.create_rbac_policy(self.context, policy)
   85             # Give server maximum of 10 seconds to make sure we don't hit DB
   86             # retry mechanism when resource already exists
   87             with self.assert_max_execution_time(10):
   88                 with testtools.ExpectedException(
   89                         ext_rbac.DuplicateRbacPolicy):
   90                     self.plugin.create_rbac_policy(self.context, policy)
   91 
   92     def test_update_network_rbac_external_valid(self):
   93         orig_target = 'test-tenant-2'
   94         new_target = 'test-tenant-3'
   95 
   96         with self.network() as ext_net:
   97             policy = self._make_networkrbac(ext_net,
   98                                             orig_target,
   99                                             'access_as_external')
  100             netrbac = self.plugin.create_rbac_policy(self.context, policy)
  101             update_policy = {'rbac_policy': {'target_tenant': new_target}}
  102 
  103             netrbac2 = self.plugin.update_rbac_policy(self.context,
  104                                                       netrbac['id'],
  105                                                       update_policy)
  106 
  107             policy['rbac_policy']['target_tenant'] = new_target
  108             for k, v in policy['rbac_policy'].items():
  109                 self.assertEqual(netrbac2[k], v)
  110 
  111     def test_delete_network_rbac_external(self):
  112         with self.network() as ext_net:
  113             net_id = ext_net['network']['id']
  114             self._assert_external_net_state(net_id, is_external=False)
  115             policy = self._make_networkrbac(ext_net,
  116                                             '*',
  117                                             'access_as_external')
  118             net_rbac = self.plugin.create_rbac_policy(self.context, policy)
  119             self._assert_external_net_state(net_id, is_external=True)
  120             self.plugin.delete_rbac_policy(self.context, net_rbac['id'])
  121             self._assert_external_net_state(net_id, is_external=False)
  122 
  123     def test_delete_network_rbac_external_with_multi_rbac_policy(self):
  124         with self.network() as ext_net:
  125             net_id = ext_net['network']['id']
  126             self._assert_external_net_state(net_id, is_external=False)
  127             policy1 = self._make_networkrbac(ext_net,
  128                                              'test-tenant-1',
  129                                              'access_as_external')
  130             net_rbac1 = self.plugin.create_rbac_policy(self.context, policy1)
  131             self._assert_external_net_state(net_id, is_external=True)
  132             policy2 = self._make_networkrbac(ext_net,
  133                                              'test-tenant-2',
  134                                              'access_as_external')
  135             self.plugin.create_rbac_policy(self.context, policy2)
  136             self._assert_external_net_state(net_id, is_external=True)
  137             self.plugin.delete_rbac_policy(self.context, net_rbac1['id'])
  138             self._assert_external_net_state(net_id, is_external=True)
  139 
  140     def test_delete_external_network_shared_rbac(self):
  141         with self.network() as ext_net:
  142             net_id = ext_net['network']['id']
  143             self.plugin.update_network(
  144                 self.context, net_id,
  145                 {'network': {'router:external': True}})
  146             self._assert_external_net_state(net_id, is_external=True)
  147             policy = self._make_networkrbac(ext_net, 'test-tenant-2')
  148             net_rbac = self.plugin.create_rbac_policy(self.context, policy)
  149             self.plugin.delete_rbac_policy(self.context, net_rbac['id'])
  150             # Make sure that external attribute not changed.
  151             self._assert_external_net_state(net_id, is_external=True)
  152 
  153     def test_update_networkrbac_valid(self):
  154         orig_target = 'test-tenant-2'
  155         new_target = 'test-tenant-3'
  156 
  157         with self.network() as net:
  158             policy = self._make_networkrbac(net, orig_target)
  159             netrbac = self.plugin.create_rbac_policy(self.context, policy)
  160             update_policy = {'rbac_policy': {'target_tenant': new_target}}
  161 
  162             netrbac2 = self.plugin.update_rbac_policy(self.context,
  163                                                       netrbac['id'],
  164                                                       update_policy)
  165 
  166             policy['rbac_policy']['target_tenant'] = new_target
  167             for k, v in policy['rbac_policy'].items():
  168                 self.assertEqual(netrbac2[k], v)
  169 
  170     def test_delete_networkrbac_in_use_fail(self):
  171         with self.network() as net:
  172             netrbac, _ = self._setup_networkrbac_and_port(
  173                 network=net, target_tenant='test-tenant-2')
  174 
  175             self.assertRaises(ext_rbac.RbacPolicyInUse,
  176                               self.plugin.delete_rbac_policy,
  177                               self.context, netrbac['id'])
  178 
  179     def test_port_presence_prevents_network_rbac_policy_deletion(self):
  180         with self.network() as net:
  181             netrbac, port = self._setup_networkrbac_and_port(
  182                 network=net, target_tenant='alice')
  183             self.assertRaises(ext_rbac.RbacPolicyInUse,
  184                               self.plugin.delete_rbac_policy,
  185                               self.context, netrbac['id'])
  186 
  187             # a wildcard policy should allow the specific policy to be deleted
  188             # since it allows the remaining port
  189             wild_policy = self._make_networkrbac(net, '*')
  190             wild_policy = self.plugin.create_rbac_policy(self.context,
  191                                                          wild_policy)
  192             self.plugin.delete_rbac_policy(self.context, netrbac['id'])
  193 
  194             # now that wildcard is the only remaining, it should be subjected
  195             # to to the same restriction
  196             self.assertRaises(ext_rbac.RbacPolicyInUse,
  197                               self.plugin.delete_rbac_policy,
  198                               self.context, wild_policy['id'])
  199 
  200             # similarly, we can't update the policy to a different tenant
  201             update_policy = {'rbac_policy': {'target_tenant': 'bob'}}
  202             self.assertRaises(ext_rbac.RbacPolicyInUse,
  203                               self.plugin.update_rbac_policy,
  204                               self.context, wild_policy['id'],
  205                               update_policy)
  206 
  207             # after port anchor is gone, update and delete should pass
  208             self.plugin.delete_port(self.context, port['id'])
  209             self.plugin.update_rbac_policy(
  210                 self.context, wild_policy['id'], update_policy)
  211             self.plugin.delete_rbac_policy(self.context, wild_policy['id'])
  212 
  213             # check that policy is indeed gone
  214             self.assertRaises(ext_rbac.RbacPolicyNotFound,
  215                               self.plugin.get_rbac_policy,
  216                               self.context, wild_policy['id'])
  217 
  218     def test_delete_networkrbac_self_share(self):
  219         net_id = 'my-network'
  220         net_owner = 'my-tenant-id'
  221         # NOTE(ralonsoh): keep "tenant_id" for compatibility purposes in
  222         # NeutronDbPluginV2.validate_network_rbac_policy_change()
  223         net = {'network': {'id': net_id,
  224                            'tenant_id': net_owner,
  225                            'project_id': net_owner}}
  226         policy = self._make_networkrbac(net, net_owner)['rbac_policy']
  227         kwargs = {}
  228 
  229         with mock.patch.object(db_plugin_v2, '_get_network') as get_net,\
  230             mock.patch.object(db_plugin_v2,
  231                               'ensure_no_tenant_ports_on_network') as ensure:
  232             get_net.return_value = net['network']
  233             self.plugin.validate_network_rbac_policy_change(
  234                 None, events.BEFORE_DELETE, None,
  235                 self.context, 'network', policy, **kwargs)
  236             self.assertEqual(0, ensure.call_count)
  237 
  238     def test_update_self_share_networkrbac(self):
  239         net_id = 'my-network'
  240         net_owner = 'my-tenant-id'
  241         # NOTE(ralonsoh): keep "tenant_id" for compatibility purposes in
  242         # NeutronDbPluginV2.validate_network_rbac_policy_change()
  243         net = {'network': {'id': net_id,
  244                            'tenant_id': net_owner,
  245                            'project_id': net_owner}}
  246         policy = self._make_networkrbac(net, net_owner)['rbac_policy']
  247         kwargs = {'policy_update': {'target_tenant': 'new-target-tenant'}}
  248 
  249         with mock.patch.object(db_plugin_v2, '_get_network') as get_net,\
  250             mock.patch.object(db_plugin_v2,
  251                               'ensure_no_tenant_ports_on_network') as ensure:
  252             get_net.return_value = net['network']
  253             self.plugin.validate_network_rbac_policy_change(
  254                 None, events.BEFORE_UPDATE, None,
  255                 self.context, 'network', policy, **kwargs)
  256             self.assertEqual(0, ensure.call_count)
  257 
  258     def _create_rbac_obj(self, _class):
  259         return _class(id=uuidutils.generate_uuid(),
  260                       project_id='project_id',
  261                       object_id=uuidutils.generate_uuid(),
  262                       target_tenant='target_tenant',
  263                       action=rbac_db_models.ACCESS_SHARED)
  264 
  265     @mock.patch.object(qos_policy_obj.QosPolicyRBAC, 'get_objects')
  266     def test_get_rbac_policies_qos_policy(self, mock_qos_get_objects):
  267         qos_policy_rbac = self._create_rbac_obj(qos_policy_obj.QosPolicyRBAC)
  268         mock_qos_get_objects.return_value = [qos_policy_rbac]
  269         filters = {'object_type': ['qos_policy']}
  270         rbac_policies = self.plugin.get_rbac_policies(self.context, filters)
  271         self.assertEqual(1, len(rbac_policies))
  272         self.assertEqual(self.plugin._make_rbac_policy_dict(qos_policy_rbac),
  273                          rbac_policies[0])
  274 
  275     @mock.patch.object(network_obj.NetworkRBAC, 'get_objects')
  276     def test_get_rbac_policies_network(self, mock_net_get_objects):
  277         net_rbac = self._create_rbac_obj(network_obj.NetworkRBAC)
  278         mock_net_get_objects.return_value = [net_rbac]
  279         filters = {'object_type': ['network']}
  280         rbac_policies = self.plugin.get_rbac_policies(self.context, filters)
  281         self.assertEqual(1, len(rbac_policies))
  282         self.assertEqual(self.plugin._make_rbac_policy_dict(net_rbac),
  283                          rbac_policies[0])
  284 
  285     @mock.patch.object(qos_policy_obj.QosPolicyRBAC, 'get_objects')
  286     @mock.patch.object(network_obj.NetworkRBAC, 'get_objects')
  287     def test_get_rbac_policies_all_classes(self, mock_net_get_objects,
  288                                            mock_qos_get_objects):
  289         net_rbac = self._create_rbac_obj(network_obj.NetworkRBAC)
  290         qos_policy_rbac = self._create_rbac_obj(qos_policy_obj.QosPolicyRBAC)
  291         mock_net_get_objects.return_value = [net_rbac]
  292         mock_qos_get_objects.return_value = [qos_policy_rbac]
  293         rbac_policies = self.plugin.get_rbac_policies(self.context)
  294         self.assertEqual(2, len(rbac_policies))
  295         rbac_policies = sorted(rbac_policies, key=lambda k: k['object_type'])
  296         self.assertEqual(self.plugin._make_rbac_policy_dict(net_rbac),
  297                          rbac_policies[0])
  298         self.assertEqual(self.plugin._make_rbac_policy_dict(qos_policy_rbac),
  299                          rbac_policies[1])