"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/models/token_model.py" (13 May 2020, 22156 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "token_model.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 16.0.1_vs_17.0.0.

    1 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 # not use this file except in compliance with the License. You may obtain
    3 # a copy of the License at
    4 #
    5 #      http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 # Unless required by applicable law or agreed to in writing, software
    8 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 # License for the specific language governing permissions and limitations
   11 # under the License.
   12 
   13 """Unified in-memory token model."""
   14 
   15 from oslo_log import log
   16 from oslo_serialization import jsonutils
   17 from oslo_serialization import msgpackutils
   18 from oslo_utils import reflection
   19 
   20 from keystone.common import cache
   21 from keystone.common import provider_api
   22 from keystone import exception
   23 from keystone.i18n import _
   24 
# Module-level logger, named after this module.
LOG = log.getLogger(__name__)
# Accessor through which this module reaches the identity, resource,
# assignment, role, trust, oauth and application credential APIs
# (e.g. ``PROVIDERS.identity_api.get_user``).
PROVIDERS = provider_api.ProviderAPIs

# Supported token versions. ``VERSIONS`` is an immutable set so that it can
# be used for membership checks without risk of accidental modification.
V3 = 'v3.0'
VERSIONS = frozenset([V3])

# Minimum access rules support.
# NOTE(review): not referenced anywhere in this module; presumably a version
# number compared by callers elsewhere -- confirm before changing its type
# or value.
ACCESS_RULES_MIN_VERSION = 1.0
   34 
   35 
   36 class TokenModel(object):
   37     """An object that represents a token emitted by keystone.
   38 
   39     This is a queryable object that other parts of keystone can use to reason
   40     about a user's authentication or authorization.
   41     """
   42 
   43     def __init__(self):
   44         self.user_id = None
   45         self.__user = None
   46         self.__user_domain = None
   47 
   48         self.methods = None
   49         self.audit_id = None
   50         self.parent_audit_id = None
   51 
   52         self.__expires_at = None
   53         self.__issued_at = None
   54 
   55         self.system = None
   56 
   57         self.domain_id = None
   58         self.__domain = None
   59 
   60         self.project_id = None
   61         self.__project = None
   62         self.__project_domain = None
   63 
   64         self.trust_id = None
   65         self.__trust = None
   66         self.__trustor = None
   67         self.__trustee = None
   68         self.__trust_project = None
   69         self.__trust_project_domain = None
   70 
   71         self.is_federated = False
   72         self.identity_provider_id = None
   73         self.protocol_id = None
   74         self.federated_groups = None
   75 
   76         self.access_token_id = None
   77         self.__access_token = None
   78 
   79         self.application_credential_id = None
   80         self.__application_credential = None
   81 
   82     def __repr__(self):
   83         """Return string representation of TokenModel."""
   84         desc = ('<%(type)s (audit_id=%(audit_id)s, '
   85                 'audit_chain_id=%(audit_ids)s) at %(loc)s>')
   86         self_cls_name = reflection.get_class_name(self, fully_qualified=False)
   87         return desc % {'type': self_cls_name,
   88                        'audit_id': self.audit_id,
   89                        'audit_ids': self.audit_ids,
   90                        'loc': hex(id(self))}
   91 
   92     @property
   93     def audit_ids(self):
   94         if self.parent_audit_id:
   95             return [self.audit_id, self.parent_audit_id]
   96         return [self.audit_id]
   97 
   98     @property
   99     def expires_at(self):
  100         return self.__expires_at
  101 
  102     @expires_at.setter
  103     def expires_at(self, value):
  104         if not isinstance(value, str):
  105             raise ValueError('expires_at must be a string.')
  106         self.__expires_at = value
  107 
  108     @property
  109     def issued_at(self):
  110         return self.__issued_at
  111 
  112     @issued_at.setter
  113     def issued_at(self, value):
  114         if not isinstance(value, str):
  115             raise ValueError('issued_at must be a string.')
  116         self.__issued_at = value
  117 
  118     @property
  119     def unscoped(self):
  120         return not any(
  121             [self.system_scoped, self.domain_scoped, self.project_scoped,
  122              self.trust_scoped]
  123         )
  124 
  125     @property
  126     def system_scoped(self):
  127         return self.system is not None
  128 
  129     @property
  130     def user(self):
  131         if not self.__user:
  132             if self.user_id:
  133                 self.__user = PROVIDERS.identity_api.get_user(self.user_id)
  134         return self.__user
  135 
  136     @property
  137     def user_domain(self):
  138         if not self.__user_domain:
  139             if self.user:
  140                 self.__user_domain = PROVIDERS.resource_api.get_domain(
  141                     self.user['domain_id']
  142                 )
  143         return self.__user_domain
  144 
  145     @property
  146     def domain(self):
  147         if not self.__domain:
  148             if self.domain_id:
  149                 self.__domain = PROVIDERS.resource_api.get_domain(
  150                     self.domain_id
  151                 )
  152         return self.__domain
  153 
  154     @property
  155     def domain_scoped(self):
  156         return self.domain_id is not None
  157 
  158     @property
  159     def project(self):
  160         if not self.__project:
  161             if self.project_id:
  162                 self.__project = PROVIDERS.resource_api.get_project(
  163                     self.project_id
  164                 )
  165         return self.__project
  166 
  167     @property
  168     def project_scoped(self):
  169         return self.project_id is not None
  170 
  171     @property
  172     def project_domain(self):
  173         if not self.__project_domain:
  174             if self.project and self.project.get('domain_id'):
  175                 self.__project_domain = PROVIDERS.resource_api.get_domain(
  176                     self.project['domain_id']
  177                 )
  178         return self.__project_domain
  179 
  180     @property
  181     def application_credential(self):
  182         if not self.__application_credential:
  183             if self.application_credential_id:
  184                 app_cred_api = PROVIDERS.application_credential_api
  185                 self.__application_credential = (
  186                     app_cred_api.get_application_credential(
  187                         self.application_credential_id
  188                     )
  189                 )
  190         return self.__application_credential
  191 
  192     @property
  193     def oauth_scoped(self):
  194         return self.access_token_id is not None
  195 
  196     @property
  197     def access_token(self):
  198         if not self.__access_token:
  199             if self.access_token_id:
  200                 self.__access_token = PROVIDERS.oauth_api.get_access_token(
  201                     self.access_token_id
  202                 )
  203         return self.__access_token
  204 
  205     @property
  206     def trust_scoped(self):
  207         return self.trust_id is not None
  208 
  209     @property
  210     def trust(self):
  211         if not self.__trust:
  212             if self.trust_id:
  213                 self.__trust = PROVIDERS.trust_api.get_trust(self.trust_id)
  214         return self.__trust
  215 
  216     @property
  217     def trustor(self):
  218         if not self.__trustor:
  219             if self.trust:
  220                 self.__trustor = PROVIDERS.identity_api.get_user(
  221                     self.trust['trustor_user_id']
  222                 )
  223         return self.__trustor
  224 
  225     @property
  226     def trustee(self):
  227         if not self.__trustee:
  228             if self.trust:
  229                 self.__trustee = PROVIDERS.identity_api.get_user(
  230                     self.trust['trustee_user_id']
  231                 )
  232         return self.__trustee
  233 
  234     @property
  235     def trust_project(self):
  236         if not self.__trust_project:
  237             if self.trust:
  238                 self.__trust_project = PROVIDERS.resource_api.get_project(
  239                     self.trust['project_id']
  240                 )
  241         return self.__trust_project
  242 
  243     @property
  244     def trust_project_domain(self):
  245         if not self.__trust_project_domain:
  246             if self.trust:
  247                 self.__trust_project_domain = (
  248                     PROVIDERS.resource_api.get_domain(
  249                         self.trust_project['domain_id']
  250                     )
  251                 )
  252         return self.__trust_project_domain
  253 
  254     def _get_system_roles(self):
  255         roles = []
  256         groups = PROVIDERS.identity_api.list_groups_for_user(self.user_id)
  257         all_group_roles = []
  258         assignments = []
  259         for group in groups:
  260             group_roles = (
  261                 PROVIDERS.assignment_api.list_system_grants_for_group(
  262                     group['id']
  263                 )
  264             )
  265             for role in group_roles:
  266                 all_group_roles.append(role)
  267                 assignment = {'group_id': group['id'], 'role_id': role['id']}
  268                 assignments.append(assignment)
  269         user_roles = PROVIDERS.assignment_api.list_system_grants_for_user(
  270             self.user_id
  271         )
  272         for role in user_roles:
  273             assignment = {'user_id': self.user_id, 'role_id': role['id']}
  274             assignments.append(assignment)
  275 
  276         # NOTE(lbragstad): The whole reason we need to build out a list of
  277         # "assignments" as opposed to just using the nice list of roles we
  278         # already have is because the add_implied_roles() method operates on a
  279         # list of assignment dictionaries (containing role_id,
  280         # user_id/group_id, project_id, et cetera). That method could probably
  281         # be fixed to be more clear by operating on actual roles instead of
  282         # just assignments.
  283         assignments = PROVIDERS.assignment_api.add_implied_roles(assignments)
  284         for assignment in assignments:
  285             role = PROVIDERS.role_api.get_role(assignment['role_id'])
  286             roles.append({'id': role['id'], 'name': role['name']})
  287 
  288         return roles
  289 
  290     def _get_trust_roles(self):
  291         roles = []
  292         # If redelegated_trust_id is set, then we must traverse the trust_chain
  293         # in order to determine who the original trustor is. We need to do this
  294         # because the user ID of the original trustor helps us determine scope
  295         # in the redelegated context.
  296         if self.trust.get('redelegated_trust_id'):
  297             trust_chain = PROVIDERS.trust_api.get_trust_pedigree(
  298                 self.trust_id
  299             )
  300             original_trustor_id = trust_chain[-1]['trustor_user_id']
  301         else:
  302             original_trustor_id = self.trustor['id']
  303 
  304         trust_roles = [
  305             {'role_id': role['id']} for role in self.trust['roles']
  306         ]
  307         effective_trust_roles = (
  308             PROVIDERS.assignment_api.add_implied_roles(trust_roles)
  309         )
  310         effective_trust_role_ids = (
  311             set([r['role_id'] for r in effective_trust_roles])
  312         )
  313 
  314         current_effective_trustor_roles = (
  315             PROVIDERS.assignment_api.get_roles_for_trustor_and_project(
  316                 original_trustor_id, self.trust.get('project_id')
  317             )
  318         )
  319 
  320         for trust_role_id in effective_trust_role_ids:
  321             if trust_role_id in current_effective_trustor_roles:
  322                 role = PROVIDERS.role_api.get_role(trust_role_id)
  323                 if role['domain_id'] is None:
  324                     roles.append(role)
  325             else:
  326                 raise exception.Forbidden(
  327                     _('Trustee has no delegated roles.'))
  328 
  329         return roles
  330 
  331     def _get_oauth_roles(self):
  332         roles = []
  333         access_token_roles = self.access_token['role_ids']
  334         access_token_roles = [
  335             {'role_id': r} for r in jsonutils.loads(access_token_roles)]
  336         effective_access_token_roles = (
  337             PROVIDERS.assignment_api.add_implied_roles(access_token_roles)
  338         )
  339         user_roles = [r['id'] for r in self._get_project_roles()]
  340         for role in effective_access_token_roles:
  341             if role['role_id'] in user_roles:
  342                 role = PROVIDERS.role_api.get_role(role['role_id'])
  343                 roles.append({'id': role['id'], 'name': role['name']})
  344         return roles
  345 
  346     def _get_federated_roles(self):
  347         roles = []
  348         group_ids = [group['id'] for group in self.federated_groups]
  349         federated_roles = PROVIDERS.assignment_api.get_roles_for_groups(
  350             group_ids, self.project_id, self.domain_id
  351         )
  352         for group_id in group_ids:
  353             group_roles = (
  354                 PROVIDERS.assignment_api.list_system_grants_for_group(
  355                     group_id
  356                 )
  357             )
  358             for role in group_roles:
  359                 federated_roles.append(role)
  360         user_roles = PROVIDERS.assignment_api.list_system_grants_for_user(
  361             self.user_id
  362         )
  363         for role in user_roles:
  364             federated_roles.append(role)
  365         if self.domain_id:
  366             domain_roles = (
  367                 PROVIDERS.assignment_api.get_roles_for_user_and_domain(
  368                     self.user_id, self.domain_id
  369                 )
  370             )
  371             for role in domain_roles:
  372                 federated_roles.append(role)
  373         if self.project_id:
  374             project_roles = (
  375                 PROVIDERS.assignment_api.get_roles_for_user_and_project(
  376                     self.user_id, self.project_id
  377                 )
  378             )
  379             for role in project_roles:
  380                 federated_roles.append(role)
  381         # NOTE(lbragstad): Remove duplicate role references from a list of
  382         # roles. It is often suggested that this be done with:
  383         #
  384         # roles = [dict(t) for t in set([tuple(d.items()) for d in roles])]
  385         #
  386         # But that doesn't actually remove duplicates in all cases and
  387         # causes transient failures because dictionaries are unordered
  388         # objects. This means {'id': 1, 'foo': 'bar'} and {'foo': 'bar',
  389         # 'id': 1} won't actually resolve to a single entity in the above
  390         # logic since they are both considered unique. By using `in` we're
  391         # performing a containment check, which also does a deep comparison
  392         # of the objects, which is what we want.
  393         for role in federated_roles:
  394             if not isinstance(role, dict):
  395                 role = PROVIDERS.role_api.get_role(role)
  396             if role not in roles:
  397                 roles.append(role)
  398 
  399         return roles
  400 
  401     def _get_domain_roles(self):
  402         roles = []
  403         domain_roles = (
  404             PROVIDERS.assignment_api.get_roles_for_user_and_domain(
  405                 self.user_id, self.domain_id
  406             )
  407         )
  408         for role_id in domain_roles:
  409             role = PROVIDERS.role_api.get_role(role_id)
  410             roles.append({'id': role['id'], 'name': role['name']})
  411 
  412         return roles
  413 
  414     def _get_project_roles(self):
  415         roles = []
  416         project_roles = (
  417             PROVIDERS.assignment_api.get_roles_for_user_and_project(
  418                 self.user_id, self.project_id
  419             )
  420         )
  421         for role_id in project_roles:
  422             r = PROVIDERS.role_api.get_role(role_id)
  423             roles.append({'id': r['id'], 'name': r['name']})
  424 
  425         return roles
  426 
  427     def _get_application_credential_roles(self):
  428         roles = []
  429         app_cred_roles = self.application_credential['roles']
  430         assignment_list = PROVIDERS.assignment_api.list_role_assignments(
  431             user_id=self.user_id,
  432             project_id=self.project_id,
  433             domain_id=self.domain_id,
  434             effective=True)
  435         user_roles = list(set([x['role_id'] for x in assignment_list]))
  436 
  437         for role in app_cred_roles:
  438             if role['id'] in user_roles:
  439                 roles.append({'id': role['id'], 'name': role['name']})
  440 
  441         return roles
  442 
  443     @property
  444     def roles(self):
  445         if self.system_scoped:
  446             roles = self._get_system_roles()
  447         elif self.trust_scoped:
  448             roles = self._get_trust_roles()
  449         elif self.oauth_scoped:
  450             roles = self._get_oauth_roles()
  451         elif self.is_federated and not self.unscoped:
  452             roles = self._get_federated_roles()
  453         elif self.domain_scoped:
  454             roles = self._get_domain_roles()
  455         elif self.application_credential_id and self.project_id:
  456             roles = self._get_application_credential_roles()
  457         elif self.project_scoped:
  458             roles = self._get_project_roles()
  459         else:
  460             roles = []
  461         return roles
  462 
  463     def _validate_token_resources(self):
  464         if self.project and not self.project.get('enabled'):
  465             msg = ('Unable to validate token because project %(id)s is '
  466                    'disabled') % {'id': self.project_id}
  467             tr_msg = _('Unable to validate token because project %(id)s is '
  468                        'disabled') % {'id': self.project_id}
  469             LOG.warning(msg)
  470             raise exception.ProjectNotFound(tr_msg)
  471         if self.project and not self.project_domain.get('enabled'):
  472             msg = ('Unable to validate token because domain %(id)s is '
  473                    'disabled') % {'id': self.project_domain['id']}
  474             tr_msg = _('Unable to validate token because domain %(id)s is '
  475                        'disabled') % {'id': self.project_domain['id']}
  476             LOG.warning(msg)
  477             raise exception.DomainNotFound(tr_msg)
  478 
  479     def _validate_token_user(self):
  480         if self.trust_scoped:
  481             if self.user_id != self.trustee['id']:
  482                 raise exception.Forbidden(_('User is not a trustee.'))
  483             try:
  484                 PROVIDERS.resource_api.assert_domain_enabled(
  485                     self.trustor['domain_id']
  486                 )
  487             except AssertionError:
  488                 raise exception.TokenNotFound(_('Trustor domain is disabled.'))
  489             try:
  490                 PROVIDERS.resource_api.assert_domain_enabled(
  491                     self.trustee['domain_id']
  492                 )
  493             except AssertionError:
  494                 raise exception.TokenNotFound(_('Trustee domain is disabled.'))
  495 
  496             try:
  497                 PROVIDERS.identity_api.assert_user_enabled(
  498                     self.trustor['id']
  499                 )
  500             except AssertionError:
  501                 raise exception.Forbidden(_('Trustor is disabled.'))
  502 
  503         if not self.user_domain.get('enabled'):
  504             msg = ('Unable to validate token because domain %(id)s is '
  505                    'disabled') % {'id': self.user_domain['id']}
  506             tr_msg = _('Unable to validate token because domain %(id)s is '
  507                        'disabled') % {'id': self.user_domain['id']}
  508             LOG.warning(msg)
  509             raise exception.DomainNotFound(tr_msg)
  510 
  511     def _validate_system_scope(self):
  512         if self.system_scoped and not self.roles:
  513             msg = ('User %(user_id)s has no access to the system'
  514                    ) % {'user_id': self.user_id}
  515             tr_msg = _('User %(user_id)s has no access to the system'
  516                        ) % {'user_id': self.user_id}
  517             LOG.debug(msg)
  518             raise exception.Unauthorized(tr_msg)
  519 
  520     def _validate_domain_scope(self):
  521         if self.domain_scoped and not self.roles:
  522             msg = (
  523                 'User %(user_id)s has no access to domain %(domain_id)s'
  524             ) % {'user_id': self.user_id, 'domain_id': self.domain_id}
  525             tr_msg = _(
  526                 'User %(user_id)s has no access to domain %(domain_id)s'
  527             ) % {'user_id': self.user_id, 'domain_id': self.domain_id}
  528             LOG.debug(msg)
  529             raise exception.Unauthorized(tr_msg)
  530 
  531     def _validate_project_scope(self):
  532         if self.project_scoped and not self.roles:
  533             msg = (
  534                 'User %(user_id)s has no access to project %(project_id)s'
  535             ) % {'user_id': self.user_id, 'project_id': self.project_id}
  536             tr_msg = _(
  537                 'User %(user_id)s has no access to project %(project_id)s'
  538             ) % {'user_id': self.user_id, 'project_id': self.project_id}
  539             LOG.debug(msg)
  540             raise exception.Unauthorized(tr_msg)
  541 
  542     def _validate_trust_scope(self):
  543         trust_roles = []
  544         if self.trust_id:
  545             refs = [{'role_id': role['id']} for role in self.trust['roles']]
  546             effective_trust_roles = PROVIDERS.assignment_api.add_implied_roles(
  547                 refs
  548             )
  549             effective_trust_role_ids = (
  550                 set([r['role_id'] for r in effective_trust_roles])
  551             )
  552             current_effective_trustor_roles = (
  553                 PROVIDERS.assignment_api.get_roles_for_trustor_and_project(
  554                     self.trustor['id'], self.trust.get('project_id')
  555                 )
  556             )
  557             # Go through each of the effective trust roles, making sure the
  558             # trustor still has them, if any have been removed, then we
  559             # will treat the trust as invalid
  560             for trust_role_id in effective_trust_role_ids:
  561                 if trust_role_id in current_effective_trustor_roles:
  562                     role = PROVIDERS.role_api.get_role(trust_role_id)
  563                     if role['domain_id'] is None:
  564                         trust_roles.append(role)
  565                 else:
  566                     raise exception.Forbidden(
  567                         _('Trustee has no delegated roles.'))
  568 
  569     def mint(self, token_id, issued_at):
  570         """Set the ``id`` and ``issued_at`` attributes of a token.
  571 
  572         The process of building a token requires setting attributes about the
  573         authentication and authorization context, like ``user_id`` and
  574         ``project_id`` for example. Once a Token object accurately represents
  575         this information it should be "minted". Tokens are minted when they get
  576         an ``id`` attribute and their creation time is recorded.
  577 
  578         """
  579         self._validate_token_resources()
  580         self._validate_token_user()
  581         self._validate_system_scope()
  582         self._validate_domain_scope()
  583         self._validate_project_scope()
  584         self._validate_trust_scope()
  585 
  586         self.id = token_id
  587         self.issued_at = issued_at
  588 
  589 
  590 class _TokenModelHandler(object):
  591     identity = 126
  592     handles = (TokenModel,)
  593 
  594     def __init__(self, registry):
  595         self._registry = registry
  596 
  597     def serialize(self, obj):
  598         serialized = msgpackutils.dumps(obj.__dict__, registry=self._registry)
  599         return serialized
  600 
  601     def deserialize(self, data):
  602         token_data = msgpackutils.loads(data, registry=self._registry)
  603         try:
  604             token_model = TokenModel()
  605             for k, v in iter(token_data.items()):
  606                 setattr(token_model, k, v)
  607         except Exception:
  608             LOG.debug(
  609                 "Failed to deserialize TokenModel. Data is %s", token_data
  610             )
  611             raise exception.CacheDeserializationError(
  612                 TokenModel.__name__, token_data
  613             )
  614         return token_model
  615 
  616 
# Hand the handler class to keystone.common.cache at import time.
# NOTE(review): presumably this is what lets cached TokenModel objects be
# (de)serialized with msgpack -- confirm against keystone.common.cache.
cache.register_model_handler(_TokenModelHandler)