"Fossies" - the Fresh Open Source Software Archive

Member "barbican-12.0.0/barbican/tests/api/test_resources_policy.py" (14 Apr 2021, 59770 Bytes) of package /linux/misc/openstack/barbican-12.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_resources_policy.py": 11.0.0_vs_12.0.0.

    1 # Copyright (c) 2013-2014 Rackspace, Inc.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   12 # implied.
   13 # See the License for the specific language governing permissions and
   14 # limitations under the License.
   15 """
   16 This test module focuses on RBAC interactions with the API resource classes.
   17 For typical-flow business logic tests of these classes, see the
   18 'resources_test.py' module.
   19 """
   20 import os
   21 from unittest import mock
   22 
   23 from webob import exc
   24 
   25 from barbican.api.controllers import consumers
   26 from barbican.api.controllers import containers
   27 from barbican.api.controllers import orders
   28 from barbican.api.controllers import secrets
   29 from barbican.api.controllers import secretstores
   30 from barbican.api.controllers import versions
   31 from barbican.common import accept as common_accept
   32 from barbican.common import config
   33 from barbican.common import policy
   34 from barbican import context
   35 from barbican.model import models
   36 from barbican.tests import utils
   37 
   38 
# Point to the policy.yaml file located in source control. The value is the
# etc/barbican directory three levels above this file; BaseTestCase.setUp
# hands it to CONF as the '--config-dir' argument.
TEST_VAR_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                            '../../../etc', 'barbican'))

# Configuration object used by this module; its arguments are parsed in
# BaseTestCase.setUp.
CONF = config.new_config()

# Runs once at import time. NOTE(review): presumably this is what populates
# policy.ENFORCER, so the order of the next two statements matters - confirm.
policy.init()
ENFORCER = policy.ENFORCER
   47 
   48 
   49 class TestableResource(object):
   50 
   51     def __init__(self, *args, **kwargs):
   52         self.controller = self.controller_cls(*args, **kwargs)
   53 
   54     def on_get(self, req, resp, *args, **kwargs):
   55         with mock.patch('pecan.request', req):
   56             with mock.patch('pecan.response', resp):
   57                 return self.controller.on_get(*args, **kwargs)
   58 
   59     def on_post(self, req, resp, *args, **kwargs):
   60         with mock.patch('pecan.request', req):
   61             with mock.patch('pecan.response', resp):
   62                 return self.controller.on_post(*args, **kwargs)
   63 
   64     def on_put(self, req, resp, *args, **kwargs):
   65         with mock.patch('pecan.request', req):
   66             with mock.patch('pecan.response', resp):
   67                 return self.controller.on_put(*args, **kwargs)
   68 
   69     def on_delete(self, req, resp, *args, **kwargs):
   70         with mock.patch('pecan.request', req):
   71             with mock.patch('pecan.response', resp):
   72                 return self.controller.on_delete(*args, **kwargs)
   73 
   74 
class VersionsResource(TestableResource):
    """Testable wrapper around ``versions.VersionsController``."""
    controller_cls = versions.VersionsController
   77 
   78 
class SecretsResource(TestableResource):
    """Testable wrapper around ``secrets.SecretsController``."""
    controller_cls = secrets.SecretsController
   81 
   82 
class SecretResource(TestableResource):
    """Testable wrapper around ``secrets.SecretController``."""
    controller_cls = secrets.SecretController
   85 
   86 
class OrdersResource(TestableResource):
    """Testable wrapper around ``orders.OrdersController``."""
    controller_cls = orders.OrdersController
   89 
   90 
class OrderResource(TestableResource):
    """Testable wrapper around ``orders.OrderController``."""
    controller_cls = orders.OrderController
   93 
   94 
class ContainerResource(TestableResource):
    """Testable wrapper around ``containers.ContainerController``."""
    controller_cls = containers.ContainerController
   97 
   98 
class ConsumersResource(TestableResource):
    """Testable wrapper around ``consumers.ContainerConsumersController``."""
    controller_cls = consumers.ContainerConsumersController
  101 
  102 
class ConsumerResource(TestableResource):
    """Testable wrapper around ``consumers.ContainerConsumerController``."""
    controller_cls = consumers.ContainerConsumerController
  105 
  106 
class SecretStoresResource(TestableResource):
    """Testable wrapper around ``secretstores.SecretStoresController``."""
    controller_cls = secretstores.SecretStoresController
  109 
  110 
class SecretStoreResource(TestableResource):
    """Testable wrapper around ``secretstores.SecretStoreController``."""
    controller_cls = secretstores.SecretStoreController
  113 
  114 
class PreferredSecretStoreResource(TestableResource):
    """Testable wrapper around the preferred-secret-store controller.

    Wraps ``secretstores.PreferredSecretStoreController``.
    """
    controller_cls = secretstores.PreferredSecretStoreController
  117 
  118 
class SecretConsumersResource(TestableResource):
    """Testable wrapper around ``consumers.SecretConsumersController``."""
    controller_cls = consumers.SecretConsumersController
  121 
  122 
class SecretConsumerResource(TestableResource):
    """Testable wrapper around ``consumers.SecretConsumerController``."""
    controller_cls = consumers.SecretConsumerController
  125 
  126 
  127 class BaseTestCase(utils.BaseTestCase, utils.MockModelRepositoryMixin):
  128 
  129     def setUp(self):
  130         super(BaseTestCase, self).setUp()
  131         CONF(args=['--config-dir', TEST_VAR_DIR])
  132         self.policy_enforcer = ENFORCER
  133         self.policy_enforcer.load_rules(True)
  134         self.resp = mock.MagicMock()
  135 
  136     def _generate_req(self, roles=None, accept=None, content_type=None,
  137                       user_id=None, project_id=None):
  138         """Generate a fake HTTP request with security context added to it."""
  139         req = mock.MagicMock()
  140         req.get_param.return_value = None
  141 
  142         kwargs = {
  143             'user_id': user_id,
  144             'project_id': project_id,
  145             'roles': roles or [],
  146             'policy_enforcer': self.policy_enforcer,
  147         }
  148         req.environ = {}
  149         req.environ['barbican.context'] = context.RequestContext(**kwargs)
  150         req.content_type = content_type
  151         req.accept = common_accept.create_accept_header(accept)
  152 
  153         return req
  154 
  155     def _generate_stream_for_exit(self):
  156         """Mock HTTP stream generator, to force RBAC-pass exit.
  157 
  158         Generate a fake HTTP request stream that forces an IOError to
  159         occur, which short circuits API resource processing when RBAC
  160         checks under test here pass.
  161         """
  162         stream = mock.MagicMock()
  163         read = mock.MagicMock(return_value=None, side_effect=IOError())
  164         stream.read = read
  165         return stream
  166 
  167     def _assert_post_rbac_exception(self, exception, role):
  168         """Assert that we received the expected RBAC-passed exception."""
  169         self.assertEqual(500, exception.status_int)
  170 
  171     def _generate_get_error(self):
  172         """Falcon exception generator to throw from early-exit mocks.
  173 
  174         Creates an exception that should be raised by GET tests that pass
  175         RBAC. This allows such flows to short-circuit normal post-RBAC
  176         processing that is not tested in this module.
  177 
  178         :return: Python exception that should be raised by repo get methods.
  179         """
  180         # The 'Read Error' clause needs to match that asserted in
  181         #    _assert_post_rbac_exception() above.
  182         return exc.HTTPServerError(detail='Read Error')
  183 
  184     def _assert_pass_rbac(self, roles, method_under_test, accept=None,
  185                           content_type=None, user_id=None, project_id=None):
  186         """Assert that RBAC authorization rules passed for the specified roles.
  187 
  188         :param roles: List of roles to check, one at a time
  189         :param method_under_test: The test method to invoke for each role.
  190         :param accept Optional Accept header to set on the HTTP request
  191         :return: None
  192         """
  193         for role in roles:
  194             self.req = self._generate_req(roles=[role] if role else [],
  195                                           accept=accept,
  196                                           content_type=content_type,
  197                                           user_id=user_id,
  198                                           project_id=project_id)
  199 
  200             # Force an exception early past the RBAC passing.
  201             type(self.req).body = mock.PropertyMock(side_effect=IOError)
  202             self.req.body_file = self._generate_stream_for_exit()
  203             exception = self.assertRaises(exc.HTTPServerError,
  204                                           method_under_test)
  205             self._assert_post_rbac_exception(exception, role)
  206 
  207     def _assert_fail_rbac(self, roles, method_under_test, accept=None,
  208                           content_type=None, user_id=None, project_id=None):
  209         """Assert that RBAC rules failed for one of the specified roles.
  210 
  211         :param roles: List of roles to check, one at a time
  212         :param method_under_test: The test method to invoke for each role.
  213         :param accept Optional Accept header to set on the HTTP request
  214         :return: None
  215         """
  216         for role in roles:
  217             self.req = self._generate_req(roles=[role] if role else [],
  218                                           accept=accept,
  219                                           content_type=content_type,
  220                                           user_id=user_id,
  221                                           project_id=project_id)
  222 
  223             exception = self.assertRaises(exc.HTTPForbidden, method_under_test)
  224             self.assertEqual(403, exception.status_int)
  225 
  226 
  227 class WhenTestingVersionsResource(BaseTestCase):
  228     """RBAC tests for the barbican.api.resources.VersionsResource class."""
  229     def setUp(self):
  230         super(WhenTestingVersionsResource, self).setUp()
  231 
  232         self.resource = VersionsResource()
  233 
  234     def test_rules_should_be_loaded(self):
  235         self.assertIsNotNone(self.policy_enforcer.rules)
  236 
  237     def test_should_pass_get_versions(self):
  238         # Can't use base method that short circuits post-RBAC processing here,
  239         # as version GET is trivial
  240         for role in ['admin', 'observer', 'creator', 'audit']:
  241             self.req = self._generate_req(roles=[role] if role else [])
  242             self._invoke_on_get()
  243 
  244     def test_should_pass_get_versions_with_bad_roles(self):
  245         self.req = self._generate_req(roles=[None, 'bunkrolehere'])
  246         self._invoke_on_get()
  247 
  248     def test_should_pass_get_versions_with_no_roles(self):
  249         self.req = self._generate_req()
  250         self._invoke_on_get()
  251 
  252     def test_should_pass_get_versions_multiple_roles(self):
  253         self.req = self._generate_req(roles=['admin', 'observer', 'creator',
  254                                              'audit'])
  255         self._invoke_on_get()
  256 
  257     def _invoke_on_get(self):
  258         self.resource.on_get(self.req, self.resp)
  259 
  260 
  261 class WhenTestingSecretsResource(BaseTestCase):
  262     """RBAC tests for the barbican.api.resources.SecretsResource class."""
  263     def setUp(self):
  264         super(WhenTestingSecretsResource, self).setUp()
  265 
  266         self.external_project_id = '12345'
  267 
  268         # Force an error on GET calls that pass RBAC, as we are not testing
  269         #   such flows in this test module.
  270         self.secret_repo = mock.MagicMock()
  271         get_by_create_date = mock.MagicMock(return_value=None,
  272                                             side_effect=self
  273                                             ._generate_get_error())
  274         self.secret_repo.get_by_create_date = get_by_create_date
  275         self.setup_secret_repository_mock(self.secret_repo)
  276 
  277         self.setup_encrypted_datum_repository_mock()
  278         self.setup_kek_datum_repository_mock()
  279         self.setup_project_repository_mock()
  280         self.setup_secret_meta_repository_mock()
  281         self.setup_transport_key_repository_mock()
  282 
  283         self.resource = SecretsResource()
  284 
  285     def test_rules_should_be_loaded(self):
  286         self.assertIsNotNone(self.policy_enforcer.rules)
  287 
  288     def test_should_pass_create_secret(self):
  289         self._assert_pass_rbac(['admin', 'creator'], self._invoke_on_post,
  290                                content_type='application/json')
  291 
  292     def test_should_raise_create_secret(self):
  293         self._assert_fail_rbac([None, 'audit', 'observer', 'bogus'],
  294                                self._invoke_on_post,
  295                                content_type='application/json')
  296 
  297     def test_should_pass_get_secrets(self):
  298         self._assert_pass_rbac(['admin', 'observer', 'creator'],
  299                                self._invoke_on_get,
  300                                content_type='application/json')
  301 
  302     def test_should_raise_get_secrets(self):
  303         self._assert_fail_rbac([None, 'audit', 'bogus'],
  304                                self._invoke_on_get,
  305                                content_type='application/json')
  306 
  307     def _invoke_on_post(self):
  308         self.resource.on_post(self.req, self.resp)
  309 
  310     def _invoke_on_get(self):
  311         self.resource.on_get(self.req, self.resp)
  312 
  313 
  314 class WhenTestingSecretResource(BaseTestCase):
  315     """RBAC tests for SecretController class."""
  316 
  317     def setUp(self):
  318         super(WhenTestingSecretResource, self).setUp()
  319 
  320         self.external_project_id = '12345project'
  321         self.secret_id = '12345secret'
  322         self.user_id = '123456user'
  323         self.creator_user_id = '123456CreatorUser'
  324 
  325         # Force an error on GET and DELETE calls that pass RBAC,
  326         #   as we are not testing such flows in this test module.
  327         self.secret_repo = mock.MagicMock()
  328         fail_method = mock.MagicMock(return_value=None,
  329                                      side_effect=self._generate_get_error())
  330         self.secret_repo.get = fail_method
  331         self.secret_repo.delete_entity_by_id = fail_method
  332         self.setup_secret_repository_mock(self.secret_repo)
  333 
  334         self.setup_encrypted_datum_repository_mock()
  335         self.setup_kek_datum_repository_mock()
  336         self.setup_project_repository_mock()
  337         self.setup_secret_meta_repository_mock()
  338         self.setup_transport_key_repository_mock()
  339 
  340         acl_read = models.SecretACL(secret_id=self.secret_id, operation='read',
  341                                     project_access=True,
  342                                     user_ids=[self.user_id, 'anyRandomId'])
  343         self.acl_list = [acl_read]
  344         secret = mock.MagicMock()
  345         secret.secret_acls.__iter__.return_value = self.acl_list
  346         secret.project.external_id = self.external_project_id
  347         secret.creator_id = self.creator_user_id
  348 
  349         self.resource = SecretResource(secret)
  350 
  351         # self.resource.controller.get_acl_tuple = mock.MagicMock(
  352         #    return_value=(None, None))
  353 
  354     def test_rules_should_be_loaded(self):
  355         self.assertIsNotNone(self.policy_enforcer.rules)
  356 
  357     def test_should_pass_decrypt_secret(self):
  358         self._assert_pass_rbac(['admin', 'observer', 'creator'],
  359                                self._invoke_on_get,
  360                                accept='notjsonaccepttype',
  361                                content_type='application/json',
  362                                user_id=self.user_id,
  363                                project_id=self.external_project_id)
  364 
  365     def test_should_raise_decrypt_secret(self):
  366 
  367         self._assert_fail_rbac([None, 'audit', 'bogus'],
  368                                self._invoke_on_get,
  369                                accept='notjsonaccepttype')
  370 
  371     def test_should_pass_decrypt_secret_for_same_project_with_no_acl(self):
  372         """Token and secret project needs to be same in no ACL defined case."""
  373         self.acl_list.pop()  # remove read acl from default setup
  374         self._assert_pass_rbac(['admin', 'observer', 'creator'],
  375                                self._invoke_on_get,
  376                                accept='notjsonaccepttype',
  377                                content_type='application/json',
  378                                user_id=self.user_id,
  379                                project_id=self.external_project_id)
  380 
  381     def test_should_raise_decrypt_secret_with_project_access_disabled(self):
  382         """Should raise authz error as secret is marked private.
  383 
  384         As secret is private so project users should not be able to access
  385         the secret. Admin project user can still access it.
  386         """
  387         self.acl_list.pop()  # remove read acl from default setup
  388         acl_read = models.SecretACL(secret_id=self.secret_id, operation='read',
  389                                     project_access=False,
  390                                     user_ids=['anyRandomUserX', 'aclUser1'])
  391         self.acl_list.append(acl_read)
  392         self._assert_fail_rbac(['observer', 'creator', 'audit'],
  393                                self._invoke_on_get,
  394                                accept='notjsonaccepttype',
  395                                content_type='application/json',
  396                                user_id=self.user_id,
  397                                project_id=self.external_project_id)
  398 
  399     def test_pass_decrypt_secret_for_admin_user_project_access_disabled(self):
  400         """Should pass authz for admin role user as secret is marked private.
  401 
  402         Even when secret is private, admin user should still have access to
  403         the secret.
  404         """
  405         self.acl_list.pop()  # remove read acl from default setup
  406         acl_read = models.SecretACL(secret_id=self.secret_id, operation='read',
  407                                     project_access=False,
  408                                     user_ids=['anyRandomUserX', 'aclUser1'])
  409         self.acl_list.append(acl_read)
  410         self._assert_pass_rbac(['admin'],
  411                                self._invoke_on_get,
  412                                accept='notjsonaccepttype',
  413                                content_type='application/json',
  414                                user_id=self.user_id,
  415                                project_id=self.external_project_id)
  416 
  417     def test_should_raise_decrypt_secret_for_with_project_access_nolist(self):
  418         """Should raise authz error as secret is marked private.
  419 
  420         As secret is private so project users should not be able to access
  421         the secret.  This test passes user_ids as empty list, which is a
  422         valid and common case. Admin project user can still access it.
  423         """
  424         self.acl_list.pop()  # remove read acl from default setup
  425         acl_read = models.SecretACL(secret_id=self.secret_id, operation='read',
  426                                     project_access=False,
  427                                     user_ids=[])
  428         self.acl_list.append(acl_read)
  429         self._assert_fail_rbac(['observer', 'creator', 'audit'],
  430                                self._invoke_on_get,
  431                                accept='notjsonaccepttype',
  432                                content_type='application/json',
  433                                user_id=self.user_id,
  434                                project_id=self.external_project_id)
  435 
  436     def test_should_pass_decrypt_secret_private_enabled_with_read_acl(self):
  437         """Should pass authz as user has read acl for private secret.
  438 
  439         Even though secret is private, user with read acl should be able to
  440         access the secret.
  441         """
  442         self.acl_list.pop()  # remove read acl from default setup
  443         acl_read = models.SecretACL(secret_id=self.secret_id, operation='read',
  444                                     project_access=False,
  445                                     user_ids=['anyRandomUserX', 'aclUser1'])
  446         self.acl_list.append(acl_read)
  447         self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit',
  448                                 'bogusRole'],
  449                                self._invoke_on_get,
  450                                accept='notjsonaccepttype',
  451                                content_type='application/json',
  452                                user_id='aclUser1',
  453                                project_id=self.external_project_id)
  454 
  455     def test_should_pass_decrypt_secret_different_user_valid_read_acl(self):
  456         self.acl_list.pop()  # remove read acl from default setup
  457         acl_read = models.SecretACL(secret_id=self.secret_id, operation='read',
  458                                     project_access=True,
  459                                     user_ids=['anyRandomUserX', 'aclUser1'])
  460         self.acl_list.append(acl_read)
  461         # token project_id is different from secret's project id but another
  462         # user (from different project) has read acl for secret so should pass
  463         self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit',
  464                                 'bogusRole'],
  465                                self._invoke_on_get,
  466                                accept='notjsonaccepttype',
  467                                content_type='application/json',
  468                                user_id='aclUser1',
  469                                project_id='different_project_id')
  470 
  471     def test_should_raise_decrypt_secret_for_different_user_no_read_acl(self):
  472         self.acl_list.pop()  # remove read acl from default setup
  473         acl_read = models.SecretACL(secret_id=self.secret_id,
  474                                     operation='write',
  475                                     project_access=True,
  476                                     user_ids=['anyRandomUserX', 'aclUser1'])
  477         self.acl_list.append(acl_read)
  478         # token project_id is different from secret's project id but another
  479         # user (from different project) has read acl for secret so should pass
  480         self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'],
  481                                self._invoke_on_get,
  482                                accept='notjsonaccepttype',
  483                                content_type='application/json',
  484                                user_id='aclUser1',
  485                                project_id='different_project_id')
  486 
  487     def test_fail_decrypt_secret_for_creator_user_with_different_project(self):
  488         """Check for creator user rule for secret decrypt/get call.
  489 
  490         If token's user is creator of secret but its scoped to different
  491         project, then he/she is not allowed access to secret when project
  492         is marked private.
  493         """
  494         self.acl_list.pop()  # remove read acl from default setup
  495         acl_read = models.SecretACL(secret_id=self.secret_id,
  496                                     operation='write',
  497                                     project_access=True,
  498                                     user_ids=['anyRandomUserX', 'aclUser1'])
  499         self.acl_list.append(acl_read)
  500         self.resource.controller.secret.creator_id = 'creatorUserX'
  501         # token user is creator but scoped to project different from secret
  502         # project so don't allow decrypt secret call to creator of that secret
  503         self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit',
  504                                 'bogusRole'],
  505                                self._invoke_on_get,
  506                                accept='notjsonaccepttype',
  507                                content_type='application/json',
  508                                user_id='creatorUserX',
  509                                project_id='different_project_id')
  510 
  511     def test_should_pass_get_secret(self):
  512         self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
  513                                self._invoke_on_get,
  514                                user_id=self.user_id,
  515                                project_id=self.external_project_id)
  516 
  517     def test_should_pass_get_secret_with_no_context(self):
  518         """In unauthenticated flow, get secret should work."""
  519         self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
  520                                self._invoke_on_get_without_context,
  521                                user_id=self.user_id,
  522                                project_id=self.external_project_id)
  523 
  524     def test_should_raise_get_secret_for_different_project_no_acl(self):
  525         """Should raise error when secret and token's project is different."""
  526         self.acl_list.pop()  # remove read acl from default setup
  527         # token project_id is different from secret's project id so should fail
  528         self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'],
  529                                self._invoke_on_get,
  530                                user_id=self.user_id,
  531                                project_id='different_id')
  532 
  533     def test_should_pass_get_secret_for_same_project_but_different_user(self):
  534         # user id should not matter as long token and secret's project match
  535         self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
  536                                self._invoke_on_get,
  537                                user_id='different_user_id',
  538                                project_id=self.external_project_id)
  539 
  540     def test_should_pass_get_secret_for_same_project_with_no_acl(self):
  541         self.acl_list.pop()  # remove read acl from default setup
  542         self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
  543                                self._invoke_on_get,
  544                                user_id=self.user_id,
  545                                project_id=self.external_project_id)
  546 
  547     def test_should_raise_get_secret_for_with_project_access_disabled(self):
  548         """Should raise authz error as secret is marked private.
  549 
  550         As secret is private so project users should not be able to access
  551         the secret.
  552         """
  553         self.acl_list.pop()  # remove read acl from default setup
  554         acl_read = models.SecretACL(secret_id=self.secret_id, operation='read',
  555                                     project_access=False,
  556                                     user_ids=['anyRandomUserX', 'aclUser1'])
  557         self.acl_list.append(acl_read)
  558         self._assert_fail_rbac(['observer', 'creator', 'audit'],
  559                                self._invoke_on_get,
  560                                user_id=self.user_id,
  561                                project_id=self.external_project_id)
  562 
  563     def test_pass_get_secret_for_admin_user_with_project_access_disabled(self):
  564         """Should pass authz for admin user as secret is marked private.
  565 
  566         Even when secret is private, admin user should have access
  567         the secret.
  568         """
  569         self.acl_list.pop()  # remove read acl from default setup
  570         acl_read = models.SecretACL(secret_id=self.secret_id, operation='read',
  571                                     project_access=False,
  572                                     user_ids=['anyRandomUserX', 'aclUser1'])
  573         self.acl_list.append(acl_read)
  574         self._assert_pass_rbac(['admin'],
  575                                self._invoke_on_get,
  576                                user_id=self.user_id,
  577                                project_id=self.external_project_id)
  578 
  579     def test_should_pass_get_secret_for_private_enabled_with_read_acl(self):
  580         """Should pass authz as user has read acl for private secret.
  581 
  582         Even though secret is private, user with read acl should be able to
  583         access the secret.
  584         """
  585         self.acl_list.pop()  # remove read acl from default setup
  586         acl_read = models.SecretACL(secret_id=self.secret_id, operation='read',
  587                                     project_access=False,
  588                                     user_ids=['anyRandomUserX', 'aclUser1'])
  589         self.acl_list.append(acl_read)
  590         self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit',
  591                                 'bogusRole'],
  592                                self._invoke_on_get,
  593                                user_id='aclUser1',
  594                                project_id=self.external_project_id)
  595 
  596     def test_should_pass_get_secret_different_user_with_valid_read_acl(self):
  597         """Should allow when read ACL is defined for a user.
  598 
  599         Secret's own project and token's project is different but read is
  600         allowed because of valid read ACL.
  601         """
  602         self.acl_list.pop()  # remove read acl from default setup
  603         acl_read = models.SecretACL(secret_id=self.secret_id, operation='read',
  604                                     project_access=True,
  605                                     user_ids=['anyRandomUserX', 'aclUser1'])
  606         self.acl_list.append(acl_read)
  607         # token project_id is different from secret's project id but another
  608         # user (from different project) has read acl for secret so should pass
  609         self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit',
  610                                 'bogusRole'],
  611                                self._invoke_on_get,
  612                                user_id='aclUser1',
  613                                project_id='different_project_id')
  614 
  615     def test_should_raise_get_secret_for_different_user_with_no_read_acl(self):
  616         """Get secret fails when no read acl is defined.
  617 
  618         With different secret and token's project, read is not allowed without
  619         a read ACL.
  620         """
  621         self.acl_list.pop()  # remove read acl from default setup
  622         acl_read = models.SecretACL(secret_id=self.secret_id,
  623                                     operation='write',
  624                                     project_access=True,
  625                                     user_ids=['anyRandomUserX', 'aclUser1'])
  626         self.acl_list.append(acl_read)
  627         # token project_id is different from secret's project id but another
  628         # user (from different project) has read acl for secret so should pass
  629         self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'],
  630                                self._invoke_on_get,
  631                                user_id='aclUser1',
  632                                project_id='different_project_id')
  633 
  634     def test_fail_get_secret_for_creator_user_with_different_project(self):
  635         """Check for creator user rule for secret get call.
  636 
  637         If token's user is creator of secret but its scoped to different
  638         project, then he/she is not allowed access to secret when project
  639         is marked private.
  640         """
  641         self.acl_list.pop()  # remove read acl from default setup
  642         self.resource.controller.secret.creator_id = 'creatorUserX'
  643 
  644         self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit',
  645                                 'bogusRole'],
  646                                self._invoke_on_get,
  647                                user_id='creatorUserX',
  648                                project_id='different_project_id')
  649 
  650     def test_should_raise_get_secret(self):
  651         self._assert_fail_rbac([None, 'bogus'],
  652                                self._invoke_on_get)
  653 
  654     def test_should_pass_put_secret(self):
  655         self._assert_pass_rbac(['admin', 'creator'], self._invoke_on_put,
  656                                content_type="application/octet-stream",
  657                                user_id=self.user_id,
  658                                project_id=self.external_project_id)
  659 
  660     def test_should_raise_put_secret(self):
  661         self._assert_fail_rbac([None, 'audit', 'observer', 'bogus'],
  662                                self._invoke_on_put,
  663                                content_type="application/octet-stream")
  664 
  665     def test_should_pass_delete_secret(self):
  666         self._assert_pass_rbac(['admin'], self._invoke_on_delete,
  667                                user_id=self.user_id,
  668                                project_id=self.external_project_id)
  669 
  670     def test_should_raise_delete_secret(self):
  671         """A non-admin user cannot delete other user's secret.
  672 
  673         User id is different from initial user who has created the secret.
  674         """
  675         self._assert_fail_rbac([None, 'audit', 'observer', 'creator', 'bogus'],
  676                                self._invoke_on_delete,
  677                                user_id=self.user_id,
  678                                project_id=self.external_project_id)
  679 
  680     def test_should_pass_delete_secret_for_owner(self):
  681         """Non-admin user can delete his/her own secret
  682 
  683         Secret creator_id should match with token user to establish ownership.
  684         """
  685         self._assert_pass_rbac(['creator'], self._invoke_on_delete,
  686                                user_id=self.creator_user_id,
  687                                project_id=self.external_project_id)
  688 
  689     def _invoke_on_get(self):
  690         self.resource.on_get(self.req, self.resp)
  691 
  692     def _invoke_on_get_without_context(self):
  693         # Adding this to get code coverage around context check lines
  694         self.req.environ.pop('barbican.context')
  695         self.resource.on_get(self.req, self.resp,
  696                              self.external_project_id)
  697 
  698     def _invoke_on_put(self):
  699         self.resource.on_put(self.req, self.resp)
  700 
  701     def _invoke_on_delete(self):
  702         self.resource.on_delete(self.req, self.resp)
  703 
  704 
  705 class WhenTestingContainerResource(BaseTestCase):
  706     """RBAC tests for ContainerController class.
  707 
  708     Container controller tests are quite similar to SecretController as
  709     policy logic is same. Just adding them here to make sure logic related to
  710     acl gathering data works as expected.
  711     """
  712 
  713     def setUp(self):
  714         super(WhenTestingContainerResource, self).setUp()
  715 
  716         self.external_project_id = '12345project'
  717         self.container_id = '12345secret'
  718         self.user_id = '123456user'
  719         self.creator_user_id = '123456CreatorUser'
  720 
  721         # Force an error on GET and DELETE calls that pass RBAC,
  722         #   as we are not testing such flows in this test module.
  723         self.container_repo = mock.MagicMock()
  724         fail_method = mock.MagicMock(return_value=None,
  725                                      side_effect=self._generate_get_error())
  726         self.container_repo.get = fail_method
  727         self.container_repo.delete_entity_by_id = fail_method
  728 
  729         acl_read = models.ContainerACL(
  730             container_id=self.container_id, operation='read',
  731             project_access=True, user_ids=[self.user_id, 'anyRandomId'])
  732         self.acl_list = [acl_read]
  733         container = mock.MagicMock()
  734         container.to_dict_fields = mock.MagicMock(side_effect=IOError)
  735         container.id = self.container_id
  736         container.container_acls.__iter__.return_value = self.acl_list
  737         container.project.external_id = self.external_project_id
  738         container.creator_id = self.creator_user_id
  739 
  740         self.container_repo.get_container_by_id.return_value = container
  741 
  742         self.setup_container_repository_mock(self.container_repo)
  743 
  744         self.resource = ContainerResource(container)
  745 
  746     def test_rules_should_be_loaded(self):
  747         self.assertIsNotNone(self.policy_enforcer.rules)
  748 
  749     def test_should_pass_get_container(self):
  750         self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
  751                                self._invoke_on_get,
  752                                user_id=self.user_id,
  753                                project_id=self.external_project_id)
  754 
  755     def test_should_pass_get_container_with_no_context(self):
  756         """In unauthenticated flow, get container should work."""
  757         self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
  758                                self._invoke_on_get_without_context,
  759                                user_id=self.user_id,
  760                                project_id=self.external_project_id)
  761 
  762     def test_should_raise_get_container_for_different_project_no_acl(self):
  763         """Raise error when container and token's project is different."""
  764         self.acl_list.pop()  # remove read acl from default setup
  765         # token project_id is different from secret's project id so should fail
  766         self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'],
  767                                self._invoke_on_get,
  768                                user_id=self.user_id,
  769                                project_id='different_id')
  770 
  771     def test_should_pass_get_container_for_same_project_but_different_user(
  772             self):
  773         """Should pass if token and secret's project match.
  774 
  775         User id should not matter as long token and container's project match.
  776         """
  777         self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
  778                                self._invoke_on_get,
  779                                user_id='different_user_id',
  780                                project_id=self.external_project_id)
  781 
  782     def test_should_pass_get_container_for_same_project_with_no_acl(self):
  783         self.acl_list.pop()  # remove read acl from default setup
  784         self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
  785                                self._invoke_on_get,
  786                                user_id=self.user_id,
  787                                project_id=self.external_project_id)
  788 
  789     def test_should_raise_get_container_for_with_project_access_disabled(self):
  790         """Should raise authz error as container is marked private.
  791 
  792         As container is private so project users should not be able to access
  793         the secret (other than admin user).
  794         """
  795         self.acl_list.pop()  # remove read acl from default setup
  796         acl_read = models.ContainerACL(
  797             container_id=self.container_id, operation='read',
  798             project_access=False, user_ids=['anyRandomUserX', 'aclUser1'])
  799         self.acl_list.append(acl_read)
  800         self._assert_fail_rbac(['observer', 'creator', 'audit'],
  801                                self._invoke_on_get,
  802                                user_id=self.user_id,
  803                                project_id=self.external_project_id)
  804 
  805     def test_pass_get_container_for_admin_user_project_access_disabled(self):
  806         """Should pass authz for admin user when container is marked private.
  807 
  808         For private container, admin user should still be able to access
  809         the secret.
  810         """
  811         self.acl_list.pop()  # remove read acl from default setup
  812         acl_read = models.ContainerACL(
  813             container_id=self.container_id, operation='read',
  814             project_access=False, user_ids=['anyRandomUserX', 'aclUser1'])
  815         self.acl_list.append(acl_read)
  816         self._assert_pass_rbac(['admin'],
  817                                self._invoke_on_get,
  818                                user_id=self.user_id,
  819                                project_id=self.external_project_id)
  820 
  821     def test_should_pass_get_container_for_private_enabled_with_read_acl(self):
  822         """Should pass authz as user has read acl for private container.
  823 
  824         Even though container is private, user with read acl should be able to
  825         access the container.
  826         """
  827         self.acl_list.pop()  # remove read acl from default setup
  828         acl_read = models.ContainerACL(
  829             container_id=self.container_id, operation='read',
  830             project_access=False, user_ids=['anyRandomUserX', 'aclUser1'])
  831         self.acl_list.append(acl_read)
  832         self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit',
  833                                 'bogusRole'],
  834                                self._invoke_on_get,
  835                                user_id='aclUser1',
  836                                project_id=self.external_project_id)
  837 
  838     def test_should_pass_get_container_different_user_with_valid_read_acl(
  839             self):
  840         """Should allow when read ACL is defined for a user.
  841 
  842         Container's own project and token's project is different but read is
  843         allowed because of valid read ACL. User can read regardless of what is
  844         token's project as it has necessary ACL.
  845         """
  846         self.acl_list.pop()  # remove read acl from default setup
  847         acl_read = models.ContainerACL(
  848             container_id=self.container_id, operation='read',
  849             project_access=True, user_ids=['anyRandomUserX', 'aclUser1'])
  850         self.acl_list.append(acl_read)
  851         self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit',
  852                                 'bogusRole'],
  853                                self._invoke_on_get,
  854                                user_id='aclUser1',
  855                                project_id='different_project_id')
  856 
  857     def test_should_raise_get_container_for_different_user_with_no_read_acl(
  858             self):
  859         """Get secret fails when no read acl is defined.
  860 
  861         With different container and token's project, read is not allowed
  862         without a read ACL.
  863         """
  864         self.acl_list.pop()  # remove read acl from default setup
  865         acl_read = models.ContainerACL(
  866             container_id=self.container_id, operation='write',
  867             project_access=True, user_ids=['anyRandomUserX', 'aclUser1'])
  868         self.acl_list.append(acl_read)
  869         # token project_id is different from secret's project id but another
  870         # user (from different project) has read acl for secret so should pass
  871         self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'],
  872                                self._invoke_on_get,
  873                                user_id='aclUser1',
  874                                project_id='different_project_id')
  875 
  876     def test_fail_get_container_for_creator_user_different_project(self):
  877         """Check for creator user rule for container get call.
  878 
  879         If token's user is creator of container but its scoped to different
  880         project, then he/she is not allowed access to container when project
  881         is marked private.
  882         """
  883         self.acl_list.pop()  # remove read acl from default setup
  884         acl_read = models.ContainerACL(
  885             container_id=self.container_id, operation='read',
  886             project_access=False, user_ids=['anyRandomUserX', 'aclUser1'])
  887         self.acl_list.append(acl_read)
  888         self._assert_fail_rbac(['creator'],
  889                                self._invoke_on_get,
  890                                user_id=self.creator_user_id,
  891                                project_id='differet_project_id')
  892 
  893     def test_pass_get_container_for_creator_user_project_access_disabled(self):
  894         """Should pass authz for creator user when container is marked private.
  895 
  896         As container is private so user who created the container can still
  897         access it as long as user has 'creator' role in container project.
  898         """
  899         self.acl_list.pop()  # remove read acl from default setup
  900         acl_read = models.ContainerACL(
  901             container_id=self.container_id, operation='read',
  902             project_access=False, user_ids=['anyRandomUserX', 'aclUser1'])
  903         self.acl_list.append(acl_read)
  904         self._assert_pass_rbac(['creator'],
  905                                self._invoke_on_get,
  906                                user_id=self.creator_user_id,
  907                                project_id=self.external_project_id)
  908 
  909     def test_should_raise_get_container(self):
  910         self._assert_fail_rbac([None, 'bogus'],
  911                                self._invoke_on_get)
  912 
  913     def test_should_pass_delete_container(self):
  914         self._assert_pass_rbac(['admin'], self._invoke_on_delete,
  915                                user_id=self.user_id,
  916                                project_id=self.external_project_id)
  917 
  918     def test_should_raise_delete_container(self):
  919         """A non-admin user cannot delete other user's container.
  920 
  921         User id is different from initial user who has created the container.
  922         """
  923         self._assert_fail_rbac([None, 'audit', 'observer', 'creator', 'bogus'],
  924                                self._invoke_on_delete,
  925                                user_id=self.user_id,
  926                                project_id=self.external_project_id)
  927 
  928     def test_should_pass_delete_container_for_owner(self):
  929         """Non-admin user can delete his/her own container
  930 
  931         Container creator_id should match with token user to establish
  932         ownership.
  933         """
  934         self._assert_pass_rbac(['creator'], self._invoke_on_delete,
  935                                user_id=self.creator_user_id,
  936                                project_id=self.external_project_id)
  937 
  938     def _invoke_on_get(self):
  939         self.resource.on_get(self.req, self.resp)
  940 
  941     def _invoke_on_get_without_context(self):
  942         # Adding this to get code coverage around context check lines
  943         self.req.environ.pop('barbican.context')
  944         self.resource.on_get(self.req, self.resp)
  945 
  946     def _invoke_on_put(self):
  947         self.resource.on_put(self.req, self.resp)
  948 
  949     def _invoke_on_delete(self):
  950         self.resource.on_delete(self.req, self.resp)
  951 
  952 
  953 class WhenTestingOrdersResource(BaseTestCase):
  954     """RBAC tests for the barbican.api.resources.OrdersResource class."""
  955     def setUp(self):
  956         super(WhenTestingOrdersResource, self).setUp()
  957 
  958         self.external_project_id = '12345'
  959 
  960         # Force an error on GET calls that pass RBAC, as we are not testing
  961         #   such flows in this test module.
  962         self.order_repo = mock.MagicMock()
  963         get_by_create_date = mock.MagicMock(return_value=None,
  964                                             side_effect=self
  965                                             ._generate_get_error())
  966         self.order_repo.get_by_create_date = get_by_create_date
  967 
  968         self.setup_order_repository_mock(self.order_repo)
  969         self.setup_project_repository_mock()
  970 
  971         self.resource = OrdersResource(queue_resource=mock.MagicMock())
  972 
  973     def test_rules_should_be_loaded(self):
  974         self.assertIsNotNone(self.policy_enforcer.rules)
  975 
  976     def test_should_pass_create_order(self):
  977         self._assert_pass_rbac(['admin', 'creator'], self._invoke_on_post,
  978                                content_type='application/json')
  979 
  980     def test_should_raise_create_order(self):
  981         self._assert_fail_rbac([None, 'audit', 'observer', 'bogus'],
  982                                self._invoke_on_post)
  983 
  984     def test_should_pass_get_orders(self):
  985         self._assert_pass_rbac(['admin', 'observer', 'creator'],
  986                                self._invoke_on_get)
  987 
  988     def test_should_raise_get_orders(self):
  989         self._assert_fail_rbac([None, 'audit', 'bogus'],
  990                                self._invoke_on_get)
  991 
  992     def _invoke_on_post(self):
  993         self.resource.on_post(self.req, self.resp)
  994 
  995     def _invoke_on_get(self):
  996         self.resource.on_get(self.req, self.resp)
  997 
  998 
  999 class WhenTestingOrderResource(BaseTestCase):
 1000     """RBAC tests for the barbican.api.resources.OrderResource class."""
 1001     def setUp(self):
 1002         super(WhenTestingOrderResource, self).setUp()
 1003 
 1004         self.external_project_id = '12345project'
 1005         self.order_id = '12345order'
 1006 
 1007         # Force an error on GET and DELETE calls that pass RBAC,
 1008         #   as we are not testing such flows in this test module.
 1009         self.order_repo = mock.MagicMock()
 1010         fail_method = mock.MagicMock(return_value=None,
 1011                                      side_effect=self._generate_get_error())
 1012         self.order_repo.get = fail_method
 1013         self.order_repo.delete_entity_by_id = fail_method
 1014 
 1015         self.setup_order_repository_mock(self.order_repo)
 1016 
 1017         self.resource = OrderResource(self.order_id)
 1018 
 1019     def test_rules_should_be_loaded(self):
 1020         self.assertIsNotNone(self.policy_enforcer.rules)
 1021 
 1022     def test_should_pass_get_order(self):
 1023         self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
 1024                                self._invoke_on_get)
 1025 
 1026     def test_should_raise_get_order(self):
 1027         self._assert_fail_rbac([None, 'bogus'],
 1028                                self._invoke_on_get)
 1029 
 1030     def test_should_pass_delete_order(self):
 1031         self._assert_pass_rbac(['admin'], self._invoke_on_delete)
 1032 
 1033     def test_should_raise_delete_order(self):
 1034         self._assert_fail_rbac([None, 'audit', 'observer', 'creator', 'bogus'],
 1035                                self._invoke_on_delete)
 1036 
 1037     def _invoke_on_get(self):
 1038         self.resource.on_get(self.req, self.resp)
 1039 
 1040     def _invoke_on_delete(self):
 1041         self.resource.on_delete(self.req, self.resp)
 1042 
 1043 
 1044 class WhenTestingConsumersResource(BaseTestCase):
 1045     """RBAC tests for the barbican.api.resources.ConsumersResource class."""
 1046     def setUp(self):
 1047         super(WhenTestingConsumersResource, self).setUp()
 1048 
 1049         self.external_project_id = '12345project'
 1050         self.container_id = '12345container'
 1051 
 1052         # Force an error on GET calls that pass RBAC, as we are not testing
 1053         #   such flows in this test module.
 1054         self.consumer_repo = mock.MagicMock()
 1055         get_by_container_id = mock.MagicMock(return_value=None,
 1056                                              side_effect=self
 1057                                              ._generate_get_error())
 1058         self.consumer_repo.get_by_container_id = get_by_container_id
 1059 
 1060         self.setup_project_repository_mock()
 1061         self.setup_container_consumer_repository_mock(self.consumer_repo)
 1062         self.setup_container_repository_mock()
 1063 
 1064         self.resource = ConsumersResource(container_id=self.container_id)
 1065 
 1066     def test_rules_should_be_loaded(self):
 1067         self.assertIsNotNone(self.policy_enforcer.rules)
 1068 
 1069     def test_should_pass_create_consumer(self):
 1070         self._assert_pass_rbac(['admin'], self._invoke_on_post,
 1071                                content_type='application/json')
 1072 
 1073     def test_should_raise_create_consumer(self):
 1074         self._assert_fail_rbac([None, 'audit', 'observer', 'creator', 'bogus'],
 1075                                self._invoke_on_post,
 1076                                content_type='application/json')
 1077 
 1078     def test_should_pass_delete_consumer(self):
 1079         self._assert_pass_rbac(['admin'], self._invoke_on_delete,
 1080                                content_type='application/json')
 1081 
 1082     def test_should_raise_delete_consumer(self):
 1083         self._assert_fail_rbac([None, 'audit', 'observer', 'creator', 'bogus'],
 1084                                self._invoke_on_delete)
 1085 
 1086     def test_should_pass_get_consumers(self):
 1087         self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
 1088                                self._invoke_on_get,
 1089                                content_type='application/json')
 1090 
 1091     def test_should_raise_get_consumers(self):
 1092         self._assert_fail_rbac([None, 'bogus'],
 1093                                self._invoke_on_get,
 1094                                content_type='application/json')
 1095 
 1096     def _invoke_on_post(self):
 1097         self.resource.on_post(self.req, self.resp)
 1098 
 1099     def _invoke_on_delete(self):
 1100         self.resource.on_delete(self.req, self.resp)
 1101 
 1102     def _invoke_on_get(self):
 1103         self.resource.on_get(self.req, self.resp)
 1104 
 1105 
 1106 class WhenTestingConsumerResource(BaseTestCase):
 1107     """RBAC tests for the barbican.api.resources.ConsumerResource class."""
 1108     def setUp(self):
 1109         super(WhenTestingConsumerResource, self).setUp()
 1110 
 1111         self.external_project_id = '12345project'
 1112         self.consumer_id = '12345consumer'
 1113 
 1114         # Force an error on GET calls that pass RBAC, as we are not testing
 1115         #   such flows in this test module.
 1116         self.consumer_repo = mock.MagicMock()
 1117         fail_method = mock.MagicMock(return_value=None,
 1118                                      side_effect=self._generate_get_error())
 1119         self.consumer_repo.get = fail_method
 1120 
 1121         self.setup_project_repository_mock()
 1122         self.setup_container_consumer_repository_mock(self.consumer_repo)
 1123         self.resource = ConsumerResource(consumer_id=self.consumer_id)
 1124 
 1125     def test_rules_should_be_loaded(self):
 1126         self.assertIsNotNone(self.policy_enforcer.rules)
 1127 
 1128     def test_should_pass_get_consumer(self):
 1129         self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
 1130                                self._invoke_on_get)
 1131 
 1132     def test_should_raise_get_consumer(self):
 1133         self._assert_fail_rbac([None, 'bogus'],
 1134                                self._invoke_on_get)
 1135 
 1136     def _invoke_on_get(self):
 1137         self.resource.on_get(self.req, self.resp)
 1138 
 1139 
 1140 class WhenTestingSecretStoresResource(BaseTestCase):
 1141     """RBAC tests for the barbican.api.resources.SecretStoresResource class."""
 1142     def setUp(self):
 1143         super(WhenTestingSecretStoresResource, self).setUp()
 1144 
 1145         self.external_project_id = '12345project'
 1146 
 1147         self.moc_enable_patcher = mock.patch(
 1148             'barbican.common.utils.is_multiple_backends_enabled')
 1149         enable_check_method = self.moc_enable_patcher.start()
 1150         enable_check_method.return_value = True
 1151         self.addCleanup(self.moc_enable_patcher.stop)
 1152 
 1153         # Force an error on GET calls that pass RBAC, as we are not testing
 1154         #   such flows in this test module.
 1155         self.project_repo = mock.MagicMock()
 1156         fail_method = mock.MagicMock(return_value=None,
 1157                                      side_effect=self._generate_get_error())
 1158         self.project_repo.find_by_external_project_id = fail_method
 1159         self.setup_project_repository_mock(self.project_repo)
 1160 
 1161         self.resource = SecretStoresResource()
 1162 
 1163     def test_rules_should_be_loaded(self):
 1164         self.assertIsNotNone(self.policy_enforcer.rules)
 1165 
 1166     def test_should_pass_get_all_secret_stores(self):
 1167         self._assert_pass_rbac(
 1168             ['admin', 'observer', 'audit', 'creator', 'reader'],
 1169             self._invoke_on_get)
 1170 
 1171     def test_should_pass_get_global_default(self):
 1172         self._assert_pass_rbac(
 1173             ['admin', 'observer', 'audit', 'creator', 'reader'],
 1174             self._invoke_get_global_default)
 1175 
 1176     def test_should_pass_get_preferred(self):
 1177         self._assert_pass_rbac(
 1178             ['admin', 'observer', 'audit', 'creator', 'reader'],
 1179             self._invoke_get_preferred)
 1180 
 1181     def _invoke_on_get(self):
 1182         self.resource.on_get(self.req, self.resp)
 1183 
 1184     def _invoke_get_global_default(self):
 1185         with mock.patch('pecan.request', self.req):
 1186             with mock.patch('pecan.response', self.resp):
 1187                 return self.resource.controller.get_global_default()
 1188 
 1189     def _invoke_get_preferred(self):
 1190         with mock.patch('pecan.request', self.req):
 1191             with mock.patch('pecan.response', self.resp):
 1192                 return self.resource.controller.get_preferred()
 1193 
 1194 
 1195 class WhenTestingSecretStoreResource(BaseTestCase):
 1196     """RBAC tests for the barbican.api.resources.SecretStoreResource class."""
 1197     def setUp(self):
 1198         super(WhenTestingSecretStoreResource, self).setUp()
 1199 
 1200         self.external_project_id = '12345project'
 1201         self.store_id = '123456SecretStoreId'
 1202 
 1203         self.moc_enable_patcher = mock.patch(
 1204             'barbican.common.utils.is_multiple_backends_enabled')
 1205         enable_check_method = self.moc_enable_patcher.start()
 1206         enable_check_method.return_value = True
 1207         self.addCleanup(self.moc_enable_patcher.stop)
 1208 
 1209         # Force an error on GET calls that pass RBAC, as we are not testing
 1210         #   such flows in this test module.
 1211 
 1212         self.project_repo = mock.MagicMock()
 1213         fail_method = mock.MagicMock(return_value=None,
 1214                                      side_effect=self._generate_get_error())
 1215 
 1216         self.project_repo.find_by_external_project_id = fail_method
 1217         self.setup_project_repository_mock(self.project_repo)
 1218 
 1219         secret_store_res = mock.MagicMock()
 1220         secret_store_res.to_dict_fields = mock.MagicMock(side_effect=IOError)
 1221         secret_store_res.id = self.store_id
 1222 
 1223         self.resource = SecretStoreResource(secret_store_res)
 1224 
 1225     def test_rules_should_be_loaded(self):
 1226         self.assertIsNotNone(self.policy_enforcer.rules)
 1227 
 1228     def test_should_pass_get_a_secret_store(self):
 1229         self._assert_pass_rbac(
 1230             ['admin', 'observer', 'audit', 'creator', 'reader'],
 1231             self._invoke_on_get)
 1232 
 1233     def _invoke_on_get(self):
 1234         self.resource.on_get(self.req, self.resp)
 1235 
 1236 
 1237 class WhenTestingPreferredSecretStoreResource(BaseTestCase):
 1238     """RBAC tests for barbican.api.resources.PreferredSecretStoreResource"""
 1239     def setUp(self):
 1240         super(WhenTestingPreferredSecretStoreResource, self).setUp()
 1241 
 1242         self.external_project_id = '12345project'
 1243         self.store_id = '123456SecretStoreId'
 1244 
 1245         self.moc_enable_patcher = mock.patch(
 1246             'barbican.common.utils.is_multiple_backends_enabled')
 1247         enable_check_method = self.moc_enable_patcher.start()
 1248         enable_check_method.return_value = True
 1249         self.addCleanup(self.moc_enable_patcher.stop)
 1250 
 1251         # Force an error on POST/DELETE calls that pass RBAC, as we are not
 1252         # testing such flows in this test module.
 1253         self.project_repo = mock.MagicMock()
 1254         fail_method = mock.MagicMock(return_value=None,
 1255                                      side_effect=self._generate_get_error())
 1256         self.project_repo.find_by_external_project_id = fail_method
 1257         self.setup_project_repository_mock(self.project_repo)
 1258 
 1259         self.resource = PreferredSecretStoreResource(mock.MagicMock())
 1260 
 1261     def test_rules_should_be_loaded(self):
 1262         self.assertIsNotNone(self.policy_enforcer.rules)
 1263 
 1264     def test_should_pass_set_preferred_secret_store(self):
 1265         self._assert_pass_rbac(['admin'],
 1266                                self._invoke_on_post)
 1267 
 1268     def test_should_raise_set_preferred_secret_store(self):
 1269         self._assert_fail_rbac(
 1270             [None, 'creator', 'observer', 'audit', 'reader'],
 1271             self._invoke_on_post)
 1272 
 1273     def _invoke_on_post(self):
 1274         self.resource.on_post(self.req, self.resp)
 1275 
 1276 
 1277 class WhenTestingSecretConsumersResource(BaseTestCase):
 1278     """RBAC tests for barbican.api.resources.SecretConsumersResource"""
 1279     def setUp(self):
 1280         super(WhenTestingSecretConsumersResource, self).setUp()
 1281 
 1282         self.external_project_id = '12345project'
 1283         self.secret_id = '12345secret'
 1284 
 1285         # Force an error on GET calls that pass RBAC, as we are not testing
 1286         #   such flows in this test module.
 1287         self.consumer_repo = mock.MagicMock()
 1288         get_by_secret_id = mock.MagicMock(return_value=None,
 1289                                           side_effect=self
 1290                                           ._generate_get_error())
 1291         self.consumer_repo.get_by_secret_id = get_by_secret_id
 1292 
 1293         self.setup_project_repository_mock()
 1294         self.setup_secret_consumer_repository_mock(self.consumer_repo)
 1295         self.setup_secret_repository_mock()
 1296 
 1297         self.resource = SecretConsumersResource(secret_id=self.secret_id)
 1298 
 1299     def test_rules_should_be_loaded(self):
 1300         self.assertIsNotNone(self.policy_enforcer.rules)
 1301 
 1302     def test_should_pass_create_consumer(self):
 1303         self._assert_pass_rbac(['admin'], self._invoke_on_post,
 1304                                content_type='application/json')
 1305 
 1306     def test_should_raise_create_consumer(self):
 1307         self._assert_fail_rbac([None, 'audit', 'observer', 'creator', 'bogus'],
 1308                                self._invoke_on_post,
 1309                                content_type='application/json')
 1310 
 1311     def test_should_pass_delete_consumer(self):
 1312         self._assert_pass_rbac(['admin'], self._invoke_on_delete,
 1313                                content_type='application/json')
 1314 
 1315     def test_should_raise_delete_consumer(self):
 1316         self._assert_fail_rbac([None, 'audit', 'observer', 'creator', 'bogus'],
 1317                                self._invoke_on_delete)
 1318 
 1319     def test_should_pass_get_consumers(self):
 1320         self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
 1321                                self._invoke_on_get,
 1322                                content_type='application/json')
 1323 
 1324     def test_should_raise_get_consumers(self):
 1325         self._assert_fail_rbac([None, 'bogus'],
 1326                                self._invoke_on_get,
 1327                                content_type='application/json')
 1328 
 1329     def _invoke_on_post(self):
 1330         self.resource.on_post(self.req, self.resp)
 1331 
 1332     def _invoke_on_delete(self):
 1333         self.resource.on_delete(self.req, self.resp)
 1334 
 1335     def _invoke_on_get(self):
 1336         self.resource.on_get(self.req, self.resp)
 1337 
 1338 
 1339 class WhenTestingSecretConsumerResource(BaseTestCase):
 1340     """RBAC tests for barbican.api.resources.SecretConsumerResource"""
 1341     def setUp(self):
 1342         super(WhenTestingSecretConsumerResource, self).setUp()
 1343 
 1344         self.external_project_id = '12345project'
 1345         self.consumer_id = '12345consumer'
 1346 
 1347         # Force an error on GET calls that pass RBAC, as we are not testing
 1348         #   such flows in this test module.
 1349         self.consumer_repo = mock.MagicMock()
 1350         fail_method = mock.MagicMock(return_value=None,
 1351                                      side_effect=self._generate_get_error())
 1352         self.consumer_repo.get = fail_method
 1353 
 1354         self.setup_project_repository_mock()
 1355         self.setup_secret_consumer_repository_mock(self.consumer_repo)
 1356         self.resource = SecretConsumerResource(consumer_id=self.consumer_id)
 1357 
 1358     def test_rules_should_be_loaded(self):
 1359         self.assertIsNotNone(self.policy_enforcer.rules)
 1360 
 1361     def test_should_pass_get_consumer(self):
 1362         self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
 1363                                self._invoke_on_get)
 1364 
 1365     def test_should_raise_get_consumer(self):
 1366         self._assert_fail_rbac([None, 'bogus'],
 1367                                self._invoke_on_get)
 1368 
 1369     def _invoke_on_get(self):
 1370         self.resource.on_get(self.req, self.resp)