"Fossies" - the Fresh Open Source Software Archive

Member "barbican-12.0.0/barbican/api/controllers/__init__.py" (14 Apr 2021, 8527 Bytes) of package /linux/misc/openstack/barbican-12.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "__init__.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 11.0.0_vs_12.0.0.

    1 #  Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 #  not use this file except in compliance with the License. You may obtain
    3 #  a copy of the License at
    4 #
    5 #       http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 #  Unless required by applicable law or agreed to in writing, software
    8 #  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 #  License for the specific language governing permissions and limitations
   11 #  under the License.
   12 
   13 import collections.abc
   14 
   15 from oslo_policy import policy
   16 import pecan
   17 from webob import exc
   18 
   19 from barbican import api
   20 from barbican.common import accept
   21 from barbican.common import utils
   22 from barbican import i18n as u
   23 
   24 LOG = utils.getLogger(__name__)
   25 
   26 
   27 def is_json_request_accept(req):
   28     """Test if http request 'accept' header configured for JSON response.
   29 
   30     :param req: HTTP request
   31     :return: True if need to return JSON response.
   32     """
   33     return (
   34         type(req.accept) is accept.NoHeaderType or
   35         type(req.accept) is accept.ValidHeaderType and (
   36             req.accept.header_value == 'application/json' or
   37             req.accept.header_value == '*/*'
   38         )
   39     )
   40 
   41 
   42 def _get_barbican_context(req):
   43     if 'barbican.context' in req.environ:
   44         return req.environ['barbican.context']
   45     else:
   46         return None
   47 
   48 
   49 def _do_enforce_rbac(inst, req, action_name, ctx, **kwargs):
   50     """Enforce RBAC based on 'request' information."""
   51     if action_name and ctx:
   52 
   53         # Enforce special case: secret GET decryption
   54         if 'secret:get' == action_name and not is_json_request_accept(req):
   55             action_name = 'secret:decrypt'  # Override to perform special rules
   56 
   57         target_name, target_data = inst.get_acl_tuple(req, **kwargs)
   58         policy_dict = {}
   59         if target_name and target_data:
   60             policy_dict['target'] = {target_name: target_data}
   61 
   62         policy_dict.update(kwargs)
   63         # Enforce access controls.
   64         if ctx.policy_enforcer:
   65             ctx.policy_enforcer.authorize(action_name, flatten(policy_dict),
   66                                           ctx, do_raise=True)
   67 
   68 
   69 def enforce_rbac(action_name='default'):
   70     """Decorator handling RBAC enforcement on behalf of REST verb methods."""
   71 
   72     def rbac_decorator(fn):
   73         def enforcer(inst, *args, **kwargs):
   74             # Enforce RBAC rules.
   75 
   76             # context placed here by context.py
   77             # middleware
   78             ctx = _get_barbican_context(pecan.request)
   79             external_project_id = None
   80             if ctx:
   81                 external_project_id = ctx.project_id
   82 
   83             _do_enforce_rbac(inst, pecan.request, action_name, ctx, **kwargs)
   84             # insert external_project_id as the first arg to the guarded method
   85             args = list(args)
   86             args.insert(0, external_project_id)
   87             # Execute guarded method now.
   88             return fn(inst, *args, **kwargs)
   89 
   90         return enforcer
   91 
   92     return rbac_decorator
   93 
   94 
   95 def handle_exceptions(operation_name=u._('System')):
   96     """Decorator handling generic exceptions from REST methods."""
   97 
   98     def exceptions_decorator(fn):
   99 
  100         def handler(inst, *args, **kwargs):
  101             try:
  102                 return fn(inst, *args, **kwargs)
  103             except exc.HTTPError:
  104                 LOG.exception('Webob error seen')
  105                 raise  # Already converted to Webob exception, just reraise
  106             # In case PolicyNotAuthorized, we do not want to expose payload by
  107             # logging exception, so just LOG.error
  108             except policy.PolicyNotAuthorized as pna:
  109                 status, message = api.generate_safe_exception_message(
  110                     operation_name, pna)
  111                 LOG.error(message)
  112                 pecan.abort(status, message)
  113             except Exception as e:
  114                 # In case intervening modules have disabled logging.
  115                 LOG.logger.disabled = False
  116 
  117                 status, message = api.generate_safe_exception_message(
  118                     operation_name, e)
  119                 LOG.exception(message)
  120                 pecan.abort(status, message)
  121 
  122         return handler
  123 
  124     return exceptions_decorator
  125 
  126 
  127 def _do_enforce_content_types(pecan_req, valid_content_types):
  128     """Content type enforcement
  129 
  130     Check to see that content type in the request is one of the valid
  131     types passed in by our caller.
  132     """
  133     if pecan_req.content_type not in valid_content_types:
  134         m = u._(
  135             "Unexpected content type. Expected content types "
  136             "are: {expected}"
  137         ).format(
  138             expected=valid_content_types
  139         )
  140         pecan.abort(415, m)
  141 
  142 
  143 def enforce_content_types(valid_content_types=[]):
  144     """Decorator handling content type enforcement on behalf of REST verbs."""
  145 
  146     def content_types_decorator(fn):
  147 
  148         def content_types_enforcer(inst, *args, **kwargs):
  149             _do_enforce_content_types(pecan.request, valid_content_types)
  150             return fn(inst, *args, **kwargs)
  151 
  152         return content_types_enforcer
  153 
  154     return content_types_decorator
  155 
  156 
  157 def flatten(d, parent_key=''):
  158     """Flatten a nested dictionary
  159 
  160     Converts a dictionary with nested values to a single level flat
  161     dictionary, with dotted notation for each key.
  162 
  163     """
  164     items = []
  165     for k, v in d.items():
  166         new_key = parent_key + '.' + k if parent_key else k
  167         if isinstance(v, collections.abc.MutableMapping):
  168             items.extend(flatten(v, new_key).items())
  169         else:
  170             items.append((new_key, v))
  171     return dict(items)
  172 
  173 
  174 class ACLMixin(object):
  175 
  176     def get_acl_tuple(self, req, **kwargs):
  177         return None, None
  178 
  179     def get_acl_dict_for_user(self, req, acl_list):
  180         """Get acl operation found for token user in acl list.
  181 
  182         Token user is looked into users list present for each acl operation.
  183         If there is a match, it means that ACL data is applicable for policy
  184         logic. Policy logic requires data as dictionary so this method capture
  185         acl's operation, project_access data in that format.
  186 
  187         For operation value, matching ACL record's operation is stored in dict
  188         as key and value both.
  189         project_access flag is intended to make secret/container private for a
  190         given operation. It doesn't require user match. So its captured in dict
  191         format where key is prefixed with related operation and flag is used as
  192         its value.
  193 
  194         Then for acl related policy logic, this acl dict data is combined with
  195         target entity (secret or container) creator_id and project id. The
  196         whole dict serves as target in policy enforcement logic i.e. right
  197         hand side of policy rule.
  198 
  199         Following is sample outcome where secret or container has ACL defined
  200         and token user is among the ACL users defined for 'read' and 'list'
  201         operation.
  202 
  203         {'read': 'read', 'list': 'list', 'read_project_access': True,
  204         'list_project_access': True }
  205 
  206         Its possible that ACLs are defined without any user, they just
  207         have project_access flag set. This means only creator can read or list
  208         ACL entities. In that case, dictionary output can be as follows.
  209 
  210         {'read_project_access': False, 'list_project_access': False }
  211 
  212         """
  213         ctxt = _get_barbican_context(req)
  214         if not ctxt:
  215             return {}
  216         acl_dict = {acl.operation: acl.operation for acl in acl_list
  217                     if ctxt.user in acl.to_dict_fields().get('users', [])}
  218         co_dict = {'%s_project_access' % acl.operation: acl.project_access for
  219                    acl in acl_list if acl.project_access is not None}
  220         if not co_dict:
  221             """
  222             The co_dict is empty when the entity (secret or container) has no
  223             acls in its acl_list.  This causes any policy with
  224 
  225                 "%(target.secret.read_project_access)s"
  226             or
  227                 "%(target.container.read_project_access)s"
  228 
  229             to always evaluate to False.  This is probelmatic because we want
  230             to allow project access by default (with additional role checks).
  231             To work around this we allow read here.
  232 
  233             When the entity has an acl, co_dict will use the value from the
  234             database, and this if statement will be skipped.
  235             """
  236             co_dict = {'read_project_access': True}
  237         acl_dict.update(co_dict)
  238 
  239         return acl_dict