"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/unit/test_policy.py" (13 May 2020, 9881 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "test_policy.py": 16.0.1_vs_17.0.0.

    1 # Copyright 2011 Piston Cloud Computing, Inc.
    2 # All Rights Reserved.
    3 
    4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 #    not use this file except in compliance with the License. You may obtain
    6 #    a copy of the License at
    7 #
    8 #         http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 #    Unless required by applicable law or agreed to in writing, software
   11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 #    License for the specific language governing permissions and limitations
   14 #    under the License.
   15 
   16 import os
   17 import subprocess
   18 from unittest import mock
   19 import uuid
   20 
   21 from oslo_policy import policy as common_policy
   22 
   23 from keystone.common import policies
   24 from keystone.common.rbac_enforcer import policy
   25 import keystone.conf
   26 from keystone import exception
   27 from keystone.tests import unit
   28 from keystone.tests.unit import ksfixtures
   29 from keystone.tests.unit.ksfixtures import temporaryfile
   30 
   31 
   32 CONF = keystone.conf.CONF
   33 
   34 
   35 class PolicyFileTestCase(unit.TestCase):
   36     def setUp(self):
   37         # self.tmpfilename should exist before setUp super is called
   38         # this is to ensure it is available for the config_fixture in
   39         # the config_overrides call.
   40         self.tempfile = self.useFixture(temporaryfile.SecureTempFile())
   41         self.tmpfilename = self.tempfile.file_name
   42         super(PolicyFileTestCase, self).setUp()
   43         self.target = {}
   44 
   45     def _policy_fixture(self):
   46         return ksfixtures.Policy(
   47             self.config_fixture, policy_file=self.tmpfilename
   48         )
   49 
   50     def test_modified_policy_reloads(self):
   51         action = "example:test"
   52         empty_credentials = {}
   53         with open(self.tmpfilename, "w") as policyfile:
   54             policyfile.write("""{"example:test": []}""")
   55         policy.enforce(empty_credentials, action, self.target)
   56         with open(self.tmpfilename, "w") as policyfile:
   57             policyfile.write("""{"example:test": ["false:false"]}""")
   58         policy._ENFORCER._enforcer.clear()
   59         self.assertRaises(exception.ForbiddenAction, policy.enforce,
   60                           empty_credentials, action, self.target)
   61 
   62 
   63 class PolicyTestCase(unit.TestCase):
   64     def setUp(self):
   65         super(PolicyTestCase, self).setUp()
   66         self.rules = {
   67             "true": [],
   68             "example:allowed": [],
   69             "example:denied": [["false:false"]],
   70             "example:get_http": [["http:http://www.example.com"]],
   71             "example:my_file": [["role:compute_admin"],
   72                                 ["project_id:%(project_id)s"]],
   73             "example:early_and_fail": [["false:false", "rule:true"]],
   74             "example:early_or_success": [["rule:true"], ["false:false"]],
   75             "example:lowercase_admin": [["role:admin"], ["role:sysadmin"]],
   76             "example:uppercase_admin": [["role:ADMIN"], ["role:sysadmin"]],
   77         }
   78 
   79         # NOTE(vish): then overload underlying policy engine
   80         self._set_rules()
   81         self.credentials = {}
   82         self.target = {}
   83 
   84     def _set_rules(self):
   85         these_rules = common_policy.Rules.from_dict(self.rules)
   86         policy._ENFORCER._enforcer.set_rules(these_rules)
   87 
   88     def test_enforce_nonexistent_action_throws(self):
   89         action = "example:noexist"
   90         self.assertRaises(exception.ForbiddenAction, policy.enforce,
   91                           self.credentials, action, self.target)
   92 
   93     def test_enforce_bad_action_throws(self):
   94         action = "example:denied"
   95         self.assertRaises(exception.ForbiddenAction, policy.enforce,
   96                           self.credentials, action, self.target)
   97 
   98     def test_enforce_good_action(self):
   99         action = "example:allowed"
  100         policy.enforce(self.credentials, action, self.target)
  101 
  102     def test_templatized_enforcement(self):
  103         target_mine = {'project_id': 'fake'}
  104         target_not_mine = {'project_id': 'another'}
  105         credentials = {'project_id': 'fake', 'roles': []}
  106         action = "example:my_file"
  107         policy.enforce(credentials, action, target_mine)
  108         self.assertRaises(exception.ForbiddenAction, policy.enforce,
  109                           credentials, action, target_not_mine)
  110 
  111     def test_early_AND_enforcement(self):
  112         action = "example:early_and_fail"
  113         self.assertRaises(exception.ForbiddenAction, policy.enforce,
  114                           self.credentials, action, self.target)
  115 
  116     def test_early_OR_enforcement(self):
  117         action = "example:early_or_success"
  118         policy.enforce(self.credentials, action, self.target)
  119 
  120     def test_ignore_case_role_check(self):
  121         lowercase_action = "example:lowercase_admin"
  122         uppercase_action = "example:uppercase_admin"
  123         # NOTE(dprince): We mix case in the Admin role here to ensure
  124         # case is ignored
  125         admin_credentials = {'roles': ['AdMiN']}
  126         policy.enforce(admin_credentials, lowercase_action, self.target)
  127         policy.enforce(admin_credentials, uppercase_action, self.target)
  128 
  129 
  130 class PolicyScopeTypesEnforcementTestCase(unit.TestCase):
  131 
  132     def setUp(self):
  133         super(PolicyScopeTypesEnforcementTestCase, self).setUp()
  134         rule = common_policy.RuleDefault(
  135             name='foo',
  136             check_str='',
  137             scope_types=['system']
  138         )
  139         policy._ENFORCER._enforcer.register_default(rule)
  140         self.credentials = {}
  141         self.action = 'foo'
  142         self.target = {}
  143 
  144     def test_forbidden_is_raised_if_enforce_scope_is_true(self):
  145         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  146         self.assertRaises(
  147             exception.ForbiddenAction, policy.enforce, self.credentials,
  148             self.action, self.target
  149         )
  150 
  151     def test_warning_message_is_logged_if_enforce_scope_is_false(self):
  152         self.config_fixture.config(group='oslo_policy', enforce_scope=False)
  153         expected_msg = (
  154             'Policy foo failed scope check. The token used to make the '
  155             'request was project scoped but the policy requires [\'system\'] '
  156             'scope. This behavior may change in the future where using the '
  157             'intended scope is required'
  158         )
  159         with mock.patch('warnings.warn') as mock_warn:
  160             policy.enforce(self.credentials, self.action, self.target)
  161             mock_warn.assert_called_with(expected_msg)
  162 
  163 
  164 class PolicyJsonTestCase(unit.TestCase):
  165 
  166     def _get_default_policy_rules(self):
  167         """Return a dictionary of all in-code policies.
  168 
  169         All policies have a default value that is maintained in code.
  170         This method returns a dictionary containing all default policies.
  171         """
  172         rules = dict()
  173         for rule in policies.list_rules():
  174             rules[rule.name] = rule.check_str
  175         return rules
  176 
  177     def test_policies_loads(self):
  178         action = 'identity:list_projects'
  179         target = {'user_id': uuid.uuid4().hex,
  180                   'user.domain_id': uuid.uuid4().hex,
  181                   'group.domain_id': uuid.uuid4().hex,
  182                   'project.domain_id': uuid.uuid4().hex,
  183                   'project_id': uuid.uuid4().hex,
  184                   'domain_id': uuid.uuid4().hex}
  185         credentials = {'username': uuid.uuid4().hex, 'token': uuid.uuid4().hex,
  186                        'project_name': None, 'user_id': uuid.uuid4().hex,
  187                        'roles': [u'admin'], 'is_admin': True,
  188                        'is_admin_project': True, 'project_id': None,
  189                        'domain_id': uuid.uuid4().hex}
  190 
  191         # The enforcer is setup behind the scenes and registers the in code
  192         # default policies.
  193         result = policy._ENFORCER._enforcer.enforce(action, target,
  194                                                     credentials)
  195         self.assertTrue(result)
  196 
  197     def test_all_targets_documented(self):
  198         policy_keys = self._get_default_policy_rules()
  199 
  200         # These keys are in the policy.json but aren't targets.
  201         policy_rule_keys = [
  202             'admin_or_owner', 'admin_or_token_subject', 'admin_required',
  203             'owner', 'service_admin_or_token_subject', 'service_or_admin',
  204             'service_role', 'token_subject', ]
  205 
  206         def read_doc_targets():
  207             # Parse the doc/source/policy_mapping.rst file and return the
  208             # targets.
  209 
  210             doc_path = os.path.join(
  211                 unit.ROOTDIR, 'doc', 'source', 'getting-started',
  212                 'policy_mapping.rst')
  213             with open(doc_path) as doc_file:
  214                 for line in doc_file:
  215                     if line.startswith('Target'):
  216                         break
  217                 for line in doc_file:
  218                     # Skip === line
  219                     if line.startswith('==='):
  220                         break
  221                 for line in doc_file:
  222                     line = line.rstrip()
  223                     if not line or line.startswith(' '):
  224                         continue
  225                     if line.startswith('=='):
  226                         break
  227                     target, dummy, dummy = line.partition(' ')
  228                     yield str(target)
  229 
  230         doc_targets = list(read_doc_targets())
  231         self.assertItemsEqual(policy_keys, doc_targets + policy_rule_keys)
  232 
  233 
  234 class GeneratePolicyFileTestCase(unit.TestCase):
  235 
  236     def test_policy_generator_from_command_line(self):
  237         # This test ensures keystone.common.policy:get_enforcer ignores
  238         # unexpected arguments before handing them off to oslo.config, which
  239         # will fail and prevent users from generating policy files.
  240         ret_val = subprocess.Popen(
  241             ['oslopolicy-policy-generator', '--namespace', 'keystone'],
  242             stdout=subprocess.PIPE,
  243             stderr=subprocess.PIPE
  244         )
  245         output = ret_val.communicate()
  246         self.assertEqual(ret_val.returncode, 0, output)