"Fossies" - the Fresh Open Source Software Archive

Member "monasca-api-3.1.0/monasca_api/common/policy/policy_engine.py" (27 Sep 2019, 8717 Bytes) of package /linux/misc/openstack/monasca-api-3.1.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "policy_engine.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 5.0.0_vs_6.0.0.

    1 # Copyright 2017 OP5 AB
    2 # Copyright 2017 FUJITSU LIMITED
    3 # Copyright (c) 2011 OpenStack Foundation
    4 # All Rights Reserved.
    5 #
    6 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    7 #    not use this file except in compliance with the License. You may obtain
    8 #    a copy of the License at
    9 #
   10 #         http://www.apache.org/licenses/LICENSE-2.0
   11 #
   12 #    Unless required by applicable law or agreed to in writing, software
   13 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   14 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   15 #    License for the specific language governing permissions and limitations
   16 #    under the License.
   17 
   18 
   19 import copy
   20 import re
   21 import sys
   22 
   23 import logging
   24 from oslo_config import cfg
   25 from oslo_policy import policy
   26 
   27 from monasca_api.common.policy.i18n import _LW
   28 
# Global oslo.config configuration object; passed to the policy Enforcer.
CONF = cfg.CONF
# Module-level logger.
LOG = logging.getLogger(__name__)
# Provider of the default policy rules. It is None here and has to be
# assigned before ``init()`` builds the enforcer, because ``register_rules``
# calls ``POLICIES.list_rules()``.
POLICIES = None
# Substrings of rule names for which the deprecated ``user_id`` warning is
# skipped (see ``_warning_for_deprecated_user_based_rules``).
USER_BASED_RESOURCES = ['os-keypairs']
# Matches ``%(name)s`` substitutions inside a rule string, capturing ``name``.
KEY_EXPR = re.compile(r'%\((\w+)\)s')


# Lazily created policy.Enforcer shared by the whole module; it is built by
# ``init()`` and dropped by ``reset()``.
_ENFORCER = None
# oslo_policy will read the policy configuration file again when the file
# is changed at runtime, so the old policy rules are saved to
# saved_file_rules and compared with the new rules to determine
# whether the rules were updated.
saved_file_rules = []
   42 
   43 
   44 def reset():
   45     """Reset Enforcer class."""
   46     global _ENFORCER
   47     if _ENFORCER:
   48         _ENFORCER.clear()
   49         _ENFORCER = None
   50 
   51 
   52 def init(policy_file=None, rules=None, default_rule=None, use_conf=True):
   53     """Init an Enforcer class.
   54 
   55        :param policy_file: Custom policy file to use, if none is specified,
   56                            `CONF.policy_file` will be used.
   57        :param rules: Default dictionary / Rules to use. It will be
   58                      considered just in the first instantiation.
   59        :param default_rule: Default rule to use, CONF.default_rule will
   60                             be used if none is specified.
   61        :param use_conf: Whether to load rules from config file.
   62     """
   63 
   64     global _ENFORCER
   65     global saved_file_rules
   66 
   67     if not _ENFORCER:
   68         _ENFORCER = policy.Enforcer(CONF,
   69                                     policy_file=policy_file,
   70                                     rules=rules,
   71                                     default_rule=default_rule,
   72                                     use_conf=use_conf
   73                                     )
   74         register_rules(_ENFORCER)
   75         _ENFORCER.load_rules()
   76     # Only the rules which are loaded from file may be changed
   77     current_file_rules = _ENFORCER.file_rules
   78     current_file_rules = _serialize_rules(current_file_rules)
   79 
   80     if saved_file_rules != current_file_rules:
   81         _warning_for_deprecated_user_based_rules(current_file_rules)
   82         saved_file_rules = copy.deepcopy(current_file_rules)
   83 
   84 
   85 def _serialize_rules(rules):
   86     """Serialize all the Rule object as string.
   87 
   88     New string is used to compare the rules list.
   89     """
   90     result = [(rule_name, str(rule)) for rule_name, rule in rules.items()]
   91     return sorted(result, key=lambda rule: rule[0])
   92 
   93 
   94 def _warning_for_deprecated_user_based_rules(rules):
   95     """Warning user based policy enforcement used in the rule but the rule
   96     doesn't support it.
   97     """
   98     for rule in rules:
   99         # We will skip the warning for the resources which support user based
  100         # policy enforcement.
  101         if [resource for resource in USER_BASED_RESOURCES
  102                 if resource in rule[0]]:
  103             continue
  104         if 'user_id' in KEY_EXPR.findall(rule[1]):
  105             LOG.warning(_LW("The user_id attribute isn't supported in the "
  106                             "rule '%s'. All the user_id based policy "
  107                             "enforcement will be removed in the "
  108                             "future."), rule[0])
  109 
  110 
  111 def register_rules(enforcer):
  112     """Register default policy rules."""
  113     rules = POLICIES.list_rules()
  114     enforcer.register_defaults(rules)
  115 
  116 
  117 def authorize(context, action, target, do_raise=True):
  118     """Verify that the action is valid on the target in this context.
  119 
  120     :param context: monasca project context
  121     :param action: String representing the action to be checked. This
  122                    should be colon separated for clarity.
  123     :param target: Dictionary representing the object of the action for
  124                    object creation. This should be a dictionary representing
  125                    the location of the object e.g.
  126                    ``{'project_id': 'context.project_id'}``
  127     :param do_raise: if True (the default), raises PolicyNotAuthorized,
  128                      if False returns False
  129     :type context: object
  130     :type action: str
  131     :type target: dict
  132     :type do_raise: bool
  133     :return: returns a non-False value (not necessarily True) if authorized,
  134              and the False if not authorized and do_raise if False
  135 
  136     :raises oslo_policy.policy.PolicyNotAuthorized: if verification fails
  137     """
  138     init()
  139     credentials = context.to_policy_values()
  140     try:
  141         result = _ENFORCER.authorize(action, target, credentials,
  142                                      do_raise=do_raise, action=action)
  143         return result
  144     except policy.PolicyNotRegistered:
  145         LOG.exception('Policy not registered')
  146         raise
  147     except Exception:
  148         LOG.debug('Policy check for %(action)s failed with credentials '
  149                   '%(credentials)s',
  150                   {'action': action, 'credentials': credentials})
  151         raise
  152 
  153 
  154 def check_is_admin(context):
  155     """Check if roles contains 'admin' role according to policy settings."""
  156     init()
  157     credentials = context.to_policy_values()
  158     target = credentials
  159     return _ENFORCER.authorize('admin_required', target, credentials)
  160 
  161 
  162 def set_rules(rules, overwrite=True, use_conf=False):  # pragma: no cover
  163     """Set rules based on the provided dict of rules.
  164 
  165     Note:
  166         Used in tests only.
  167 
  168     :param rules: New rules to use. It should be an instance of dict
  169     :param overwrite: Whether to overwrite current rules or update them
  170                       with the new rules.
  171     :param use_conf: Whether to reload rules from config file.
  172     """
  173     init(use_conf=False)
  174     _ENFORCER.set_rules(rules, overwrite, use_conf)
  175 
  176 
  177 def verify_deprecated_policy(old_policy, new_policy, default_rule, context):
  178     """Check the rule of the deprecated policy action
  179 
  180     If the current rule of the deprecated policy action is set to a non-default
  181     value, then a warning message is logged stating that the new policy
  182     action should be used to dictate permissions as the old policy action is
  183     being deprecated.
  184 
  185     :param old_policy: policy action that is being deprecated
  186     :param new_policy: policy action that is replacing old_policy
  187     :param default_rule: the old_policy action default rule value
  188     :param context: the monasca context
  189     """
  190 
  191     if _ENFORCER:
  192         current_rule = str(_ENFORCER.rules[old_policy])
  193     else:
  194         current_rule = None
  195 
  196     if current_rule != default_rule:
  197         LOG.warning("Start using the new action '{0}'. The existing "
  198                     "action '{1}' is being deprecated and will be "
  199                     "removed in future release.".format(new_policy,
  200                                                         old_policy))
  201         target = {'project_id': context.project_id,
  202                   'user_id': context.user_id}
  203 
  204         return authorize(context=context, action=old_policy, target=target)
  205     else:
  206         return False
  207 
  208 
  209 def get_rules():
  210     if _ENFORCER:
  211         return _ENFORCER.rules
  212 
  213 
  214 def get_enforcer():
  215     # This method is for use by oslopolicy CLI scripts. Those scripts need the
  216     # 'output-file' and 'namespace' options, but having those in sys.argv means
  217     # loading the project config options will fail as those are not expected to
  218     # be present. So we pass in an arg list with those stripped out.
  219     conf_args = []
  220     # Start at 1 because cfg.CONF expects the equivalent of sys.argv[1:]
  221     i = 1
  222     while i < len(sys.argv):
  223         if sys.argv[i].strip('-') in ['namespace', 'output-file']:
  224             i += 2
  225             continue
  226         conf_args.append(sys.argv[i])
  227         i += 1
  228 
  229     cfg.CONF(conf_args, project='monasca')
  230     init()
  231     return _ENFORCER
  232 
  233 
  234 @policy.register('is_admin')
  235 class IsAdminCheck(policy.Check):
  236     """An explicit check for is_admin."""
  237 
  238     def __init__(self, kind, match):
  239         """Initialize the check."""
  240 
  241         self.expected = (match.lower() == 'true')
  242 
  243         super(IsAdminCheck, self).__init__(kind, str(self.expected))
  244 
  245     def __call__(self, target, creds, enforcer):
  246         """Determine whether is_admin matches the requested value."""
  247 
  248         return creds['is_admin'] == self.expected