"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/protection/v3/test_registered_limits.py" (13 May 2020, 15187 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_registered_limits.py": 16.0.1_vs_17.0.0.

    1 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 #    not use this file except in compliance with the License. You may obtain
    3 #    a copy of the License at
    4 #
    5 #         http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 #    Unless required by applicable law or agreed to in writing, software
    8 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 #    License for the specific language governing permissions and limitations
   11 #    under the License.
   12 
   13 import uuid
   14 
   15 import http.client
   16 
   17 from keystone.common import provider_api
   18 import keystone.conf
   19 from keystone.tests.common import auth as common_auth
   20 from keystone.tests import unit
   21 from keystone.tests.unit import base_classes
   22 from keystone.tests.unit import ksfixtures
   23 
# Keystone's configuration object; the tests below read the default domain
# id from it when creating users.
CONF = keystone.conf.CONF
# Provider API registry; the tests use it to reach the catalog, identity,
# assignment, resource and unified limit APIs directly when building
# fixtures, instead of going through HTTP requests.
PROVIDERS = provider_api.ProviderAPIs
   26 
   27 
   28 class _UserRegisteredLimitTests(object):
   29     """Common default functionality for all users except system admins."""
   30 
   31     def test_user_can_get_a_registered_limit(self):
   32         service = PROVIDERS.catalog_api.create_service(
   33             uuid.uuid4().hex, unit.new_service_ref()
   34         )
   35 
   36         registered_limit = unit.new_registered_limit_ref(
   37             service_id=service['id'], id=uuid.uuid4().hex
   38         )
   39         limits = PROVIDERS.unified_limit_api.create_registered_limits(
   40             [registered_limit]
   41         )
   42         limit_id = limits[0]['id']
   43 
   44         with self.test_client() as c:
   45             r = c.get(
   46                 '/v3/registered_limits/%s' % limit_id, headers=self.headers
   47             )
   48             self.assertEqual(limit_id, r.json['registered_limit']['id'])
   49 
   50     def test_user_can_list_registered_limits(self):
   51         service = PROVIDERS.catalog_api.create_service(
   52             uuid.uuid4().hex, unit.new_service_ref()
   53         )
   54 
   55         registered_limit = unit.new_registered_limit_ref(
   56             service_id=service['id'], id=uuid.uuid4().hex
   57         )
   58         limits = PROVIDERS.unified_limit_api.create_registered_limits(
   59             [registered_limit]
   60         )
   61         limit_id = limits[0]['id']
   62 
   63         with self.test_client() as c:
   64             r = c.get(
   65                 '/v3/registered_limits', headers=self.headers
   66             )
   67             self.assertTrue(len(r.json['registered_limits']) == 1)
   68             self.assertEqual(limit_id, r.json['registered_limits'][0]['id'])
   69 
   70     def test_user_cannot_create_registered_limits(self):
   71         service = PROVIDERS.catalog_api.create_service(
   72             uuid.uuid4().hex, unit.new_service_ref()
   73         )
   74 
   75         create = {
   76             'registered_limits': [
   77                 unit.new_registered_limit_ref(
   78                     service_id=service['id']
   79                 )
   80             ]
   81         }
   82 
   83         with self.test_client() as c:
   84             c.post(
   85                 '/v3/registered_limits', json=create, headers=self.headers,
   86                 expected_status_code=http.client.FORBIDDEN
   87             )
   88 
   89     def test_user_cannot_update_registered_limits(self):
   90         service = PROVIDERS.catalog_api.create_service(
   91             uuid.uuid4().hex, unit.new_service_ref()
   92         )
   93 
   94         registered_limit = unit.new_registered_limit_ref(
   95             service_id=service['id'], id=uuid.uuid4().hex
   96         )
   97         limits = PROVIDERS.unified_limit_api.create_registered_limits(
   98             [registered_limit]
   99         )
  100         limit_id = limits[0]['id']
  101 
  102         with self.test_client() as c:
  103             update = {
  104                 'registered_limit': {'default_limit': 5}
  105             }
  106 
  107             c.patch(
  108                 '/v3/registered_limits/%s' % limit_id, json=update,
  109                 headers=self.headers,
  110                 expected_status_code=http.client.FORBIDDEN
  111             )
  112 
  113     def test_user_cannot_delete_registered_limits(self):
  114         service = PROVIDERS.catalog_api.create_service(
  115             uuid.uuid4().hex, unit.new_service_ref()
  116         )
  117 
  118         registered_limit = unit.new_registered_limit_ref(
  119             service_id=service['id'], id=uuid.uuid4().hex
  120         )
  121         limits = PROVIDERS.unified_limit_api.create_registered_limits(
  122             [registered_limit]
  123         )
  124         limit_id = limits[0]['id']
  125 
  126         with self.test_client() as c:
  127             c.delete(
  128                 '/v3/registered_limits/%s' % limit_id, headers=self.headers,
  129                 expected_status_code=http.client.FORBIDDEN
  130             )
  131 
  132 
  133 class SystemReaderTests(base_classes.TestCaseWithBootstrap,
  134                         common_auth.AuthTestMixin,
  135                         _UserRegisteredLimitTests):
  136     def setUp(self):
  137         super(SystemReaderTests, self).setUp()
  138         self.loadapp()
  139         self.useFixture(ksfixtures.Policy(self.config_fixture))
  140         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  141 
  142         system_reader = unit.new_user_ref(
  143             domain_id=CONF.identity.default_domain_id
  144         )
  145         self.user_id = PROVIDERS.identity_api.create_user(
  146             system_reader
  147         )['id']
  148         PROVIDERS.assignment_api.create_system_grant_for_user(
  149             self.user_id, self.bootstrapper.reader_role_id
  150         )
  151 
  152         auth = self.build_authentication_request(
  153             user_id=self.user_id, password=system_reader['password'],
  154             system=True
  155         )
  156 
  157         # Grab a token using the persona we're testing and prepare headers
  158         # for requests we'll be making in the tests.
  159         with self.test_client() as c:
  160             r = c.post('/v3/auth/tokens', json=auth)
  161             self.token_id = r.headers['X-Subject-Token']
  162             self.headers = {'X-Auth-Token': self.token_id}
  163 
  164 
  165 class SystemMemberTests(base_classes.TestCaseWithBootstrap,
  166                         common_auth.AuthTestMixin,
  167                         _UserRegisteredLimitTests):
  168 
  169     def setUp(self):
  170         super(SystemMemberTests, self).setUp()
  171         self.loadapp()
  172         self.useFixture(ksfixtures.Policy(self.config_fixture))
  173         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  174 
  175         system_member = unit.new_user_ref(
  176             domain_id=CONF.identity.default_domain_id
  177         )
  178         self.user_id = PROVIDERS.identity_api.create_user(
  179             system_member
  180         )['id']
  181         PROVIDERS.assignment_api.create_system_grant_for_user(
  182             self.user_id, self.bootstrapper.member_role_id
  183         )
  184 
  185         auth = self.build_authentication_request(
  186             user_id=self.user_id, password=system_member['password'],
  187             system=True
  188         )
  189 
  190         # Grab a token using the persona we're testing and prepare headers
  191         # for requests we'll be making in the tests.
  192         with self.test_client() as c:
  193             r = c.post('/v3/auth/tokens', json=auth)
  194             self.token_id = r.headers['X-Subject-Token']
  195             self.headers = {'X-Auth-Token': self.token_id}
  196 
  197 
  198 class SystemAdminTests(base_classes.TestCaseWithBootstrap,
  199                        common_auth.AuthTestMixin):
  200 
  201     def setUp(self):
  202         super(SystemAdminTests, self).setUp()
  203         self.loadapp()
  204         self.useFixture(ksfixtures.Policy(self.config_fixture))
  205         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  206 
  207         # Reuse the system administrator account created during
  208         # ``keystone-manage bootstrap``
  209         self.user_id = self.bootstrapper.admin_user_id
  210         auth = self.build_authentication_request(
  211             user_id=self.user_id,
  212             password=self.bootstrapper.admin_password,
  213             system=True
  214         )
  215 
  216         # Grab a token using the persona we're testing and prepare headers
  217         # for requests we'll be making in the tests.
  218         with self.test_client() as c:
  219             r = c.post('/v3/auth/tokens', json=auth)
  220             self.token_id = r.headers['X-Subject-Token']
  221             self.headers = {'X-Auth-Token': self.token_id}
  222 
  223     def test_user_can_get_a_registered_limit(self):
  224         service = PROVIDERS.catalog_api.create_service(
  225             uuid.uuid4().hex, unit.new_service_ref()
  226         )
  227 
  228         registered_limit = unit.new_registered_limit_ref(
  229             service_id=service['id'], id=uuid.uuid4().hex
  230         )
  231         limits = PROVIDERS.unified_limit_api.create_registered_limits(
  232             [registered_limit]
  233         )
  234         limit_id = limits[0]['id']
  235 
  236         with self.test_client() as c:
  237             r = c.get(
  238                 '/v3/registered_limits/%s' % limit_id, headers=self.headers
  239             )
  240             self.assertEqual(limit_id, r.json['registered_limit']['id'])
  241 
  242     def test_user_can_list_registered_limits(self):
  243         service = PROVIDERS.catalog_api.create_service(
  244             uuid.uuid4().hex, unit.new_service_ref()
  245         )
  246 
  247         registered_limit = unit.new_registered_limit_ref(
  248             service_id=service['id'], id=uuid.uuid4().hex
  249         )
  250         limits = PROVIDERS.unified_limit_api.create_registered_limits(
  251             [registered_limit]
  252         )
  253         limit_id = limits[0]['id']
  254 
  255         with self.test_client() as c:
  256             r = c.get(
  257                 '/v3/registered_limits', headers=self.headers
  258             )
  259             self.assertTrue(len(r.json['registered_limits']) == 1)
  260             self.assertEqual(limit_id, r.json['registered_limits'][0]['id'])
  261 
  262     def test_user_can_create_registered_limits(self):
  263         service = PROVIDERS.catalog_api.create_service(
  264             uuid.uuid4().hex, unit.new_service_ref()
  265         )
  266 
  267         create = {
  268             'registered_limits': [
  269                 unit.new_registered_limit_ref(
  270                     service_id=service['id']
  271                 )
  272             ]
  273         }
  274 
  275         with self.test_client() as c:
  276             c.post('/v3/registered_limits', json=create, headers=self.headers)
  277 
  278     def test_user_can_update_registered_limits(self):
  279         service = PROVIDERS.catalog_api.create_service(
  280             uuid.uuid4().hex, unit.new_service_ref()
  281         )
  282 
  283         registered_limit = unit.new_registered_limit_ref(
  284             service_id=service['id'], id=uuid.uuid4().hex
  285         )
  286         limits = PROVIDERS.unified_limit_api.create_registered_limits(
  287             [registered_limit]
  288         )
  289         limit_id = limits[0]['id']
  290 
  291         with self.test_client() as c:
  292             update = {
  293                 'registered_limit': {'default_limit': 5}
  294             }
  295 
  296             c.patch(
  297                 '/v3/registered_limits/%s' % limit_id, json=update,
  298                 headers=self.headers
  299             )
  300 
  301     def test_user_can_delete_registered_limits(self):
  302         service = PROVIDERS.catalog_api.create_service(
  303             uuid.uuid4().hex, unit.new_service_ref()
  304         )
  305 
  306         registered_limit = unit.new_registered_limit_ref(
  307             service_id=service['id'], id=uuid.uuid4().hex
  308         )
  309         limits = PROVIDERS.unified_limit_api.create_registered_limits(
  310             [registered_limit]
  311         )
  312         limit_id = limits[0]['id']
  313 
  314         with self.test_client() as c:
  315             c.delete(
  316                 '/v3/registered_limits/%s' % limit_id, headers=self.headers
  317             )
  318 
  319 
  320 class DomainUserTests(base_classes.TestCaseWithBootstrap,
  321                       common_auth.AuthTestMixin,
  322                       _UserRegisteredLimitTests):
  323 
  324     def setUp(self):
  325         super(DomainUserTests, self).setUp()
  326         self.loadapp()
  327         self.useFixture(ksfixtures.Policy(self.config_fixture))
  328         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  329 
  330         domain = PROVIDERS.resource_api.create_domain(
  331             uuid.uuid4().hex, unit.new_domain_ref()
  332         )
  333         self.domain_id = domain['id']
  334         domain_admin = unit.new_user_ref(domain_id=self.domain_id)
  335         self.user_id = PROVIDERS.identity_api.create_user(domain_admin)['id']
  336         PROVIDERS.assignment_api.create_grant(
  337             self.bootstrapper.admin_role_id, user_id=self.user_id,
  338             domain_id=self.domain_id
  339         )
  340 
  341         auth = self.build_authentication_request(
  342             user_id=self.user_id,
  343             password=domain_admin['password'],
  344             domain_id=self.domain_id
  345         )
  346 
  347         # Grab a token using the persona we're testing and prepare headers
  348         # for requests we'll be making in the tests.
  349         with self.test_client() as c:
  350             r = c.post('/v3/auth/tokens', json=auth)
  351             self.token_id = r.headers['X-Subject-Token']
  352             self.headers = {'X-Auth-Token': self.token_id}
  353 
  354 
  355 class ProjectUserTests(base_classes.TestCaseWithBootstrap,
  356                        common_auth.AuthTestMixin,
  357                        _UserRegisteredLimitTests):
  358 
  359     def setUp(self):
  360         super(ProjectUserTests, self).setUp()
  361         self.loadapp()
  362         self.useFixture(ksfixtures.Policy(self.config_fixture))
  363         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  364 
  365         # Reuse the system administrator account created during
  366         # ``keystone-manage bootstrap``
  367         self.user_id = self.bootstrapper.admin_user_id
  368         auth = self.build_authentication_request(
  369             user_id=self.user_id,
  370             password=self.bootstrapper.admin_password,
  371             project_id=self.bootstrapper.project_id
  372         )
  373 
  374         # Grab a token using the persona we're testing and prepare headers
  375         # for requests we'll be making in the tests.
  376         with self.test_client() as c:
  377             r = c.post('/v3/auth/tokens', json=auth)
  378             self.token_id = r.headers['X-Subject-Token']
  379             self.headers = {'X-Auth-Token': self.token_id}
  380 
  381 
  382 class ProjectUserTestsWithoutEnforceScope(
  383         base_classes.TestCaseWithBootstrap,
  384         common_auth.AuthTestMixin,
  385         _UserRegisteredLimitTests):
  386 
  387     def setUp(self):
  388         super(ProjectUserTestsWithoutEnforceScope, self).setUp()
  389         self.loadapp()
  390         self.useFixture(ksfixtures.Policy(self.config_fixture))
  391 
  392         # Explicityly set enforce_scope to False to make sure we maintain
  393         # backwards compatibility with project users.
  394         self.config_fixture.config(group='oslo_policy', enforce_scope=False)
  395 
  396         domain = PROVIDERS.resource_api.create_domain(
  397             uuid.uuid4().hex, unit.new_domain_ref()
  398         )
  399         user = unit.new_user_ref(domain_id=domain['id'])
  400         self.user_id = PROVIDERS.identity_api.create_user(user)['id']
  401 
  402         self.project_id = PROVIDERS.resource_api.create_project(
  403             uuid.uuid4().hex, unit.new_project_ref(domain_id=domain['id'])
  404         )['id']
  405 
  406         PROVIDERS.assignment_api.create_grant(
  407             self.bootstrapper.member_role_id, user_id=self.user_id,
  408             project_id=self.project_id
  409         )
  410 
  411         auth = self.build_authentication_request(
  412             user_id=self.user_id,
  413             password=user['password'],
  414             project_id=self.project_id
  415         )
  416 
  417         # Grab a token using the persona we're testing and prepare headers
  418         # for requests we'll be making in the tests.
  419         with self.test_client() as c:
  420             r = c.post('/v3/auth/tokens', json=auth)
  421             self.token_id = r.headers['X-Subject-Token']
  422             self.headers = {'X-Auth-Token': self.token_id}