"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/api/roles.py" (13 May 2020, 12263 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "roles.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 16.0.1_vs_17.0.0.

    1 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 #    not use this file except in compliance with the License. You may obtain
    3 #    a copy of the License at
    4 #
    5 #         http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 #    Unless required by applicable law or agreed to in writing, software
    8 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 #    License for the specific language governing permissions and limitations
   11 #    under the License.
   12 
   13 # This file handles all flask-restful resources for /v3/roles
   14 
   15 import flask
   16 import flask_restful
   17 import http.client
   18 
   19 from keystone.api._shared import implied_roles as shared
   20 from keystone.assignment import schema
   21 from keystone.common import json_home
   22 from keystone.common import provider_api
   23 from keystone.common import rbac_enforcer
   24 from keystone.common import validation
   25 import keystone.conf
   26 from keystone.server import flask as ks_flask
   27 
   28 
   29 CONF = keystone.conf.CONF  # alias of keystone's CONF object (not referenced in the code visible here)
   30 ENFORCER = rbac_enforcer.RBACEnforcer  # alias; policy checks below go through ENFORCER.enforce_call(...)
   31 PROVIDERS = provider_api.ProviderAPIs  # alias; the role backend is reached below as PROVIDERS.role_api
   32 
   33 
   34 class RoleResource(ks_flask.ResourceBase):
   35     collection_key = 'roles'
   36     member_key = 'role'
   37     get_member_from_driver = PROVIDERS.deferred_provider_lookup(
   38         api='role_api', method='get_role')
   39 
   40     def _is_domain_role(self, role):
   41         return bool(role.get('domain_id'))
   42 
   43     def get(self, role_id=None):
   44         """Get role or list roles.
   45 
   46         GET/HEAD /v3/roles
   47         GET/HEAD /v3/roles/{role_id}
   48         """
   49         if role_id is not None:
   50             return self._get_role(role_id)
   51         return self._list_roles()
   52 
   53     def _get_role(self, role_id):
   54         err = None
   55         role = {}
   56         try:
   57             role = PROVIDERS.role_api.get_role(role_id)
   58         except Exception as e:  # nosec
   59             # We don't raise out here, we raise out after enforcement, this
   60             # ensures we do not leak role existence. Do nothing yet, process
   61             # enforcement before raising out an error.
   62             err = e
   63         finally:
   64             # NOTE(morgan): There are a couple of cases to be aware of here
   65             # if there is an exception (e is not None), then we are enforcing
   66             # on "get_role" to be safe. If the role is not a "domain_role",
   67             # we are enforcing on "get_role". If the role is "domain_role" we
   68             # are inforcing on "get_domain_role"
   69             if err is not None or not self._is_domain_role(role):
   70                 ENFORCER.enforce_call(action='identity:get_role')
   71                 if err:
   72                     # reraise the error after enforcement if needed.
   73                     raise err
   74             else:
   75                 ENFORCER.enforce_call(action='identity:get_domain_role',
   76                                       member_target_type='role',
   77                                       member_target=role)
   78         return self.wrap_member(role)
   79 
   80     def _list_roles(self):
   81         filters = ['name', 'domain_id']
   82         domain_filter = flask.request.args.get('domain_id')
   83         if domain_filter:
   84             ENFORCER.enforce_call(action='identity:list_domain_roles',
   85                                   filters=filters)
   86         else:
   87             ENFORCER.enforce_call(action='identity:list_roles',
   88                                   filters=filters)
   89 
   90         hints = self.build_driver_hints(filters)
   91         if not domain_filter:
   92             # NOTE(jamielennox): To handle the default case of not domain_id
   93             # defined the role_assignment backend does some hackery to
   94             # distinguish between global and domain scoped roles. This backend
   95             # behaviour relies upon a value of domain_id being set (not just
   96             # defaulting to None). Manually set the filter if its not
   97             # provided.
   98             hints.add_filter('domain_id', None)
   99         refs = PROVIDERS.role_api.list_roles(hints=hints)
  100         return self.wrap_collection(refs, hints=hints)
  101 
  102     def post(self):
  103         """Create role.
  104 
  105         POST /v3/roles
  106         """
  107         role = self.request_body_json.get('role', {})
  108         if self._is_domain_role(role):
  109             ENFORCER.enforce_call(action='identity:create_domain_role')
  110         else:
  111             ENFORCER.enforce_call(action='identity:create_role')
  112         validation.lazy_validate(schema.role_create, role)
  113         role = self._assign_unique_id(role)
  114         role = self._normalize_dict(role)
  115         ref = PROVIDERS.role_api.create_role(
  116             role['id'], role, initiator=self.audit_initiator)
  117         return self.wrap_member(ref), http.client.CREATED
  118 
  119     def patch(self, role_id):
  120         """Update role.
  121 
  122         PATCH /v3/roles/{role_id}
  123         """
  124         err = None
  125         role = {}
  126         try:
  127             role = PROVIDERS.role_api.get_role(role_id)
  128         except Exception as e:  # nosec
  129             # We don't raise out here, we raise out after enforcement, this
  130             # ensures we do not leak role existence. Do nothing yet, process
  131             # enforcement before raising out an error.
  132             err = e
  133         finally:
  134             if err is not None or not self._is_domain_role(role):
  135                 ENFORCER.enforce_call(action='identity:update_role')
  136                 if err:
  137                     raise err
  138             else:
  139                 ENFORCER.enforce_call(action='identity:update_domain_role',
  140                                       member_target_type='role',
  141                                       member_target=role)
  142         request_body_role = self.request_body_json.get('role', {})
  143         validation.lazy_validate(schema.role_update, request_body_role)
  144         self._require_matching_id(request_body_role)
  145         ref = PROVIDERS.role_api.update_role(
  146             role_id, request_body_role, initiator=self.audit_initiator)
  147         return self.wrap_member(ref)
  148 
  149     def delete(self, role_id):
  150         """Delete role.
  151 
  152         DELETE /v3/roles/{role_id}
  153         """
  154         err = None
  155         role = {}
  156         try:
  157             role = PROVIDERS.role_api.get_role(role_id)
  158         except Exception as e:  # nosec
  159             # We don't raise out here, we raise out after enforcement, this
  160             # ensures we do not leak role existence. Do nothing yet, process
  161             # enforcement before raising out an error.
  162             err = e
  163         finally:
  164             if err is not None or not self._is_domain_role(role):
  165                 ENFORCER.enforce_call(action='identity:delete_role')
  166                 if err:
  167                     raise err
  168             else:
  169                 ENFORCER.enforce_call(action='identity:delete_domain_role',
  170                                       member_target_type='role',
  171                                       member_target=role)
  172         PROVIDERS.role_api.delete_role(role_id, initiator=self.audit_initiator)
  173         return None, http.client.NO_CONTENT
  174 
  175 
  176 def _build_enforcement_target_ref():
  177     ref = {}
  178     if flask.request.view_args:
  179         ref['prior_role'] = PROVIDERS.role_api.get_role(
  180             flask.request.view_args.get('prior_role_id'))
  181         if flask.request.view_args.get('implied_role_id'):
  182             ref['implied_role'] = PROVIDERS.role_api.get_role(
  183                 flask.request.view_args['implied_role_id'])
  184     return ref
  185 
  186 
  187 class RoleImplicationListResource(flask_restful.Resource):
  188     def get(self, prior_role_id):
  189         """List Implied Roles.
  190 
  191         GET/HEAD /v3/roles/{prior_role_id}/implies
  192         """
  193         ENFORCER.enforce_call(action='identity:list_implied_roles',
  194                               build_target=_build_enforcement_target_ref)
  195         ref = PROVIDERS.role_api.list_implied_roles(prior_role_id)
  196         implied_ids = [r['implied_role_id'] for r in ref]
  197         response_json = shared.role_inference_response(prior_role_id)
  198         response_json['role_inference']['implies'] = []
  199         for implied_id in implied_ids:
  200             implied_role = PROVIDERS.role_api.get_role(implied_id)
  201             response_json['role_inference']['implies'].append(
  202                 shared.build_implied_role_response_data(implied_role))
  203         response_json['links'] = {
  204             'self': ks_flask.base_url(
  205                 path='/roles/%s/implies' % prior_role_id)}
  206         return response_json
  207 
  208 
  209 class RoleImplicationResource(flask_restful.Resource):
  210 
  211     def head(self, prior_role_id, implied_role_id=None):
  212         # TODO(morgan): deprecate "check_implied_role" policy, as a user must
  213         # have both check_implied_role and get_implied_role to use the head
  214         # action. This enforcement of HEAD is historical for
  215         # consistent policy enforcement behavior even if it is superfluous.
  216         # Alternatively we can keep check_implied_role and reference
  217         # ._get_implied_role instead.
  218         ENFORCER.enforce_call(action='identity:check_implied_role',
  219                               build_target=_build_enforcement_target_ref)
  220         self.get(prior_role_id, implied_role_id)
  221         # NOTE(morgan): Our API here breaks HTTP Spec. This should be evaluated
  222         # for a future fix. This should just return the above "get" however,
  223         # we document and implment this as a NO_CONTENT response. NO_CONTENT
  224         # here is incorrect. It is maintained as is for API contract reasons.
  225         return None, http.client.NO_CONTENT
  226 
  227     def get(self, prior_role_id, implied_role_id):
  228         """Get implied role.
  229 
  230         GET/HEAD /v3/roles/{prior_role_id}/implies/{implied_role_id}
  231         """
  232         ENFORCER.enforce_call(
  233             action='identity:get_implied_role',
  234             build_target=_build_enforcement_target_ref)
  235         return self._get_implied_role(prior_role_id, implied_role_id)
  236 
  237     def _get_implied_role(self, prior_role_id, implied_role_id):
  238         # Isolate this logic so it can be re-used without added enforcement
  239         PROVIDERS.role_api.get_implied_role(
  240             prior_role_id, implied_role_id)
  241         implied_role_ref = PROVIDERS.role_api.get_role(implied_role_id)
  242         response_json = shared.role_inference_response(prior_role_id)
  243         response_json['role_inference'][
  244             'implies'] = shared.build_implied_role_response_data(
  245             implied_role_ref)
  246         response_json['links'] = {
  247             'self': ks_flask.base_url(
  248                 path='/roles/%(prior)s/implies/%(implies)s' % {
  249                     'prior': prior_role_id, 'implies': implied_role_id})}
  250         return response_json
  251 
  252     def put(self, prior_role_id, implied_role_id):
  253         """Create implied role.
  254 
  255         PUT /v3/roles/{prior_role_id}/implies/{implied_role_id}
  256         """
  257         ENFORCER.enforce_call(action='identity:create_implied_role',
  258                               build_target=_build_enforcement_target_ref)
  259         PROVIDERS.role_api.create_implied_role(prior_role_id, implied_role_id)
  260         response_json = self._get_implied_role(prior_role_id, implied_role_id)
  261         return response_json, http.client.CREATED
  262 
  263     def delete(self, prior_role_id, implied_role_id):
  264         """Delete implied role.
  265 
  266         DELETE /v3/roles/{prior_role_id}/implies/{implied_role_id}
  267         """
  268         ENFORCER.enforce_call(action='identity:delete_implied_role',
  269                               build_target=_build_enforcement_target_ref)
  270         PROVIDERS.role_api.delete_implied_role(prior_role_id, implied_role_id)
  271         return None, http.client.NO_CONTENT
  272 
  273 
  274 class RoleAPI(ks_flask.APIBase):
  275     _name = 'roles'
  276     _import_name = __name__
  277     resources = [RoleResource]
  278     resource_mapping = [
  279         ks_flask.construct_resource_map(
  280             resource=RoleImplicationListResource,
  281             url='/roles/<string:prior_role_id>/implies',
  282             resource_kwargs={},
  283             rel='implied_roles',
  284             path_vars={'prior_role_id': json_home.Parameters.ROLE_ID}),
  285         ks_flask.construct_resource_map(
  286             resource=RoleImplicationResource,
  287             resource_kwargs={},
  288             url=('/roles/<string:prior_role_id>/'
  289                  'implies/<string:implied_role_id>'),
  290             rel='implied_role',
  291             path_vars={
  292                 'prior_role_id': json_home.Parameters.ROLE_ID,
  293                 'implied_role_id': json_home.Parameters.ROLE_ID})
  294     ]
  295 
  296 
  297 APIs = (RoleAPI,)