"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/api/trusts.py" (13 May 2020, 20650 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "trusts.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 16.0.1_vs_17.0.0.

    1 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 #    not use this file except in compliance with the License. You may obtain
    3 #    a copy of the License at
    4 #
    5 #         http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 #    Unless required by applicable law or agreed to in writing, software
    8 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 #    License for the specific language governing permissions and limitations
   11 #    under the License.
   12 
   13 # This file handles all flask-restful resources for /v3/OS-TRUST
   14 
   15 # TODO(morgan): Deprecate /v3/OS-TRUST/trusts path in favour of /v3/trusts.
   16 # /v3/OS-TRUST should remain indefinitely.
   17 
   18 import flask
   19 import flask_restful
   20 import http.client
   21 from oslo_log import log
   22 from oslo_policy import _checks as op_checks
   23 
   24 from keystone.api._shared import json_home_relations
   25 from keystone.common import context
   26 from keystone.common import json_home
   27 from keystone.common import provider_api
   28 from keystone.common import rbac_enforcer
   29 from keystone.common.rbac_enforcer import policy
   30 from keystone.common import utils
   31 from keystone.common import validation
   32 from keystone import exception
   33 from keystone.i18n import _
   34 from keystone.server import flask as ks_flask
   35 from keystone.trust import schema
   36 
   37 
# Module-level logger, named after this module.
LOG = log.getLogger(__name__)
# Short aliases used throughout this module for policy enforcement calls and
# for reaching the backend provider APIs (trust_api, role_api, ...).
ENFORCER = rbac_enforcer.RBACEnforcer
PROVIDERS = provider_api.ProviderAPIs

# Relation builder functions for OS-TRUST resources and parameters, taken from
# the shared json_home_relations helpers.
_build_resource_relation = json_home_relations.os_trust_resource_rel_func
_build_parameter_relation = json_home_relations.os_trust_parameter_rel_func

# Relation for the ``trust_id`` URL parameter; referenced by the resource maps
# declared on TrustAPI.
TRUST_ID_PARAMETER_RELATION = _build_parameter_relation(
    parameter_name='trust_id')
   47 
   48 
   49 def _build_trust_target_enforcement():
   50     target = {}
   51     # NOTE(cmurphy) unlike other APIs, in the event the trust doesn't exist or
   52     # has 0 remaining uses, we actually do expect it to return a 404 and not a
   53     # 403, so don't catch NotFound here (lp#1840288)
   54     target['trust'] = PROVIDERS.trust_api.get_trust(
   55         flask.request.view_args.get('trust_id')
   56     )
   57 
   58     return target
   59 
   60 
   61 def _trustor_trustee_only(trust):
   62     user_id = flask.request.environ.get(context.REQUEST_CONTEXT_ENV).user_id
   63     if user_id not in [trust.get('trustee_user_id'),
   64                        trust.get('trustor_user_id')]:
   65         raise exception.ForbiddenAction(
   66             action=_('Requested user has no relation to this trust'))
   67 
   68 
   69 def _normalize_trust_expires_at(trust):
   70     # correct isotime
   71     if trust.get('expires_at') is not None:
   72         trust['expires_at'] = utils.isotime(trust['expires_at'],
   73                                             subsecond=True)
   74 
   75 
   76 def _normalize_trust_roles(trust):
   77     # fill in role data
   78     trust_full_roles = []
   79     for trust_role in trust.get('roles', []):
   80         trust_role = trust_role['id']
   81         try:
   82             matching_role = PROVIDERS.role_api.get_role(trust_role)
   83             full_role = ks_flask.ResourceBase.wrap_member(
   84                 matching_role, collection_name='roles', member_name='role')
   85             trust_full_roles.append(full_role['role'])
   86         except exception.RoleNotFound:
   87             pass
   88 
   89     trust['roles'] = trust_full_roles
   90     trust['roles_links'] = {
   91         'self': ks_flask.base_url(path='/%s/roles' % trust['id']),
   92         'next': None,
   93         'previous': None}
   94 
   95 
   96 class TrustResource(ks_flask.ResourceBase):
   97     collection_key = 'trusts'
   98     member_key = 'trust'
   99     api_prefix = '/OS-TRUST'
  100     json_home_resource_rel_func = _build_resource_relation
  101     json_home_parameter_rel_func = _build_parameter_relation
  102 
  103     def _check_unrestricted(self):
  104         if self.oslo_context.is_admin:
  105             return
  106         token = self.auth_context['token']
  107         if 'application_credential' in token.methods:
  108             if not token.application_credential['unrestricted']:
  109                 action = _("Using method 'application_credential' is not "
  110                            "allowed for managing trusts.")
  111                 raise exception.ForbiddenAction(action=action)
  112 
  113     def _find_redelegated_trust(self):
  114         # Check if delegated via trust
  115         redelegated_trust = None
  116         if self.oslo_context.is_delegated_auth:
  117             src_trust_id = self.oslo_context.trust_id
  118             if not src_trust_id:
  119                 action = _('Redelegation allowed for delegated by trust only')
  120                 raise exception.ForbiddenAction(action=action)
  121             redelegated_trust = PROVIDERS.trust_api.get_trust(src_trust_id)
  122         return redelegated_trust
  123 
  124     @staticmethod
  125     def _parse_expiration_date(expiration_date):
  126         if expiration_date is not None:
  127             return utils.parse_expiration_date(expiration_date)
  128         return None
  129 
  130     def _require_trustor_has_role_in_project(self, trust):
  131         trustor_roles = self._get_trustor_roles(trust)
  132         for trust_role in trust['roles']:
  133             matching_roles = [x for x in trustor_roles
  134                               if x == trust_role['id']]
  135             if not matching_roles:
  136                 raise exception.RoleNotFound(role_id=trust_role['id'])
  137 
  138     def _get_trustor_roles(self, trust):
  139         original_trust = trust.copy()
  140         while original_trust.get('redelegated_trust_id'):
  141             original_trust = PROVIDERS.trust_api.get_trust(
  142                 original_trust['redelegated_trust_id'])
  143 
  144         if not ((trust.get('project_id')) in [None, '']):
  145             # Check project exists.
  146             PROVIDERS.resource_api.get_project(trust['project_id'])
  147             # Get a list of roles including any domain specific roles
  148             assignment_list = PROVIDERS.assignment_api.list_role_assignments(
  149                 user_id=original_trust['trustor_user_id'],
  150                 project_id=original_trust['project_id'],
  151                 effective=True, strip_domain_roles=False)
  152             return list({x['role_id'] for x in assignment_list})
  153         else:
  154             return []
  155 
  156     def _normalize_role_list(self, trust_roles):
  157         roles = []
  158         for role in trust_roles:
  159             if role.get('id'):
  160                 roles.append({'id': role['id']})
  161             else:
  162                 roles.append(
  163                     PROVIDERS.role_api.get_unique_role_by_name(role['name']))
  164         return roles
  165 
  166     def _get_trust(self, trust_id):
  167         ENFORCER.enforce_call(action='identity:get_trust',
  168                               build_target=_build_trust_target_enforcement)
  169 
  170         # NOTE(cmurphy) look up trust before doing is_admin authorization - to
  171         # maintain the API contract, we expect a missing trust to raise a 404
  172         # before we get to enforcement (lp#1840288)
  173         trust = PROVIDERS.trust_api.get_trust(trust_id)
  174 
  175         if self.oslo_context.is_admin:
  176             # policies are not loaded for the is_admin context, so need to
  177             # block access here
  178             raise exception.ForbiddenAction(
  179                 action=_('Requested user has no relation to this trust'))
  180 
  181         # NOTE(cmurphy) As of Train, the default policies enforce the
  182         # identity:get_trust rule. However, in case the
  183         # identity:get_trust rule has been locally overridden by the
  184         # default that would have been produced by the sample config, we need
  185         # to enforce it again and warn that the behavior is changing.
  186         rules = policy._ENFORCER._enforcer.rules.get('identity:get_trust')
  187         # rule check_str is ""
  188         if isinstance(rules, op_checks.TrueCheck):
  189             LOG.warning(
  190                 "The policy check string for rule \"identity:get_trust\" "
  191                 "has been overridden to \"always true\". In the next release, "
  192                 "this will cause the" "\"identity:get_trust\" action to "
  193                 "be fully permissive as hardcoded enforcement will be "
  194                 "removed. To correct this issue, either stop overriding the "
  195                 "\"identity:get_trust\" rule in config to accept the "
  196                 "defaults, or explicitly set a rule that is not empty."
  197             )
  198             _trustor_trustee_only(trust)
  199 
  200         _normalize_trust_expires_at(trust)
  201         _normalize_trust_roles(trust)
  202         return self.wrap_member(trust)
  203 
  204     def _list_trusts(self):
  205         trustor_user_id = flask.request.args.get('trustor_user_id')
  206         trustee_user_id = flask.request.args.get('trustee_user_id')
  207         if trustor_user_id:
  208             target = {'trust': {'trustor_user_id': trustor_user_id}}
  209             ENFORCER.enforce_call(action='identity:list_trusts_for_trustor',
  210                                   target_attr=target)
  211         elif trustee_user_id:
  212             target = {'trust': {'trustee_user_id': trustee_user_id}}
  213             ENFORCER.enforce_call(action='identity:list_trusts_for_trustee',
  214                                   target_attr=target)
  215         else:
  216             ENFORCER.enforce_call(action='identity:list_trusts')
  217 
  218         trusts = []
  219 
  220         # NOTE(cmurphy) As of Train, the default policies enforce the
  221         # identity:list_trusts rule and there are new policies in-code to
  222         # enforce identity:list_trusts_for_trustor and
  223         # identity:list_trusts_for_trustee. However, in case the
  224         # identity:list_trusts rule has been locally overridden by the default
  225         # that would have been produced by the sample config, we need to
  226         # enforce it again and warn that the behavior is changing.
  227         rules = policy._ENFORCER._enforcer.rules.get('identity:list_trusts')
  228         # rule check_str is ""
  229         if isinstance(rules, op_checks.TrueCheck):
  230             LOG.warning(
  231                 "The policy check string for rule \"identity:list_trusts\" "
  232                 "has been overridden to \"always true\". In the next release, "
  233                 "this will cause the \"identity:list_trusts\" action to be "
  234                 "fully permissive as hardcoded enforcement will be removed. "
  235                 "To correct this issue, either stop overriding the "
  236                 "\"identity:list_trusts\" rule in config to accept the "
  237                 "defaults, or explicitly set a rule that is not empty."
  238             )
  239             if not flask.request.args:
  240                 # NOTE(morgan): Admin can list all trusts.
  241                 ENFORCER.enforce_call(action='admin_required')
  242 
  243         if not flask.request.args:
  244             trusts += PROVIDERS.trust_api.list_trusts()
  245         elif trustor_user_id:
  246             trusts += PROVIDERS.trust_api.list_trusts_for_trustor(
  247                 trustor_user_id)
  248         elif trustee_user_id:
  249             trusts += PROVIDERS.trust_api.list_trusts_for_trustee(
  250                 trustee_user_id)
  251 
  252         for trust in trusts:
  253             # get_trust returns roles, list_trusts does not
  254             # It seems in some circumstances, roles does not
  255             # exist in the query response, so check first
  256             if 'roles' in trust:
  257                 del trust['roles']
  258 
  259             if trust.get('expires_at') is not None:
  260                 trust['expires_at'] = utils.isotime(trust['expires_at'],
  261                                                     subsecond=True)
  262 
  263         return self.wrap_collection(trusts)
  264 
  265     def get(self, trust_id=None):
  266         """Dispatch for GET/HEAD or LIST trusts."""
  267         if trust_id is not None:
  268             return self._get_trust(trust_id=trust_id)
  269         else:
  270             return self._list_trusts()
  271 
  272     def post(self):
  273         """Create a new trust.
  274 
  275         The User creating the trust must be the trustor.
  276         """
  277         ENFORCER.enforce_call(action='identity:create_trust')
  278         trust = self.request_body_json.get('trust', {})
  279         validation.lazy_validate(schema.trust_create, trust)
  280         self._check_unrestricted()
  281 
  282         if trust.get('project_id') and not trust.get('roles'):
  283             action = _('At least one role should be specified')
  284             raise exception.ForbiddenAction(action=action)
  285 
  286         if self.oslo_context.user_id != trust.get('trustor_user_id'):
  287             action = _("The authenticated user should match the trustor")
  288             raise exception.ForbiddenAction(action=action)
  289 
  290         # Ensure the trustee exists
  291         PROVIDERS.identity_api.get_user(trust['trustee_user_id'])
  292 
  293         # Normalize roles
  294         trust['roles'] = self._normalize_role_list(trust.get('roles', []))
  295         self._require_trustor_has_role_in_project(trust)
  296         trust['expires_at'] = self._parse_expiration_date(
  297             trust.get('expires_at'))
  298         trust = self._assign_unique_id(trust)
  299         redelegated_trust = self._find_redelegated_trust()
  300         return_trust = PROVIDERS.trust_api.create_trust(
  301             trust_id=trust['id'],
  302             trust=trust,
  303             roles=trust['roles'],
  304             redelegated_trust=redelegated_trust,
  305             initiator=self.audit_initiator)
  306         _normalize_trust_expires_at(return_trust)
  307         _normalize_trust_roles(return_trust)
  308         return self.wrap_member(return_trust), http.client.CREATED
  309 
  310     def delete(self, trust_id):
  311         ENFORCER.enforce_call(action='identity:delete_trust',
  312                               build_target=_build_trust_target_enforcement)
  313         self._check_unrestricted()
  314 
  315         # NOTE(cmurphy) As of Train, the default policies enforce the
  316         # identity:delete_trust rule. However, in case the
  317         # identity:delete_trust rule has been locally overridden by the
  318         # default that would have been produced by the sample config, we need
  319         # to enforce it again and warn that the behavior is changing.
  320         rules = policy._ENFORCER._enforcer.rules.get('identity:delete_trust')
  321         # rule check_str is ""
  322         if isinstance(rules, op_checks.TrueCheck):
  323             LOG.warning(
  324                 "The policy check string for rule \"identity:delete_trust\" "
  325                 "has been overridden to \"always true\". In the next release, "
  326                 "this will cause the" "\"identity:delete_trust\" action to "
  327                 "be fully permissive as hardcoded enforcement will be "
  328                 "removed. To correct this issue, either stop overriding the "
  329                 "\"identity:delete_trust\" rule in config to accept the "
  330                 "defaults, or explicitly set a rule that is not empty."
  331             )
  332             trust = PROVIDERS.trust_api.get_trust(trust_id)
  333             if (self.oslo_context.user_id != trust.get('trustor_user_id') and
  334                     not self.oslo_context.is_admin):
  335                 action = _('Only admin or trustor can delete a trust')
  336                 raise exception.ForbiddenAction(action=action)
  337         PROVIDERS.trust_api.delete_trust(trust_id,
  338                                          initiator=self.audit_initiator)
  339         return '', http.client.NO_CONTENT
  340 
  341 
  342 # NOTE(morgan): Since this Resource is not being used with the automatic
  343 # URL additions and does not have a collection key/member_key, we use
  344 # the flask-restful Resource, not the keystone ResourceBase
  345 class RolesForTrustListResource(flask_restful.Resource):
  346 
  347     @property
  348     def oslo_context(self):
  349         return flask.request.environ.get(context.REQUEST_CONTEXT_ENV, None)
  350 
  351     def get(self, trust_id):
  352         ENFORCER.enforce_call(action='identity:list_roles_for_trust',
  353                               build_target=_build_trust_target_enforcement)
  354 
  355         # NOTE(morgan): This duplicates a little of the .get_trust from the
  356         # main resource, as it needs some of the same logic. However, due to
  357         # how flask-restful works, this should be fully encapsulated
  358 
  359         if self.oslo_context.is_admin:
  360             # policies are not loaded for the is_admin context, so need to
  361             # block access here
  362             raise exception.ForbiddenAction(
  363                 action=_('Requested user has no relation to this trust'))
  364 
  365         trust = PROVIDERS.trust_api.get_trust(trust_id)
  366 
  367         # NOTE(cmurphy) As of Train, the default policies enforce the
  368         # identity:list_roles_for_trust rule. However, in case the
  369         # identity:list_roles_for_trust rule has been locally overridden by the
  370         # default that would have been produced by the sample config, we need
  371         # to enforce it again and warn that the behavior is changing.
  372         rules = policy._ENFORCER._enforcer.rules.get(
  373             'identity:list_roles_for_trust')
  374         # rule check_str is ""
  375         if isinstance(rules, op_checks.TrueCheck):
  376             LOG.warning(
  377                 "The policy check string for rule "
  378                 "\"identity:list_roles_for_trust\" has been overridden to "
  379                 "\"always true\". In the next release, this will cause the "
  380                 "\"identity:list_roles_for_trust\" action to be fully "
  381                 "permissive as hardcoded enforcement will be removed. To "
  382                 "correct this issue, either stop overriding the "
  383                 "\"identity:get_trust\" rule in config to accept the "
  384                 "defaults, or explicitly set a rule that is not empty."
  385             )
  386             _trustor_trustee_only(trust)
  387 
  388         _normalize_trust_expires_at(trust)
  389         _normalize_trust_roles(trust)
  390         return {'roles': trust['roles'],
  391                 'links': trust['roles_links']}
  392 
  393 
  394 # NOTE(morgan): Since this Resource is not being used with the automatic
  395 # URL additions and does not have a collection key/member_key, we use
  396 # the flask-restful Resource, not the keystone ResourceBase
  397 class RoleForTrustResource(flask_restful.Resource):
  398 
  399     @property
  400     def oslo_context(self):
  401         return flask.request.environ.get(context.REQUEST_CONTEXT_ENV, None)
  402 
  403     def get(self, trust_id, role_id):
  404         """Get a role that has been assigned to a trust."""
  405         ENFORCER.enforce_call(action='identity:get_role_for_trust',
  406                               build_target=_build_trust_target_enforcement)
  407 
  408         if self.oslo_context.is_admin:
  409             # policies are not loaded for the is_admin context, so need to
  410             # block access here
  411             raise exception.ForbiddenAction(
  412                 action=_('Requested user has no relation to this trust'))
  413 
  414         trust = PROVIDERS.trust_api.get_trust(trust_id)
  415 
  416         # NOTE(cmurphy) As of Train, the default policies enforce the
  417         # identity:get_role_for_trust rule. However, in case the
  418         # identity:get_role_for_trust rule has been locally overridden by the
  419         # default that would have been produced by the sample config, we need
  420         # to enforce it again and warn that the behavior is changing.
  421         rules = policy._ENFORCER._enforcer.rules.get(
  422             'identity:get_role_for_trust')
  423         # rule check_str is ""
  424         if isinstance(rules, op_checks.TrueCheck):
  425             LOG.warning(
  426                 "The policy check string for rule "
  427                 "\"identity:get_role_for_trust\" has been overridden to "
  428                 "\"always true\". In the next release, this will cause the "
  429                 "\"identity:get_role_for_trust\" action to be fully "
  430                 "permissive as hardcoded enforcement will be removed. To "
  431                 "correct this issue, either stop overriding the "
  432                 "\"identity:get_role_for_trust\" rule in config to accept the "
  433                 "defaults, or explicitly set a rule that is not empty."
  434             )
  435             _trustor_trustee_only(trust)
  436 
  437         if not any(role['id'] == role_id for role in trust['roles']):
  438             raise exception.RoleNotFound(role_id=role_id)
  439 
  440         role = PROVIDERS.role_api.get_role(role_id)
  441         return ks_flask.ResourceBase.wrap_member(role, collection_name='roles',
  442                                                  member_name='role')
  443 
  444 
  445 class TrustAPI(ks_flask.APIBase):
  446     _name = 'trusts'
  447     _import_name = __name__
  448     resources = [TrustResource]
  449     resource_mapping = [
  450         ks_flask.construct_resource_map(
  451             resource=RolesForTrustListResource,
  452             url='/trusts/<string:trust_id>/roles',
  453             resource_kwargs={},
  454             rel='trust_roles',
  455             path_vars={
  456                 'trust_id': TRUST_ID_PARAMETER_RELATION},
  457             resource_relation_func=_build_resource_relation),
  458         ks_flask.construct_resource_map(
  459             resource=RoleForTrustResource,
  460             url='/trusts/<string:trust_id>/roles/<string:role_id>',
  461             resource_kwargs={},
  462             rel='trust_role',
  463             path_vars={
  464                 'trust_id': TRUST_ID_PARAMETER_RELATION,
  465                 'role_id': json_home.Parameters.ROLE_ID},
  466             resource_relation_func=_build_resource_relation),
  467     ]
  468     _api_url_prefix = '/OS-TRUST'
  469 
  470 
# Tuple of the API classes defined in this module (presumably picked up by the
# application loader - confirm against keystone.server).
APIs = (TrustAPI,)