"Fossies" - the Fresh Open Source Software Archive

Member "zun-7.0.0/zun/common/policy.py" (14 Apr 2021, 7160 Bytes) of package /linux/misc/openstack/zun-7.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "policy.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 6.0.0_vs_7.0.0.

    1 # Copyright (c) 2015 OpenStack Foundation
    2 # All Rights Reserved.
    3 #
    4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 #    not use this file except in compliance with the License. You may obtain
    6 #    a copy of the License at
    7 #
    8 #         http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 #    Unless required by applicable law or agreed to in writing, software
   11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 #    License for the specific language governing permissions and limitations
   14 #    under the License.
   15 
   16 """Policy Engine For zun."""
   17 
   18 from oslo_log import log as logging
   19 from oslo_policy import opts
   20 from oslo_policy import policy
   21 from oslo_utils import excutils
   22 
   23 from zun.common import exception
   24 from zun.common import policies
   25 import zun.conf
   26 
   27 _ENFORCER = None
   28 CONF = zun.conf.CONF
   29 LOG = logging.getLogger(__name__)
   30 
   31 # TODO(gmann): Remove setting the default value of config policy_file
   32 # once oslo_policy change the default value to 'policy.yaml'.
   33 # https://github.com/openstack/oslo.policy/blob/a626ad12fe5a3abd49d70e3e5b95589d279ab578/oslo_policy/opts.py#L49
   34 DEFAULT_POLICY_FILE = 'policy.yaml'
   35 opts.set_defaults(CONF, DEFAULT_POLICY_FILE)
   36 
   37 
   38 # we can get a policy enforcer by this init.
   39 # oslo policy support change policy rule dynamically.
   40 # at present, policy.enforce will reload the policy rules when it checks
   41 # the policy files have been touched.
   42 def init(policy_file=None, rules=None,
   43          default_rule=None, use_conf=True, overwrite=True):
   44     """Init an Enforcer class.
   45 
   46         :param policy_file: Custom policy file to use, if none is
   47                             specified, ``conf.policy_file`` will be
   48                             used.
   49         :param rules: Default dictionary / Rules to use. It will be
   50                       considered just in the first instantiation. If
   51                       :meth:`load_rules` with ``force_reload=True``,
   52                       :meth:`clear` or :meth:`set_rules` with
   53                       ``overwrite=True`` is called this will be overwritten.
   54         :param default_rule: Default rule to use, conf.default_rule will
   55                              be used if none is specified.
   56         :param use_conf: Whether to load rules from cache or config file.
   57         :param overwrite: Whether to overwrite existing rules when reload rules
   58                           from config file.
   59     """
   60     global _ENFORCER
   61     if not _ENFORCER:
   62         # https://docs.openstack.org/oslo.policy/latest/user/usage.html
   63         _ENFORCER = policy.Enforcer(CONF,
   64                                     policy_file=policy_file,
   65                                     rules=rules,
   66                                     default_rule=default_rule,
   67                                     use_conf=use_conf,
   68                                     overwrite=overwrite)
   69         register_rules(_ENFORCER)
   70     return _ENFORCER
   71 
   72 
   73 def register_rules(enforcer):
   74     enforcer.register_defaults(policies.list_rules())
   75 
   76 
   77 def enforce(context, rule=None, target=None,
   78             do_raise=True, exc=None, *args, **kwargs):
   79 
   80     """Checks authorization of a rule against the target and credentials.
   81 
   82         :param dict context: As much information about the user performing the
   83                              action as possible.
   84         :param rule: The rule to evaluate.
   85         :param dict target: As much information about the object being operated
   86                             on as possible.
   87         :param do_raise: Whether to raise an exception or not if check
   88                          fails.
   89         :param exc: Class of the exception to raise if the check fails.
   90                     Any remaining arguments passed to :meth:`enforce` (both
   91                     positional and keyword arguments) will be passed to
   92                     the exception class. If not specified,
   93                     :class:`PolicyNotAuthorized` will be used.
   94 
   95         :return: ``False`` if the policy does not allow the action and `exc` is
   96                  not provided; otherwise, returns a value that evaluates to
   97                  ``True``.  Note: for rules using the "case" expression, this
   98                  ``True`` value will be the specified string from the
   99                  expression.
  100     """
  101     enforcer = init()
  102     credentials = context.to_policy_values()
  103     if not exc:
  104         exc = exception.PolicyNotAuthorized
  105     if target is None:
  106         target = {'project_id': context.project_id,
  107                   'user_id': context.user_id}
  108     return enforcer.enforce(rule, target, credentials,
  109                             do_raise=do_raise, exc=exc, *args, **kwargs)
  110 
  111 
  112 def authorize(context, action, target, do_raise=True, exc=None,
  113               might_not_exist=False):
  114     """Verifies that the action is valid on the target in this context.
  115 
  116        :param context: zun context
  117        :param action: string representing the action to be checked
  118            this should be colon separated for clarity.
  119            i.e. ``network:attach_external_network``
  120        :param target: dictionary representing the object of the action
  121            for object creation this should be a dictionary representing the
  122            location of the object e.g. ``{'project_id': context.project_id}``
  123        :param do_raise: if True (the default), raises PolicyNotAuthorized;
  124            if False, returns False
  125        :param exc: Class of the exception to raise if the check fails.
  126             Any remaining arguments passed to :meth:`authorize` (both
  127             positional and keyword arguments) will be passed to
  128             the exception class. If not specified,
  129             :class:`PolicyNotAuthorized` will be used.
  130        :param might_not_exist: If True the policy check is skipped (and the
  131             function returns True) if the specified policy does not exist.
  132             Defaults to false.
  133 
  134        :raises zun.common.exception.PolicyNotAuthorized: if verification fails
  135            and do_raise is True. Or if 'exc' is specified it will raise an
  136            exception of that type.
  137 
  138        :return: returns a non-False value (not necessarily "True") if
  139            authorized, and the exact value False if not authorized and
  140            do_raise is False.
  141     """
  142     credentials = context.to_policy_values()
  143     if not exc:
  144         exc = exception.PolicyNotAuthorized
  145     if might_not_exist and not (_ENFORCER.rules and action in _ENFORCER.rules):
  146         return True
  147     try:
  148         result = _ENFORCER.enforce(action, target, credentials,
  149                                    do_raise=do_raise, exc=exc, action=action)
  150     except Exception:
  151         with excutils.save_and_reraise_exception():
  152             LOG.debug('Policy check for %(action)s failed with credentials '
  153                       '%(credentials)s',
  154                       {'action': action, 'credentials': credentials})
  155     return result
  156 
  157 
  158 def check_is_admin(context):
  159     """Whether or not user is admin according to policy setting.
  160 
  161     """
  162     init()
  163     target = {}
  164     credentials = context.to_policy_values()
  165     return _ENFORCER.enforce('context_is_admin', target, credentials)