"Fossies" - the Fresh Open Source Software Archive

Member "keystone-18.0.0/keystone/credential/core.py" (14 Oct 2020, 10050 Bytes) of package /linux/misc/openstack/keystone-18.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "core.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 17.0.0_vs_18.0.0.

    1 # Copyright 2013 OpenStack Foundation
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #      http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 
   15 """Main entry point into the Credential service."""
   16 
   17 import json
   18 
   19 from keystone.common import cache
   20 from keystone.common import driver_hints
   21 from keystone.common import manager
   22 from keystone.common import provider_api
   23 import keystone.conf
   24 from keystone import exception
   25 from keystone import notifications
   26 
   27 
# Global keystone configuration object; the ``credential`` option group is
# read from it below.
CONF = keystone.conf.CONF
# Memoization decorator configured for the 'credential' cache group.
MEMOIZE = cache.get_memoization_decorator(group='credential')
# Accessor used below to reach ``credential_provider_api`` (blob
# encryption/decryption).
PROVIDERS = provider_api.ProviderAPIs
   31 
   32 
   33 class Manager(manager.Manager):
   34     """Default pivot point for the Credential backend.
   35 
   36     See :mod:`keystone.common.manager.Manager` for more details on how this
   37     dynamically calls the backend.
   38 
   39     """
   40 
   41     driver_namespace = 'keystone.credential'
   42     _provides_api = 'credential_api'
   43 
   44     _CRED = 'credential'
   45 
   46     def __init__(self):
   47         super(Manager, self).__init__(CONF.credential.driver)
   48 
   49     def _decrypt_credential(self, credential):
   50         """Return a decrypted credential reference."""
   51         if credential['type'] == 'ec2':
   52             decrypted_blob = json.loads(
   53                 PROVIDERS.credential_provider_api.decrypt(
   54                     credential['encrypted_blob'],
   55                 )
   56             )
   57         else:
   58             decrypted_blob = PROVIDERS.credential_provider_api.decrypt(
   59                 credential['encrypted_blob']
   60             )
   61         credential['blob'] = decrypted_blob
   62         credential.pop('key_hash', None)
   63         credential.pop('encrypted_blob', None)
   64         return credential
   65 
   66     def _encrypt_credential(self, credential):
   67         """Return an encrypted credential reference."""
   68         credential_copy = credential.copy()
   69         if credential.get('type', None) == 'ec2':
   70             # NOTE(lbragstad): When dealing with ec2 credentials, it's possible
   71             # for the `blob` to be a dictionary. Let's make sure we are
   72             # encrypting a string otherwise encryption will fail.
   73             encrypted_blob, key_hash = (
   74                 PROVIDERS.credential_provider_api.encrypt(
   75                     json.dumps(credential['blob'])
   76                 )
   77             )
   78         else:
   79             encrypted_blob, key_hash = (
   80                 PROVIDERS.credential_provider_api.encrypt(
   81                     credential['blob']
   82                 )
   83             )
   84         credential_copy['encrypted_blob'] = encrypted_blob
   85         credential_copy['key_hash'] = key_hash
   86         credential_copy.pop('blob', None)
   87         return credential_copy
   88 
   89     def _assert_limit_not_exceeded(self, user_id):
   90         user_limit = CONF.credential.user_limit
   91         if user_limit >= 0:
   92             cred_count = len(self.list_credentials_for_user(user_id))
   93             if cred_count >= user_limit:
   94                 raise exception.CredentialLimitExceeded(
   95                     limit=user_limit)
   96 
   97     @manager.response_truncated
   98     def list_credentials(self, hints=None):
   99         credentials = self.driver.list_credentials(
  100             hints or driver_hints.Hints()
  101         )
  102         for credential in credentials:
  103             credential = self._decrypt_credential(credential)
  104         return credentials
  105 
  106     def list_credentials_for_user(self, user_id, type=None):
  107         credentials = self._list_credentials_for_user(user_id, type)
  108         for credential in credentials:
  109             credential = self._decrypt_credential(credential)
  110         return credentials
  111 
  112     @MEMOIZE
  113     def _list_credentials_for_user(self, user_id, type):
  114         """List credentials for a specific user."""
  115         return self.driver.list_credentials_for_user(user_id, type)
  116 
  117     def get_credential(self, credential_id):
  118         """Return a credential reference."""
  119         credential = self._get_credential(credential_id)
  120         return self._decrypt_credential(credential)
  121 
  122     @MEMOIZE
  123     def _get_credential(self, credential_id):
  124         return self.driver.get_credential(credential_id)
  125 
  126     def create_credential(self, credential_id, credential,
  127                           initiator=None):
  128         """Create a credential."""
  129         credential_copy = self._encrypt_credential(credential)
  130         user_id = credential_copy['user_id']
  131         self._assert_limit_not_exceeded(user_id)
  132         ref = self.driver.create_credential(credential_id, credential_copy)
  133         if MEMOIZE.should_cache(ref):
  134             self._get_credential.set(ref,
  135                                      credential_copy,
  136                                      credential_id)
  137             self._list_credentials_for_user.invalidate(self,
  138                                                        ref['user_id'],
  139                                                        ref['type'])
  140             self._list_credentials_for_user.invalidate(self,
  141                                                        ref['user_id'],
  142                                                        None)
  143         ref.pop('key_hash', None)
  144         ref.pop('encrypted_blob', None)
  145         ref['blob'] = credential['blob']
  146         notifications.Audit.created(
  147             self._CRED,
  148             credential_id,
  149             initiator)
  150         return ref
  151 
  152     def _validate_credential_update(self, credential_id, credential):
  153         # ec2 credentials require a "project_id" to be functional. Before we
  154         # update, check the case where a non-ec2 credential changes its type
  155         # to be "ec2", but has no associated "project_id", either in the
  156         # request or already set in the database
  157         if (credential.get('type', '').lower() == 'ec2' and
  158                 not credential.get('project_id')):
  159             existing_cred = self.get_credential(credential_id)
  160             if not existing_cred['project_id']:
  161                 raise exception.ValidationError(attribute='project_id',
  162                                                 target='credential')
  163 
  164     def update_credential(self, credential_id, credential):
  165         """Update an existing credential."""
  166         self._validate_credential_update(credential_id, credential)
  167         if 'blob' in credential:
  168             credential_copy = self._encrypt_credential(credential)
  169         else:
  170             credential_copy = credential.copy()
  171             existing_credential = self.get_credential(credential_id)
  172             existing_blob = existing_credential['blob']
  173         ref = self.driver.update_credential(credential_id, credential_copy)
  174         if MEMOIZE.should_cache(ref):
  175             self._get_credential.set(ref, self, credential_id)
  176             self._list_credentials_for_user.invalidate(self,
  177                                                        ref['user_id'],
  178                                                        ref['type'])
  179             self._list_credentials_for_user.invalidate(self,
  180                                                        ref['user_id'],
  181                                                        None)
  182         ref.pop('key_hash', None)
  183         ref.pop('encrypted_blob', None)
  184         # If the update request contains a `blob` attribute - we should return
  185         # that in the update response. If not, then we should return the
  186         # existing `blob` attribute since it wasn't updated.
  187         if credential.get('blob'):
  188             ref['blob'] = credential['blob']
  189         else:
  190             ref['blob'] = existing_blob
  191         return ref
  192 
  193     def delete_credential(self, credential_id,
  194                           initiator=None):
  195         """Delete a credential."""
  196         cred = self.get_credential(credential_id)
  197         self.driver.delete_credential(credential_id)
  198         self._get_credential.invalidate(self, credential_id)
  199         self._list_credentials_for_user.invalidate(self,
  200                                                    cred['user_id'],
  201                                                    cred['type'])
  202         self._list_credentials_for_user.invalidate(self,
  203                                                    cred['user_id'],
  204                                                    None)
  205         notifications.Audit.deleted(
  206             self._CRED, credential_id, initiator)
  207 
  208     def delete_credentials_for_project(self, project_id):
  209         """Delete all credentials for a project."""
  210         hints = driver_hints.Hints()
  211         hints.add_filter('project_id', project_id)
  212         creds = self.driver.list_credentials(hints)
  213 
  214         self.driver.delete_credentials_for_project(project_id)
  215         for cred in creds:
  216             self._get_credential.invalidate(self, cred['id'])
  217             self._list_credentials_for_user.invalidate(self,
  218                                                        cred['user_id'],
  219                                                        cred['type'])
  220             self._list_credentials_for_user.invalidate(self,
  221                                                        cred['user_id'],
  222                                                        None)
  223 
  224     def delete_credentials_for_user(self, user_id):
  225         """Delete all credentials for a user."""
  226         creds = self.driver.list_credentials_for_user(user_id)
  227         self.driver.delete_credentials_for_user(user_id)
  228         for cred in creds:
  229             self._get_credential.invalidate(self, cred['id'])
  230             self._list_credentials_for_user.invalidate(self,
  231                                                        user_id,
  232                                                        cred['type'])
  233             self._list_credentials_for_user.invalidate(self,
  234                                                        cred['user_id'],
  235                                                        None)