"Fossies" - the Fresh Open Source Software Archive

Member "keystone-18.0.0/keystone/tests/unit/test_v3_credential.py" (14 Oct 2020, 41633 Bytes) of package /linux/misc/openstack/keystone-18.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_v3_credential.py": 17.0.0_vs_18.0.0.

    1 # Copyright 2013 OpenStack Foundation
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #      http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 
   15 import hashlib
   16 import json
   17 from unittest import mock
   18 import uuid
   19 
   20 import http.client
   21 from keystoneclient.contrib.ec2 import utils as ec2_utils
   22 from oslo_db import exception as oslo_db_exception
   23 from testtools import matchers
   24 import urllib
   25 
   26 from keystone.api import ec2tokens
   27 from keystone.common import provider_api
   28 from keystone.common import utils
   29 from keystone.credential.providers import fernet as credential_fernet
   30 from keystone import exception
   31 from keystone import oauth1
   32 from keystone.tests import unit
   33 from keystone.tests.unit import ksfixtures
   34 from keystone.tests.unit import test_v3
   35 
   36 
   37 PROVIDERS = provider_api.ProviderAPIs
   38 CRED_TYPE_EC2 = ec2tokens.CRED_TYPE_EC2
   39 
   40 
   41 class CredentialBaseTestCase(test_v3.RestfulTestCase):
   42 
   43     def setUp(self):
   44         super(CredentialBaseTestCase, self).setUp()
   45         self.useFixture(
   46             ksfixtures.KeyRepository(
   47                 self.config_fixture,
   48                 'credential',
   49                 credential_fernet.MAX_ACTIVE_KEYS
   50             )
   51         )
   52 
   53     def _create_dict_blob_credential(self):
   54         blob, credential = unit.new_ec2_credential(user_id=self.user['id'],
   55                                                    project_id=self.project_id)
   56 
   57         # Store the blob as a dict *not* JSON ref bug #1259584
   58         # This means we can test the dict->json workaround, added
   59         # as part of the bugfix for backwards compatibility works.
   60         credential['blob'] = blob
   61         credential_id = credential['id']
   62 
   63         # Create direct via the DB API to avoid validation failure
   64         PROVIDERS.credential_api.create_credential(credential_id, credential)
   65 
   66         return json.dumps(blob), credential_id
   67 
   68     def _test_get_token(self, access, secret):
   69         """Test signature validation with the access/secret provided."""
   70         signer = ec2_utils.Ec2Signer(secret)
   71         params = {'SignatureMethod': 'HmacSHA256',
   72                   'SignatureVersion': '2',
   73                   'AWSAccessKeyId': access}
   74         request = {'host': 'foo',
   75                    'verb': 'GET',
   76                    'path': '/bar',
   77                    'params': params}
   78         signature = signer.generate(request)
   79 
   80         # Now make a request to validate the signed dummy request via the
   81         # ec2tokens API.  This proves the v3 ec2 credentials actually work.
   82         sig_ref = {'access': access,
   83                    'signature': signature,
   84                    'host': 'foo',
   85                    'verb': 'GET',
   86                    'path': '/bar',
   87                    'params': params}
   88         r = self.post(
   89             '/ec2tokens',
   90             body={'ec2Credentials': sig_ref},
   91             expected_status=http.client.OK)
   92         self.assertValidTokenResponse(r)
   93         return r.result['token']
   94 
   95 
   96 class CredentialTestCase(CredentialBaseTestCase):
   97     """Test credential CRUD."""
   98 
   99     def setUp(self):
  100 
  101         super(CredentialTestCase, self).setUp()
  102 
  103         self.credential = unit.new_credential_ref(user_id=self.user['id'],
  104                                                   project_id=self.project_id)
  105 
  106         PROVIDERS.credential_api.create_credential(
  107             self.credential['id'],
  108             self.credential)
  109 
  110     def test_credential_api_delete_credentials_for_project(self):
  111         PROVIDERS.credential_api.delete_credentials_for_project(
  112             self.project_id
  113         )
  114         # Test that the credential that we created in .setUp no longer exists
  115         # once we delete all credentials for self.project_id
  116         self.assertRaises(exception.CredentialNotFound,
  117                           PROVIDERS.credential_api.get_credential,
  118                           credential_id=self.credential['id'])
  119 
  120     def test_credential_api_delete_credentials_for_user(self):
  121         PROVIDERS.credential_api.delete_credentials_for_user(self.user_id)
  122         # Test that the credential that we created in .setUp no longer exists
  123         # once we delete all credentials for self.user_id
  124         self.assertRaises(exception.CredentialNotFound,
  125                           PROVIDERS.credential_api.get_credential,
  126                           credential_id=self.credential['id'])
  127 
  128     def test_list_credentials(self):
  129         """Call ``GET /credentials``."""
  130         r = self.get('/credentials')
  131         self.assertValidCredentialListResponse(r, ref=self.credential)
  132 
  133     def test_list_credentials_filtered_by_user_id(self):
  134         """Call ``GET  /credentials?user_id={user_id}``."""
  135         credential = unit.new_credential_ref(user_id=uuid.uuid4().hex)
  136         PROVIDERS.credential_api.create_credential(
  137             credential['id'], credential
  138         )
  139 
  140         r = self.get('/credentials?user_id=%s' % self.user['id'])
  141         self.assertValidCredentialListResponse(r, ref=self.credential)
  142         for cred in r.result['credentials']:
  143             self.assertEqual(self.user['id'], cred['user_id'])
  144 
  145     def test_list_credentials_filtered_by_type(self):
  146         """Call ``GET  /credentials?type={type}``."""
  147         PROVIDERS.assignment_api.create_system_grant_for_user(
  148             self.user_id, self.role_id
  149         )
  150         token = self.get_system_scoped_token()
  151 
  152         # The type ec2 was chosen, instead of a random string,
  153         # because the type must be in the list of supported types
  154         ec2_credential = unit.new_credential_ref(user_id=uuid.uuid4().hex,
  155                                                  project_id=self.project_id,
  156                                                  type=CRED_TYPE_EC2)
  157 
  158         ec2_resp = PROVIDERS.credential_api.create_credential(
  159             ec2_credential['id'], ec2_credential)
  160 
  161         # The type cert was chosen for the same reason as ec2
  162         r = self.get('/credentials?type=cert', token=token)
  163 
  164         # Testing the filter for two different types
  165         self.assertValidCredentialListResponse(r, ref=self.credential)
  166         for cred in r.result['credentials']:
  167             self.assertEqual('cert', cred['type'])
  168 
  169         r_ec2 = self.get('/credentials?type=ec2', token=token)
  170         self.assertThat(r_ec2.result['credentials'], matchers.HasLength(1))
  171         cred_ec2 = r_ec2.result['credentials'][0]
  172 
  173         self.assertValidCredentialListResponse(r_ec2, ref=ec2_resp)
  174         self.assertEqual(CRED_TYPE_EC2, cred_ec2['type'])
  175         self.assertEqual(ec2_credential['id'], cred_ec2['id'])
  176 
  177     def test_list_credentials_filtered_by_type_and_user_id(self):
  178         """Call ``GET  /credentials?user_id={user_id}&type={type}``."""
  179         user1_id = uuid.uuid4().hex
  180         user2_id = uuid.uuid4().hex
  181 
  182         PROVIDERS.assignment_api.create_system_grant_for_user(
  183             self.user_id, self.role_id
  184         )
  185         token = self.get_system_scoped_token()
  186 
  187         # Creating credentials for two different users
  188         credential_user1_ec2 = unit.new_credential_ref(user_id=user1_id,
  189                                                        type=CRED_TYPE_EC2)
  190         credential_user1_cert = unit.new_credential_ref(user_id=user1_id)
  191         credential_user2_cert = unit.new_credential_ref(user_id=user2_id)
  192 
  193         PROVIDERS.credential_api.create_credential(
  194             credential_user1_ec2['id'], credential_user1_ec2)
  195         PROVIDERS.credential_api.create_credential(
  196             credential_user1_cert['id'], credential_user1_cert)
  197         PROVIDERS.credential_api.create_credential(
  198             credential_user2_cert['id'], credential_user2_cert)
  199 
  200         r = self.get(
  201             '/credentials?user_id=%s&type=ec2' % user1_id, token=token
  202         )
  203         self.assertValidCredentialListResponse(r, ref=credential_user1_ec2)
  204         self.assertThat(r.result['credentials'], matchers.HasLength(1))
  205         cred = r.result['credentials'][0]
  206         self.assertEqual(CRED_TYPE_EC2, cred['type'])
  207         self.assertEqual(user1_id, cred['user_id'])
  208 
  209     def test_create_credential(self):
  210         """Call ``POST /credentials``."""
  211         ref = unit.new_credential_ref(user_id=self.user['id'])
  212         r = self.post(
  213             '/credentials',
  214             body={'credential': ref})
  215         self.assertValidCredentialResponse(r, ref)
  216 
  217     def test_get_credential(self):
  218         """Call ``GET /credentials/{credential_id}``."""
  219         r = self.get(
  220             '/credentials/%(credential_id)s' % {
  221                 'credential_id': self.credential['id']})
  222         self.assertValidCredentialResponse(r, self.credential)
  223 
  224     def test_update_credential(self):
  225         """Call ``PATCH /credentials/{credential_id}``."""
  226         ref = unit.new_credential_ref(user_id=self.user['id'],
  227                                       project_id=self.project_id)
  228         del ref['id']
  229         r = self.patch(
  230             '/credentials/%(credential_id)s' % {
  231                 'credential_id': self.credential['id']},
  232             body={'credential': ref})
  233         self.assertValidCredentialResponse(r, ref)
  234 
  235     def test_update_credential_to_ec2_type(self):
  236         """Call ``PATCH /credentials/{credential_id}``."""
  237         # Create a credential without providing a project_id
  238         ref = unit.new_credential_ref(user_id=self.user['id'])
  239         r = self.post(
  240             '/credentials',
  241             body={'credential': ref})
  242         self.assertValidCredentialResponse(r, ref)
  243         credential_id = r.result.get('credential')['id']
  244 
  245         # Updating the credential to ec2 requires a project_id
  246         update_ref = {'type': 'ec2', 'project_id': self.project_id}
  247         self.patch(
  248             '/credentials/%(credential_id)s' % {
  249                 'credential_id': credential_id},
  250             body={'credential': update_ref})
  251 
  252     def test_update_credential_to_ec2_missing_project_id(self):
  253         """Call ``PATCH /credentials/{credential_id}``."""
  254         # Create a credential without providing a project_id
  255         ref = unit.new_credential_ref(user_id=self.user['id'])
  256         r = self.post(
  257             '/credentials',
  258             body={'credential': ref})
  259         self.assertValidCredentialResponse(r, ref)
  260         credential_id = r.result.get('credential')['id']
  261 
  262         # Updating such credential to ec2 type without providing a project_id
  263         # will fail
  264         update_ref = {'type': 'ec2'}
  265         self.patch(
  266             '/credentials/%(credential_id)s' % {
  267                 'credential_id': credential_id},
  268             body={'credential': update_ref},
  269             expected_status=http.client.BAD_REQUEST)
  270 
  271     def test_update_credential_to_ec2_with_previously_set_project_id(self):
  272         """Call ``PATCH /credentials/{credential_id}``."""
  273         # Create a credential providing a project_id
  274         ref = unit.new_credential_ref(user_id=self.user['id'],
  275                                       project_id=self.project_id)
  276         r = self.post(
  277             '/credentials',
  278             body={'credential': ref})
  279         self.assertValidCredentialResponse(r, ref)
  280         credential_id = r.result.get('credential')['id']
  281 
  282         # Since the created credential above already has a project_id, the
  283         # update request will not fail
  284         update_ref = {'type': 'ec2'}
  285         self.patch(
  286             '/credentials/%(credential_id)s' % {
  287                 'credential_id': credential_id},
  288             body={'credential': update_ref})
  289 
  290     def test_update_credential_non_owner(self):
  291         """Call ``PATCH /credentials/{credential_id}``."""
  292         alt_user = unit.create_user(
  293             PROVIDERS.identity_api, domain_id=self.domain_id)
  294         alt_user_id = alt_user['id']
  295         alt_project = unit.new_project_ref(domain_id=self.domain_id)
  296         alt_project_id = alt_project['id']
  297         PROVIDERS.resource_api.create_project(
  298             alt_project['id'], alt_project)
  299         alt_role = unit.new_role_ref(name='reader')
  300         alt_role_id = alt_role['id']
  301         PROVIDERS.role_api.create_role(alt_role_id, alt_role)
  302         PROVIDERS.assignment_api.add_role_to_user_and_project(
  303             alt_user_id, alt_project_id, alt_role_id)
  304         auth = self.build_authentication_request(
  305             user_id=alt_user_id,
  306             password=alt_user['password'],
  307             project_id=alt_project_id)
  308         ref = unit.new_credential_ref(user_id=alt_user_id,
  309                                       project_id=alt_project_id)
  310         r = self.post(
  311             '/credentials',
  312             auth=auth,
  313             body={'credential': ref})
  314         self.assertValidCredentialResponse(r, ref)
  315         credential_id = r.result.get('credential')['id']
  316 
  317         # Cannot change the credential to be owned by another user
  318         update_ref = {'user_id': self.user_id, 'project_id': self.project_id}
  319         self.patch(
  320             '/credentials/%(credential_id)s' % {
  321                 'credential_id': credential_id},
  322             expected_status=403,
  323             auth=auth,
  324             body={'credential': update_ref})
  325 
  326     def test_update_ec2_credential_change_trust_id(self):
  327         """Call ``PATCH /credentials/{credential_id}``."""
  328         blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
  329                                             project_id=self.project_id)
  330         blob['trust_id'] = uuid.uuid4().hex
  331         ref['blob'] = json.dumps(blob)
  332         r = self.post(
  333             '/credentials',
  334             body={'credential': ref})
  335         self.assertValidCredentialResponse(r, ref)
  336         credential_id = r.result.get('credential')['id']
  337         # Try changing to a different trust
  338         blob['trust_id'] = uuid.uuid4().hex
  339         update_ref = {'blob': json.dumps(blob)}
  340         self.patch(
  341             '/credentials/%(credential_id)s' % {
  342                 'credential_id': credential_id},
  343             body={'credential': update_ref},
  344             expected_status=http.client.BAD_REQUEST)
  345         # Try removing the trust
  346         del blob['trust_id']
  347         update_ref = {'blob': json.dumps(blob)}
  348         self.patch(
  349             '/credentials/%(credential_id)s' % {
  350                 'credential_id': credential_id},
  351             body={'credential': update_ref},
  352             expected_status=http.client.BAD_REQUEST)
  353 
  354     def test_update_ec2_credential_change_app_cred_id(self):
  355         """Call ``PATCH /credentials/{credential_id}``."""
  356         blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
  357                                             project_id=self.project_id)
  358         blob['app_cred_id'] = uuid.uuid4().hex
  359         ref['blob'] = json.dumps(blob)
  360         r = self.post(
  361             '/credentials',
  362             body={'credential': ref})
  363         self.assertValidCredentialResponse(r, ref)
  364         credential_id = r.result.get('credential')['id']
  365         # Try changing to a different app cred
  366         blob['app_cred_id'] = uuid.uuid4().hex
  367         update_ref = {'blob': json.dumps(blob)}
  368         self.patch(
  369             '/credentials/%(credential_id)s' % {
  370                 'credential_id': credential_id},
  371             body={'credential': update_ref},
  372             expected_status=http.client.BAD_REQUEST)
  373         # Try removing the app cred
  374         del blob['app_cred_id']
  375         update_ref = {'blob': json.dumps(blob)}
  376         self.patch(
  377             '/credentials/%(credential_id)s' % {
  378                 'credential_id': credential_id},
  379             body={'credential': update_ref},
  380             expected_status=http.client.BAD_REQUEST)
  381 
  382     def test_update_ec2_credential_change_access_token_id(self):
  383         """Call ``PATCH /credentials/{credential_id}``."""
  384         blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
  385                                             project_id=self.project_id)
  386         blob['access_token_id'] = uuid.uuid4().hex
  387         ref['blob'] = json.dumps(blob)
  388         r = self.post(
  389             '/credentials',
  390             body={'credential': ref})
  391         self.assertValidCredentialResponse(r, ref)
  392         credential_id = r.result.get('credential')['id']
  393         # Try changing to a different access token
  394         blob['access_token_id'] = uuid.uuid4().hex
  395         update_ref = {'blob': json.dumps(blob)}
  396         self.patch(
  397             '/credentials/%(credential_id)s' % {
  398                 'credential_id': credential_id},
  399             body={'credential': update_ref},
  400             expected_status=http.client.BAD_REQUEST)
  401         # Try removing the access token
  402         del blob['access_token_id']
  403         update_ref = {'blob': json.dumps(blob)}
  404         self.patch(
  405             '/credentials/%(credential_id)s' % {
  406                 'credential_id': credential_id},
  407             body={'credential': update_ref},
  408             expected_status=http.client.BAD_REQUEST)
  409 
  410     def test_update_ec2_credential_change_access_id(self):
  411         """Call ``PATCH /credentials/{credential_id}``."""
  412         blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
  413                                             project_id=self.project_id)
  414         blob['access_id'] = uuid.uuid4().hex
  415         ref['blob'] = json.dumps(blob)
  416         r = self.post(
  417             '/credentials',
  418             body={'credential': ref})
  419         self.assertValidCredentialResponse(r, ref)
  420         credential_id = r.result.get('credential')['id']
  421         # Try changing to a different access_id
  422         blob['access_id'] = uuid.uuid4().hex
  423         update_ref = {'blob': json.dumps(blob)}
  424         self.patch(
  425             '/credentials/%(credential_id)s' % {
  426                 'credential_id': credential_id},
  427             body={'credential': update_ref},
  428             expected_status=http.client.BAD_REQUEST)
  429         # Try removing the access_id
  430         del blob['access_id']
  431         update_ref = {'blob': json.dumps(blob)}
  432         self.patch(
  433             '/credentials/%(credential_id)s' % {
  434                 'credential_id': credential_id},
  435             body={'credential': update_ref},
  436             expected_status=http.client.BAD_REQUEST)
  437 
  438     def test_delete_credential(self):
  439         """Call ``DELETE /credentials/{credential_id}``."""
  440         self.delete(
  441             '/credentials/%(credential_id)s' % {
  442                 'credential_id': self.credential['id']})
  443 
  444     def test_delete_credential_retries_on_deadlock(self):
  445         patcher = mock.patch('sqlalchemy.orm.query.Query.delete',
  446                              autospec=True)
  447 
  448         class FakeDeadlock(object):
  449             def __init__(self, mock_patcher):
  450                 self.deadlock_count = 2
  451                 self.mock_patcher = mock_patcher
  452                 self.patched = True
  453 
  454             def __call__(self, *args, **kwargs):
  455                 if self.deadlock_count > 1:
  456                     self.deadlock_count -= 1
  457                 else:
  458                     self.mock_patcher.stop()
  459                     self.patched = False
  460                 raise oslo_db_exception.DBDeadlock
  461 
  462         sql_delete_mock = patcher.start()
  463         side_effect = FakeDeadlock(patcher)
  464         sql_delete_mock.side_effect = side_effect
  465 
  466         try:
  467             PROVIDERS.credential_api.delete_credentials_for_user(
  468                 user_id=self.user['id'])
  469         finally:
  470             if side_effect.patched:
  471                 patcher.stop()
  472 
  473         # initial attempt + 1 retry
  474         self.assertEqual(sql_delete_mock.call_count, 2)
  475 
  476     def test_create_ec2_credential(self):
  477         """Call ``POST /credentials`` for creating ec2 credential."""
  478         blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
  479                                             project_id=self.project_id)
  480         r = self.post('/credentials', body={'credential': ref})
  481         self.assertValidCredentialResponse(r, ref)
  482         # Assert credential id is same as hash of access key id for
  483         # ec2 credentials
  484         access = blob['access'].encode('utf-8')
  485         self.assertEqual(hashlib.sha256(access).hexdigest(),
  486                          r.result['credential']['id'])
  487         # Create second ec2 credential with the same access key id and check
  488         # for conflict.
  489         self.post(
  490             '/credentials',
  491             body={'credential': ref}, expected_status=http.client.CONFLICT)
  492 
  493     def test_get_ec2_dict_blob(self):
  494         """Ensure non-JSON blob data is correctly converted."""
  495         expected_blob, credential_id = self._create_dict_blob_credential()
  496 
  497         r = self.get(
  498             '/credentials/%(credential_id)s' % {
  499                 'credential_id': credential_id})
  500 
  501         # use json.loads to transform the blobs back into Python dictionaries
  502         # to avoid problems with the keys being in different orders.
  503         self.assertEqual(json.loads(expected_blob),
  504                          json.loads(r.result['credential']['blob']))
  505 
  506     def test_list_ec2_dict_blob(self):
  507         """Ensure non-JSON blob data is correctly converted."""
  508         expected_blob, credential_id = self._create_dict_blob_credential()
  509 
  510         list_r = self.get('/credentials')
  511         list_creds = list_r.result['credentials']
  512         list_ids = [r['id'] for r in list_creds]
  513         self.assertIn(credential_id, list_ids)
  514         # use json.loads to transform the blobs back into Python dictionaries
  515         # to avoid problems with the keys being in different orders.
  516         for r in list_creds:
  517             if r['id'] == credential_id:
  518                 self.assertEqual(json.loads(expected_blob),
  519                                  json.loads(r['blob']))
  520 
  521     def test_create_non_ec2_credential(self):
  522         """Test creating non-ec2 credential.
  523 
  524         Call ``POST /credentials``.
  525         """
  526         blob, ref = unit.new_cert_credential(user_id=self.user['id'])
  527 
  528         r = self.post('/credentials', body={'credential': ref})
  529         self.assertValidCredentialResponse(r, ref)
  530         # Assert credential id is not same as hash of access key id for
  531         # non-ec2 credentials
  532         access = blob['access'].encode('utf-8')
  533         self.assertNotEqual(hashlib.sha256(access).hexdigest(),
  534                             r.result['credential']['id'])
  535 
  536     def test_create_ec2_credential_with_missing_project_id(self):
  537         """Test Creating ec2 credential with missing project_id.
  538 
  539         Call ``POST /credentials``.
  540         """
  541         _, ref = unit.new_ec2_credential(user_id=self.user['id'],
  542                                          project_id=None)
  543         # Assert bad request status when missing project_id
  544         self.post(
  545             '/credentials',
  546             body={'credential': ref}, expected_status=http.client.BAD_REQUEST)
  547 
  548     def test_create_ec2_credential_with_invalid_blob(self):
  549         """Test creating ec2 credential with invalid blob.
  550 
  551         Call ``POST /credentials``.
  552         """
  553         ref = unit.new_credential_ref(user_id=self.user['id'],
  554                                       project_id=self.project_id,
  555                                       blob='{"abc":"def"d}',
  556                                       type=CRED_TYPE_EC2)
  557         # Assert bad request status when request contains invalid blob
  558         response = self.post(
  559             '/credentials',
  560             body={'credential': ref}, expected_status=http.client.BAD_REQUEST)
  561         self.assertValidErrorResponse(response)
  562 
  563     def test_create_credential_with_admin_token(self):
  564         # Make sure we can create credential with the static admin token
  565         ref = unit.new_credential_ref(user_id=self.user['id'])
  566         r = self.post(
  567             '/credentials',
  568             body={'credential': ref},
  569             token=self.get_admin_token())
  570         self.assertValidCredentialResponse(r, ref)
  571 
  572 
  573 class TestCredentialTrustScoped(CredentialBaseTestCase):
  574     """Test credential with trust scoped token."""
  575 
  576     def setUp(self):
  577         super(TestCredentialTrustScoped, self).setUp()
  578 
  579         self.trustee_user = unit.new_user_ref(domain_id=self.domain_id)
  580         password = self.trustee_user['password']
  581         self.trustee_user = PROVIDERS.identity_api.create_user(
  582             self.trustee_user
  583         )
  584         self.trustee_user['password'] = password
  585         self.trustee_user_id = self.trustee_user['id']
  586         self.useFixture(
  587             ksfixtures.KeyRepository(
  588                 self.config_fixture,
  589                 'credential',
  590                 credential_fernet.MAX_ACTIVE_KEYS
  591             )
  592         )
  593 
  594     def config_overrides(self):
  595         super(TestCredentialTrustScoped, self).config_overrides()
  596         self.config_fixture.config(group='trust')
  597 
  598     def test_trust_scoped_ec2_credential(self):
  599         """Test creating trust scoped ec2 credential.
  600 
  601         Call ``POST /credentials``.
  602         """
  603         # Create the trust
  604         ref = unit.new_trust_ref(
  605             trustor_user_id=self.user_id,
  606             trustee_user_id=self.trustee_user_id,
  607             project_id=self.project_id,
  608             impersonation=True,
  609             expires=dict(minutes=1),
  610             role_ids=[self.role_id])
  611         del ref['id']
  612         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
  613         trust = self.assertValidTrustResponse(r)
  614 
  615         # Get a trust scoped token
  616         auth_data = self.build_authentication_request(
  617             user_id=self.trustee_user['id'],
  618             password=self.trustee_user['password'],
  619             trust_id=trust['id'])
  620         r = self.v3_create_token(auth_data)
  621         self.assertValidProjectScopedTokenResponse(r, self.user)
  622         trust_id = r.result['token']['OS-TRUST:trust']['id']
  623         token_id = r.headers.get('X-Subject-Token')
  624 
  625         # Create the credential with the trust scoped token
  626         blob, ref = unit.new_ec2_credential(user_id=self.user_id,
  627                                             project_id=self.project_id)
  628         r = self.post('/credentials', body={'credential': ref}, token=token_id)
  629 
  630         # We expect the response blob to contain the trust_id
  631         ret_ref = ref.copy()
  632         ret_blob = blob.copy()
  633         ret_blob['trust_id'] = trust_id
  634         ret_ref['blob'] = json.dumps(ret_blob)
  635         self.assertValidCredentialResponse(r, ref=ret_ref)
  636 
  637         # Assert credential id is same as hash of access key id for
  638         # ec2 credentials
  639         access = blob['access'].encode('utf-8')
  640         self.assertEqual(hashlib.sha256(access).hexdigest(),
  641                          r.result['credential']['id'])
  642 
  643         # Create a role assignment to ensure that it is ignored and only the
  644         # trust-delegated roles are used
  645         role = unit.new_role_ref(name='reader')
  646         role_id = role['id']
  647         PROVIDERS.role_api.create_role(role_id, role)
  648         PROVIDERS.assignment_api.add_role_to_user_and_project(
  649             self.user_id, self.project_id, role_id)
  650 
  651         ret_blob = json.loads(r.result['credential']['blob'])
  652         ec2token = self._test_get_token(
  653             access=ret_blob['access'], secret=ret_blob['secret'])
  654         ec2_roles = [role['id'] for role in ec2token['roles']]
  655         self.assertIn(self.role_id, ec2_roles)
  656         self.assertNotIn(role_id, ec2_roles)
  657 
  658         # Create second ec2 credential with the same access key id and check
  659         # for conflict.
  660         self.post(
  661             '/credentials',
  662             body={'credential': ref},
  663             token=token_id,
  664             expected_status=http.client.CONFLICT)
  665 
  666 
  667 class TestCredentialAppCreds(CredentialBaseTestCase):
  668     """Test credential with application credential token."""
  669 
  670     def setUp(self):
  671         super(TestCredentialAppCreds, self).setUp()
  672         self.useFixture(
  673             ksfixtures.KeyRepository(
  674                 self.config_fixture,
  675                 'credential',
  676                 credential_fernet.MAX_ACTIVE_KEYS
  677             )
  678         )
  679 
  680     def test_app_cred_ec2_credential(self):
  681         """Test creating ec2 credential from an application credential.
  682 
  683         Call ``POST /credentials``.
  684         """
  685         # Create the app cred
  686         ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}])
  687         del ref['id']
  688         r = self.post('/users/%s/application_credentials' % self.user_id,
  689                       body={'application_credential': ref})
  690         app_cred = r.result['application_credential']
  691 
  692         # Get an application credential token
  693         auth_data = self.build_authentication_request(
  694             app_cred_id=app_cred['id'],
  695             secret=app_cred['secret'])
  696         r = self.v3_create_token(auth_data)
  697         token_id = r.headers.get('X-Subject-Token')
  698 
  699         # Create the credential with the app cred token
  700         blob, ref = unit.new_ec2_credential(user_id=self.user_id,
  701                                             project_id=self.project_id)
  702         r = self.post('/credentials', body={'credential': ref}, token=token_id)
  703 
  704         # We expect the response blob to contain the app_cred_id
  705         ret_ref = ref.copy()
  706         ret_blob = blob.copy()
  707         ret_blob['app_cred_id'] = app_cred['id']
  708         ret_ref['blob'] = json.dumps(ret_blob)
  709         self.assertValidCredentialResponse(r, ref=ret_ref)
  710 
  711         # Assert credential id is same as hash of access key id for
  712         # ec2 credentials
  713         access = blob['access'].encode('utf-8')
  714         self.assertEqual(hashlib.sha256(access).hexdigest(),
  715                          r.result['credential']['id'])
  716 
  717         # Create a role assignment to ensure that it is ignored and only the
  718         # roles in the app cred are used
  719         role = unit.new_role_ref(name='reader')
  720         role_id = role['id']
  721         PROVIDERS.role_api.create_role(role_id, role)
  722         PROVIDERS.assignment_api.add_role_to_user_and_project(
  723             self.user_id, self.project_id, role_id)
  724 
  725         ret_blob = json.loads(r.result['credential']['blob'])
  726         ec2token = self._test_get_token(
  727             access=ret_blob['access'], secret=ret_blob['secret'])
  728         ec2_roles = [role['id'] for role in ec2token['roles']]
  729         self.assertIn(self.role_id, ec2_roles)
  730         self.assertNotIn(role_id, ec2_roles)
  731 
  732         # Create second ec2 credential with the same access key id and check
  733         # for conflict.
  734         self.post(
  735             '/credentials',
  736             body={'credential': ref},
  737             token=token_id,
  738             expected_status=http.client.CONFLICT)
  739 
  740 
  741 class TestCredentialAccessToken(CredentialBaseTestCase):
  742     """Test credential with access token."""
  743 
  744     def setUp(self):
  745         super(TestCredentialAccessToken, self).setUp()
  746         self.useFixture(
  747             ksfixtures.KeyRepository(
  748                 self.config_fixture,
  749                 'credential',
  750                 credential_fernet.MAX_ACTIVE_KEYS
  751             )
  752         )
  753         self.base_url = 'http://localhost/v3'
  754 
  755     def _urllib_parse_qs_text_keys(self, content):
  756         results = urllib.parse.parse_qs(content)
  757         return {key.decode('utf-8'): value for key, value in results.items()}
  758 
  759     def _create_single_consumer(self):
  760         endpoint = '/OS-OAUTH1/consumers'
  761 
  762         ref = {'description': uuid.uuid4().hex}
  763         resp = self.post(endpoint, body={'consumer': ref})
  764         return resp.result['consumer']
  765 
  766     def _create_request_token(self, consumer, project_id, base_url=None):
  767         endpoint = '/OS-OAUTH1/request_token'
  768         client = oauth1.Client(consumer['key'],
  769                                client_secret=consumer['secret'],
  770                                signature_method=oauth1.SIG_HMAC,
  771                                callback_uri="oob")
  772         headers = {'requested_project_id': project_id}
  773         if not base_url:
  774             base_url = self.base_url
  775         url, headers, body = client.sign(base_url + endpoint,
  776                                          http_method='POST',
  777                                          headers=headers)
  778         return endpoint, headers
  779 
  780     def _create_access_token(self, consumer, token, base_url=None):
  781         endpoint = '/OS-OAUTH1/access_token'
  782         client = oauth1.Client(consumer['key'],
  783                                client_secret=consumer['secret'],
  784                                resource_owner_key=token.key,
  785                                resource_owner_secret=token.secret,
  786                                signature_method=oauth1.SIG_HMAC,
  787                                verifier=token.verifier)
  788         if not base_url:
  789             base_url = self.base_url
  790         url, headers, body = client.sign(base_url + endpoint,
  791                                          http_method='POST')
  792         headers.update({'Content-Type': 'application/json'})
  793         return endpoint, headers
  794 
  795     def _get_oauth_token(self, consumer, token):
  796         client = oauth1.Client(consumer['key'],
  797                                client_secret=consumer['secret'],
  798                                resource_owner_key=token.key,
  799                                resource_owner_secret=token.secret,
  800                                signature_method=oauth1.SIG_HMAC)
  801         endpoint = '/auth/tokens'
  802         url, headers, body = client.sign(self.base_url + endpoint,
  803                                          http_method='POST')
  804         headers.update({'Content-Type': 'application/json'})
  805         ref = {'auth': {'identity': {'oauth1': {}, 'methods': ['oauth1']}}}
  806         return endpoint, headers, ref
  807 
  808     def _authorize_request_token(self, request_id):
  809         if isinstance(request_id, bytes):
  810             request_id = request_id.decode()
  811         return '/OS-OAUTH1/authorize/%s' % (request_id)
  812 
  813     def _get_access_token(self):
  814         consumer = self._create_single_consumer()
  815         consumer_id = consumer['id']
  816         consumer_secret = consumer['secret']
  817         consumer = {'key': consumer_id, 'secret': consumer_secret}
  818 
  819         url, headers = self._create_request_token(consumer, self.project_id)
  820         content = self.post(
  821             url, headers=headers,
  822             response_content_type='application/x-www-form-urlencoded')
  823         credentials = self._urllib_parse_qs_text_keys(content.result)
  824         request_key = credentials['oauth_token'][0]
  825         request_secret = credentials['oauth_token_secret'][0]
  826         request_token = oauth1.Token(request_key, request_secret)
  827 
  828         url = self._authorize_request_token(request_key)
  829         body = {'roles': [{'id': self.role_id}]}
  830         resp = self.put(url, body=body, expected_status=http.client.OK)
  831         verifier = resp.result['token']['oauth_verifier']
  832 
  833         request_token.set_verifier(verifier)
  834         url, headers = self._create_access_token(consumer, request_token)
  835         content = self.post(
  836             url, headers=headers,
  837             response_content_type='application/x-www-form-urlencoded')
  838         credentials = self._urllib_parse_qs_text_keys(content.result)
  839         access_key = credentials['oauth_token'][0]
  840         access_secret = credentials['oauth_token_secret'][0]
  841         access_token = oauth1.Token(access_key, access_secret)
  842 
  843         url, headers, body = self._get_oauth_token(consumer, access_token)
  844         content = self.post(url, headers=headers, body=body)
  845         return access_key, content.headers['X-Subject-Token']
  846 
  847     def test_access_token_ec2_credential(self):
  848         """Test creating ec2 credential from an oauth access token.
  849 
  850         Call ``POST /credentials``.
  851         """
  852         access_key, token_id = self._get_access_token()
  853 
  854         # Create the credential with the access token
  855         blob, ref = unit.new_ec2_credential(user_id=self.user_id,
  856                                             project_id=self.project_id)
  857         r = self.post('/credentials', body={'credential': ref}, token=token_id)
  858 
  859         # We expect the response blob to contain the access_token_id
  860         ret_ref = ref.copy()
  861         ret_blob = blob.copy()
  862         ret_blob['access_token_id'] = access_key.decode('utf-8')
  863         ret_ref['blob'] = json.dumps(ret_blob)
  864         self.assertValidCredentialResponse(r, ref=ret_ref)
  865 
  866         # Assert credential id is same as hash of access key id for
  867         # ec2 credentials
  868         access = blob['access'].encode('utf-8')
  869         self.assertEqual(hashlib.sha256(access).hexdigest(),
  870                          r.result['credential']['id'])
  871 
  872         # Create a role assignment to ensure that it is ignored and only the
  873         # roles in the access token are used
  874         role = unit.new_role_ref(name='reader')
  875         role_id = role['id']
  876         PROVIDERS.role_api.create_role(role_id, role)
  877         PROVIDERS.assignment_api.add_role_to_user_and_project(
  878             self.user_id, self.project_id, role_id)
  879 
  880         ret_blob = json.loads(r.result['credential']['blob'])
  881         ec2token = self._test_get_token(
  882             access=ret_blob['access'], secret=ret_blob['secret'])
  883         ec2_roles = [role['id'] for role in ec2token['roles']]
  884         self.assertIn(self.role_id, ec2_roles)
  885         self.assertNotIn(role_id, ec2_roles)
  886 
  887 
  888 class TestCredentialEc2(CredentialBaseTestCase):
  889     """Test v3 credential compatibility with ec2tokens."""
  890 
  891     def test_ec2_credential_signature_validate(self):
  892         """Test signature validation with a v3 ec2 credential."""
  893         blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
  894                                             project_id=self.project_id)
  895         r = self.post('/credentials', body={'credential': ref})
  896         self.assertValidCredentialResponse(r, ref)
  897         # Assert credential id is same as hash of access key id
  898         access = blob['access'].encode('utf-8')
  899         self.assertEqual(hashlib.sha256(access).hexdigest(),
  900                          r.result['credential']['id'])
  901 
  902         cred_blob = json.loads(r.result['credential']['blob'])
  903         self.assertEqual(blob, cred_blob)
  904         self._test_get_token(access=cred_blob['access'],
  905                              secret=cred_blob['secret'])
  906 
  907     def test_ec2_credential_signature_validate_legacy(self):
  908         """Test signature validation with a legacy v3 ec2 credential."""
  909         cred_json, _ = self._create_dict_blob_credential()
  910         cred_blob = json.loads(cred_json)
  911         self._test_get_token(access=cred_blob['access'],
  912                              secret=cred_blob['secret'])
  913 
  914     def _get_ec2_cred_uri(self):
  915         return '/users/%s/credentials/OS-EC2' % self.user_id
  916 
  917     def _get_ec2_cred(self):
  918         uri = self._get_ec2_cred_uri()
  919         r = self.post(uri, body={'tenant_id': self.project_id})
  920         return r.result['credential']
  921 
  922     def test_ec2_create_credential(self):
  923         """Test ec2 credential creation."""
  924         ec2_cred = self._get_ec2_cred()
  925         self.assertEqual(self.user_id, ec2_cred['user_id'])
  926         self.assertEqual(self.project_id, ec2_cred['tenant_id'])
  927         self.assertIsNone(ec2_cred['trust_id'])
  928         self._test_get_token(access=ec2_cred['access'],
  929                              secret=ec2_cred['secret'])
  930         uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']])
  931         self.assertThat(ec2_cred['links']['self'],
  932                         matchers.EndsWith(uri))
  933 
  934     def test_ec2_get_credential(self):
  935         ec2_cred = self._get_ec2_cred()
  936         uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']])
  937         r = self.get(uri)
  938         self.assertDictEqual(ec2_cred, r.result['credential'])
  939         self.assertThat(ec2_cred['links']['self'],
  940                         matchers.EndsWith(uri))
  941 
  942     def test_ec2_cannot_get_non_ec2_credential(self):
  943         access_key = uuid.uuid4().hex
  944         cred_id = utils.hash_access_key(access_key)
  945         non_ec2_cred = unit.new_credential_ref(
  946             user_id=self.user_id,
  947             project_id=self.project_id)
  948         non_ec2_cred['id'] = cred_id
  949         PROVIDERS.credential_api.create_credential(cred_id, non_ec2_cred)
  950         uri = '/'.join([self._get_ec2_cred_uri(), access_key])
  951         # if access_key is not found, ec2 controller raises Unauthorized
  952         # exception
  953         self.get(uri, expected_status=http.client.UNAUTHORIZED)
  954 
  955     def test_ec2_list_credentials(self):
  956         """Test ec2 credential listing."""
  957         self._get_ec2_cred()
  958         uri = self._get_ec2_cred_uri()
  959         r = self.get(uri)
  960         cred_list = r.result['credentials']
  961         self.assertEqual(1, len(cred_list))
  962         self.assertThat(r.result['links']['self'],
  963                         matchers.EndsWith(uri))
  964 
  965         # non-EC2 credentials won't be fetched
  966         non_ec2_cred = unit.new_credential_ref(
  967             user_id=self.user_id,
  968             project_id=self.project_id)
  969         non_ec2_cred['type'] = uuid.uuid4().hex
  970         PROVIDERS.credential_api.create_credential(
  971             non_ec2_cred['id'], non_ec2_cred
  972         )
  973         r = self.get(uri)
  974         cred_list_2 = r.result['credentials']
  975         # still one element because non-EC2 credentials are not returned.
  976         self.assertEqual(1, len(cred_list_2))
  977         self.assertEqual(cred_list[0], cred_list_2[0])
  978 
  979     def test_ec2_delete_credential(self):
  980         """Test ec2 credential deletion."""
  981         ec2_cred = self._get_ec2_cred()
  982         uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']])
  983         cred_from_credential_api = (
  984             PROVIDERS.credential_api
  985             .list_credentials_for_user(self.user_id, type=CRED_TYPE_EC2))
  986         self.assertEqual(1, len(cred_from_credential_api))
  987         self.delete(uri)
  988         self.assertRaises(exception.CredentialNotFound,
  989                           PROVIDERS.credential_api.get_credential,
  990                           cred_from_credential_api[0]['id'])