
Member "monasca-api-4.0.0/monasca_api/tests/policy/test_policy.py" (13 May 2020, 11515 Bytes) of package /linux/misc/openstack/monasca-api-4.0.0.tar.gz:



    1 # Copyright 2017 OP5 AB
    2 # Copyright 2011 Piston Cloud Computing, Inc.
    3 # All Rights Reserved.
    4 
    5 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    6 #    not use this file except in compliance with the License. You may obtain
    7 #    a copy of the License at
    8 #
    9 #         http://www.apache.org/licenses/LICENSE-2.0
   10 #
   11 #    Unless required by applicable law or agreed to in writing, software
   12 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   13 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   14 #    License for the specific language governing permissions and limitations
   15 #    under the License.
   16 
   17 import requests_mock
   18 from unittest import mock
   19 
   20 from oslo_context import context
   21 from oslo_policy import policy as os_policy
   22 
   23 from monasca_api.common.policy import policy_engine
   24 from monasca_api.tests.policy import base
   25 
   26 
class PolicyFileTestCase(base.BaseTestCase):
    """Checks that edits to the on-disk policy file reach the enforcer."""

    def setUp(self):
        """Build a non-admin request context and an empty target."""
        super().setUp()
        self.target = {}
        self.context = context.RequestContext(
            user='fake', tenant='fake', is_admin=False)

    def test_modified_policy_reloads(self):
        """A rule rewritten to "!" is refused once the rules are reloaded.

        This is the revocation path: the same context and target that were
        accepted under the first file content must raise PolicyNotAuthorized
        after the file is changed and load_rules(True) is called.
        """
        created = self.create_tempfiles(
            files=[('policies', '{}')], ext='.yaml')
        policy_path = created[0]
        base.BaseTestCase.conf_override(
            policy_file=policy_path, group='oslo_policy')

        policy_engine.reset()
        policy_engine.init()

        action = 'example:test'
        default_rule = os_policy.RuleDefault(action, '')
        policy_engine._ENFORCER.register_defaults([default_rule])

        # First file content: empty check string. authorize() raising here
        # would fail the test.
        with open(policy_path, 'w') as handle:
            handle.write('{"example:test": ""}')
        policy_engine.authorize(self.context, action, self.target)

        # Second file content: "!" for the same action, followed by an
        # explicit load_rules(True) before the refusal is asserted.
        with open(policy_path, 'w') as handle:
            handle.write('{"example:test": "!"}')
        policy_engine._ENFORCER.load_rules(True)
        with self.assertRaises(os_policy.PolicyNotAuthorized):
            policy_engine.authorize(self.context, action, self.target)
   58 
   59 
class PolicyTestCase(base.BaseTestCase):
    """Exercises policy_engine.authorize against a fixed set of example rules."""

    def setUp(self):
        """Reset the engine and register the example rules the tests use."""
        super().setUp()
        # "@" is used below as the always-allow check and "!" as the
        # always-deny check (see example:allowed / example:denied tests).
        example_rules = [
            os_policy.RuleDefault("true", "@"),
            os_policy.RuleDefault("example:allowed", "@"),
            os_policy.RuleDefault("example:denied", "!"),
            os_policy.RuleDefault("old_action_not_default", "@"),
            os_policy.RuleDefault("new_action", "@"),
            os_policy.RuleDefault("old_action_default", "rule:admin_api"),
            os_policy.RuleDefault(
                "example:lowercase_admin", "role:admin or role:sysadmin"),
            os_policy.RuleDefault(
                "example:uppercase_admin", "role:ADMIN or role:sysadmin"),
            os_policy.RuleDefault(
                "example:get_http", "http://www.example.com"),
            os_policy.RuleDefault(
                "example:my_file",
                "role:compute_admin or "
                "project_id:%(project_id)s"),
            os_policy.RuleDefault("example:early_and_fail", "! and @"),
            os_policy.RuleDefault("example:early_or_success", "@ or !"),
        ]
        policy_engine.reset()
        policy_engine.init()

        self.context = context.RequestContext(
            user='fake', tenant='fake', is_admin=False)
        policy_engine._ENFORCER.register_defaults(example_rules)
        self.target = {}

    def test_authorize_nonexistent_action_throws(self):
        """An action with no registered rule raises PolicyNotRegistered.

        The unknown action is rejected with an exception, not allowed.
        """
        unknown_action = 'example:noexists'
        with self.assertRaises(os_policy.PolicyNotRegistered):
            policy_engine.authorize(
                self.context, unknown_action, self.target)

    def test_authorize_bad_action_throws(self):
        """A "!" rule raises PolicyNotAuthorized by default."""
        denied_action = 'example:denied'
        with self.assertRaises(os_policy.PolicyNotAuthorized):
            policy_engine.authorize(
                self.context, denied_action, self.target)

    def test_authorize_bad_action_noraise(self):
        """With a fourth argument of False, a denied action returns falsy.

        In this mode nothing is raised, so the return value is the only
        signal of the refusal.
        """
        denied_action = "example:denied"
        outcome = policy_engine.authorize(
            self.context, denied_action, self.target, False)
        self.assertFalse(outcome)

    def test_authorize_good_action(self):
        """An "@" rule yields a truthy result."""
        allowed_action = "example:allowed"
        outcome = policy_engine.authorize(
            self.context, allowed_action, self.target)
        self.assertTrue(outcome)

    @requests_mock.mock()
    def test_authorize_http_true(self, req_mock):
        """A URL rule authorizes when the mocked endpoint answers 'True'.

        The endpoint is mocked, so only the decision is covered here; the
        rule's URL is plain http:// and transport is not under test.
        """
        req_mock.post('http://www.example.com/', text='True')
        http_action = "example:get_http"
        empty_target = {}
        outcome = policy_engine.authorize(
            self.context, http_action, empty_target)
        self.assertTrue(outcome)

    @requests_mock.mock()
    def test_authorize_http_false(self, req_mock):
        """A URL rule is refused when the mocked endpoint answers 'False'."""
        req_mock.post('http://www.example.com/', text='False')
        http_action = "example:get_http"
        empty_target = {}
        with self.assertRaises(os_policy.PolicyNotAuthorized):
            policy_engine.authorize(
                self.context, http_action, empty_target)

    def test_templatized_authorization(self):
        """project_id:%(project_id)s separates own and foreign targets.

        The context is built with tenant='fake'; a target whose project_id
        is 'fake' is accepted and one whose project_id is 'another' is
        refused.
        """
        own_target = {'project_id': 'fake'}
        foreign_target = {'project_id': 'another'}
        templated_action = "example:my_file"
        policy_engine.authorize(self.context, templated_action, own_target)
        with self.assertRaises(os_policy.PolicyNotAuthorized):
            policy_engine.authorize(
                self.context, templated_action, foreign_target)

    def test_early_AND_authorization(self):
        """"! and @" is refused."""
        and_action = "example:early_and_fail"
        with self.assertRaises(os_policy.PolicyNotAuthorized):
            policy_engine.authorize(self.context, and_action, self.target)

    def test_early_OR_authorization(self):
        """"@ or !" is accepted; authorize() raising would fail the test."""
        or_action = "example:early_or_success"
        policy_engine.authorize(self.context, or_action, self.target)

    def test_ignore_case_role_check(self):
        """Role matching ignores case on both the rule and the context side.

        A context holding the role 'AdMiN' must pass both "role:admin" and
        "role:ADMIN", so role names differing only in case are one role as
        far as these rules are concerned.
        """
        # NOTE(dprince) we mix case in the Admin role here to ensure
        # case is ignored
        mixed_case_admin = context.RequestContext(
            'admin', 'fake', roles=['AdMiN'])
        for role_action in ("example:lowercase_admin",
                            "example:uppercase_admin"):
            policy_engine.authorize(
                mixed_case_admin, role_action, self.target)

    @mock.patch.object(policy_engine.LOG, 'warning')
    def test_warning_when_deprecated_user_based_rule_used(self, mock_warning):
        """A user_id check on servers:index logs exactly one warning."""
        user_based_rules = [
            ("os_compute_api:servers:index",
             "project_id:%(project_id)s or user_id:%(user_id)s"),
        ]
        policy_engine._warning_for_deprecated_user_based_rules(
            user_based_rules)
        mock_warning.assert_called_once_with(
            "The user_id attribute isn't supported in the rule "
            "'%s'. All the user_id based policy enforcement will be removed "
            "in the future.", "os_compute_api:servers:index")

    @mock.patch.object(policy_engine.LOG, 'warning')
    def test_no_warning_for_user_based_resource(self, mock_warning):
        """A user_id check on os-keypairs:index logs no warning."""
        keypair_rules = [
            ("os_compute_api:os-keypairs:index", "user_id:%(user_id)s"),
        ]
        policy_engine._warning_for_deprecated_user_based_rules(keypair_rules)
        mock_warning.assert_not_called()

    @mock.patch.object(policy_engine.LOG, 'warning')
    def test_no_warning_for_no_user_based_rule(self, mock_warning):
        """A rule without any user_id check logs no warning."""
        project_only_rules = [
            ("os_compute_api:servers:index", "project_id:%(project_id)s"),
        ]
        policy_engine._warning_for_deprecated_user_based_rules(
            project_only_rules)
        mock_warning.assert_not_called()

    @mock.patch.object(policy_engine.LOG, 'warning')
    def test_verify_deprecated_policy_using_old_action(self, mock_warning):
        """An old action whose rule differs from the default is still in use.

        "old_action_not_default" is registered as "@" while the default
        passed in is "rule:admin_api"; the call must warn once and return a
        truthy value.
        """
        policy_engine._ENFORCER.load_rules(True)
        old_policy = "old_action_not_default"
        new_policy = "new_action"
        default_rule = "rule:admin_api"

        still_on_old = policy_engine.verify_deprecated_policy(
            old_policy, new_policy, default_rule, self.context)

        expected_message = (
            "Start using the new action '{0}'. The existing action '{1}' is "
            "being deprecated and will be removed in "
            "future release.").format(new_policy, old_policy)
        mock_warning.assert_called_once_with(expected_message)
        self.assertTrue(still_on_old)

    def test_verify_deprecated_policy_using_new_action(self):
        """An old action left at the default rule counts as not in use.

        "old_action_default" is registered as "rule:admin_api", the same
        string passed as the default; the call must return a falsy value.
        """
        policy_engine._ENFORCER.load_rules(True)
        old_policy = "old_action_default"
        new_policy = "new_action"
        default_rule = "rule:admin_api"

        still_on_old = policy_engine.verify_deprecated_policy(
            old_policy, new_policy, default_rule, self.context)

        self.assertFalse(still_on_old)
  208 
  209 
class IsAdminCheckTestCase(base.BaseTestCase):
    """Covers construction and evaluation of policy_engine.IsAdminCheck."""

    def setUp(self):
        """Initialise the policy engine before each test."""
        super().setUp()
        # NOTE(review): unlike the other test cases in this file, no
        # policy_engine.reset() precedes init() here; presumably an enforcer
        # left by an earlier test is reused - confirm that is intended.
        policy_engine.init()

    def test_init_true(self):
        """A match string of 'True' is kept and sets expected to true."""
        admin_check = policy_engine.IsAdminCheck('is_admin', 'True')

        self.assertEqual(admin_check.kind, 'is_admin')
        self.assertEqual(admin_check.match, 'True')
        self.assertTrue(admin_check.expected)

    def test_init_false(self):
        """The match string 'nottrue' is stored as 'False', expected false."""
        admin_check = policy_engine.IsAdminCheck('is_admin', 'nottrue')

        self.assertEqual(admin_check.kind, 'is_admin')
        self.assertEqual(admin_check.match, 'False')
        self.assertFalse(admin_check.expected)

    def test_call_true(self):
        """is_admin:True passes only for credentials with is_admin=True."""
        admin_check = policy_engine.IsAdminCheck('is_admin', 'True')
        enforcer = policy_engine._ENFORCER

        self.assertTrue(admin_check('target', {'is_admin': True}, enforcer))
        self.assertFalse(admin_check('target', {'is_admin': False}, enforcer))

    def test_call_false(self):
        """is_admin:False passes only for credentials with is_admin=False."""
        admin_check = policy_engine.IsAdminCheck('is_admin', 'False')
        enforcer = policy_engine._ENFORCER

        self.assertFalse(admin_check('target', {'is_admin': True}, enforcer))
        self.assertTrue(admin_check('target', {'is_admin': False}, enforcer))
  244 
  245 
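class AdminRolePolicyTestCase(base.BaseTestCase):
    """Checks loaded rules against an admin and a non-admin context."""

    # Name of the rule registered in setUp; the tests require it to be loaded.
    _ADMIN_ACTION = 'example.admin'

    def setUp(self):
        """Register a single "role:admin" rule and force a rule reload."""
        super().setUp()
        self.noadmin_context = context.RequestContext(
            'fake', 'fake', roles=['member'])
        self.admin_context = context.RequestContext(
            'fake', 'fake', roles=['admin'])

        admin_rule = [
            os_policy.RuleDefault(self._ADMIN_ACTION, 'role:admin'),
        ]
        policy_engine.reset()
        policy_engine.init(policy_file=None)
        policy_engine._ENFORCER.register_defaults(admin_rule)
        policy_engine._ENFORCER.load_rules(True)
        self.target = {}

    def _loaded_actions(self):
        """Return the loaded rule names, failing if the admin rule is absent.

        Both tests loop over whatever get_rules() returns. Without this
        check an empty rule set would let them pass without a single
        authorize() call, so a broken setUp would look like a passing
        access-control test.
        """
        actions = list(policy_engine.get_rules().keys())
        self.assertIn(self._ADMIN_ACTION, actions)
        return actions

    def test_authorize_admin_actions_with_admin_context(self):
        """Every loaded action is authorized for the admin context."""
        for action in self._loaded_actions():
            policy_engine.authorize(self.admin_context, action, self.target)

    def test_authorize_admin_actions_with_nonadmin_context_throws(self):
        """Every loaded action raises PolicyNotAuthorized for a non-admin.

        The non-admin context carries only the 'member' role.
        """
        for action in self._loaded_actions():
            with self.assertRaises(os_policy.PolicyNotAuthorized):
                policy_engine.authorize(
                    self.noadmin_context, action, self.target)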