"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/unit/test_revoke.py" (13 May 2020, 20378 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_revoke.py": 16.0.1_vs_17.0.0.

    1 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 # not use this file except in compliance with the License. You may obtain
    3 # a copy of the License at
    4 #
    5 #      http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 # Unless required by applicable law or agreed to in writing, software
    8 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 # License for the specific language governing permissions and limitations
   11 # under the License.
   12 
   13 
   14 import datetime
   15 from unittest import mock
   16 import uuid
   17 
   18 from oslo_utils import timeutils
   19 from testtools import matchers
   20 
   21 from keystone.common import provider_api
   22 from keystone.common import utils
   23 import keystone.conf
   24 from keystone import exception
   25 from keystone.models import revoke_model
   26 from keystone.revoke.backends import sql
   27 from keystone.tests import unit
   28 from keystone.tests.unit import ksfixtures
   29 from keystone.tests.unit import test_backend_sql
   30 from keystone.token import provider
   31 
   32 
   33 CONF = keystone.conf.CONF
   34 PROVIDERS = provider_api.ProviderAPIs
   35 
   36 
   37 def _future_time():
   38     expire_delta = datetime.timedelta(seconds=1000)
   39     future_time = timeutils.utcnow() + expire_delta
   40     return future_time
   41 
   42 
   43 def _sample_blank_token():
   44     issued_delta = datetime.timedelta(minutes=-2)
   45     issued_at = timeutils.utcnow() + issued_delta
   46     token_data = revoke_model.blank_token_data(issued_at)
   47     return token_data
   48 
   49 
   50 class RevokeTests(object):
   51 
   52     def _assertTokenRevoked(self, token_data):
   53         self.assertRaises(exception.TokenNotFound,
   54                           PROVIDERS.revoke_api.check_token,
   55                           token=token_data)
   56 
   57     def _assertTokenNotRevoked(self, token_data):
   58         self.assertIsNone(PROVIDERS.revoke_api.check_token(token_data))
   59 
   60     def test_list(self):
   61         PROVIDERS.revoke_api.revoke_by_user(user_id=1)
   62         self.assertEqual(1, len(PROVIDERS.revoke_api.list_events()))
   63 
   64         PROVIDERS.revoke_api.revoke_by_user(user_id=2)
   65         self.assertEqual(2, len(PROVIDERS.revoke_api.list_events()))
   66 
   67     def test_list_since(self):
   68         PROVIDERS.revoke_api.revoke_by_user(user_id=1)
   69         PROVIDERS.revoke_api.revoke_by_user(user_id=2)
   70         past = timeutils.utcnow() - datetime.timedelta(seconds=1000)
   71         self.assertEqual(
   72             2, len(PROVIDERS.revoke_api.list_events(last_fetch=past))
   73         )
   74         future = timeutils.utcnow() + datetime.timedelta(seconds=1000)
   75         self.assertEqual(
   76             0, len(PROVIDERS.revoke_api.list_events(last_fetch=future))
   77         )
   78 
   79     def test_list_revoked_user(self):
   80         revocation_backend = sql.Revoke()
   81 
   82         # This simulates creating a token for a specific user. When we revoke
   83         # the token we should have a single revocation event in the list. We
   84         # are going to assert that the token values match the only revocation
   85         # event in the backend.
   86         first_token = _sample_blank_token()
   87         first_token['user_id'] = uuid.uuid4().hex
   88         PROVIDERS.revoke_api.revoke_by_user(user_id=first_token['user_id'])
   89         self._assertTokenRevoked(first_token)
   90         self.assertEqual(
   91             1, len(revocation_backend.list_events(token=first_token))
   92         )
   93 
   94         # This simulates creating a separate token for a separate user. We are
   95         # going to revoke the token just like we did for the previous token.
   96         # We should have two revocation events stored in the backend but only
   97         # one should match the values of the second token.
   98         second_token = _sample_blank_token()
   99         second_token['user_id'] = uuid.uuid4().hex
  100         PROVIDERS.revoke_api.revoke_by_user(user_id=second_token['user_id'])
  101         self._assertTokenRevoked(second_token)
  102         self.assertEqual(
  103             1, len(revocation_backend.list_events(token=second_token))
  104         )
  105         # This simulates creating another separate token for a separate user,
  106         # but we're not going to issue a revocation event. Even though we have
  107         # two revocation events persisted in the backend, neither of them
  108         # should match the values of the third token. If they did - our
  109         # revocation event matching would be too heavy handed, which would
  110         # result in over-generalized revocation patterns.
  111         third_token = _sample_blank_token()
  112         third_token['user_id'] = uuid.uuid4().hex
  113         self._assertTokenNotRevoked(third_token)
  114         self.assertEqual(
  115             0, len(revocation_backend.list_events(token=third_token))
  116         )
  117         # This gets a token but overrides the user_id of the token to be None.
  118         # Technically this should never happen because tokens must belong to
  119         # a user. What we're testing here is that the two revocation events
  120         # we've created won't match None values for the user_id.
  121         fourth_token = _sample_blank_token()
  122         fourth_token['user_id'] = None
  123         self._assertTokenNotRevoked(fourth_token)
  124         self.assertEqual(
  125             0, len(revocation_backend.list_events(token=fourth_token))
  126         )
  127 
  128     def test_list_revoked_project(self):
  129         revocation_backend = sql.Revoke()
  130         token = _sample_blank_token()
  131 
  132         # Create a token for a project, revoke token, check the token we
  133         # created has been revoked, and check the list returned a match for
  134         # the token when passed in.
  135         first_token = _sample_blank_token()
  136         first_token['project_id'] = uuid.uuid4().hex
  137         revocation_backend.revoke(revoke_model.RevokeEvent(
  138             project_id=first_token['project_id']))
  139         self._assertTokenRevoked(first_token)
  140         self.assertEqual(1, len(revocation_backend.list_events(
  141             token=first_token)))
  142 
  143         # Create a second token, revoke it, check the token has been revoked,
  144         # and check the list to make sure that even though we now have 2
  145         # revoked events in the revocation list, it will only return 1 because
  146         # only one match for our second_token should exist
  147         second_token = _sample_blank_token()
  148         second_token['project_id'] = uuid.uuid4().hex
  149         revocation_backend.revoke(revoke_model.RevokeEvent(
  150             project_id=second_token['project_id']))
  151         self._assertTokenRevoked(second_token)
  152         self.assertEqual(
  153             1, len(revocation_backend.list_events(token=second_token)))
  154 
  155         # This gets a token but overrides project_id of the token to be None.
  156         # We expect that since there are two events which both have populated
  157         # project_ids, this should not match this third_token with any other
  158         # event in the list so we should receive 0.
  159         third_token = _sample_blank_token()
  160         third_token['project_id'] = None
  161         self._assertTokenNotRevoked(token)
  162         self.assertEqual(0, len(revocation_backend.list_events(token=token)))
  163 
  164     def test_list_revoked_audit(self):
  165         revocation_backend = sql.Revoke()
  166 
  167         # Create a token with audit_id set, revoke it, check it is revoked,
  168         # check to make sure that list_events matches the token to the event we
  169         # just revoked.
  170         first_token = _sample_blank_token()
  171         first_token['audit_id'] = provider.random_urlsafe_str()
  172         PROVIDERS.revoke_api.revoke_by_audit_id(
  173             audit_id=first_token['audit_id'])
  174         self._assertTokenRevoked(first_token)
  175         self.assertEqual(
  176             1, len(revocation_backend.list_events(token=first_token)))
  177 
  178         # Create a second token, revoke it, check it is revoked, check to make
  179         # sure that list events only finds 1 match since there are 2 and they
  180         # dont both have different populated audit_id fields
  181         second_token = _sample_blank_token()
  182         second_token['audit_id'] = provider.random_urlsafe_str()
  183         PROVIDERS.revoke_api.revoke_by_audit_id(
  184             audit_id=second_token['audit_id'])
  185         self._assertTokenRevoked(second_token)
  186         self.assertEqual(
  187             1, len(revocation_backend.list_events(token=second_token)))
  188 
  189         # Create a third token with audit_id set to None to make sure that
  190         # since there are no events currently revoked with audit_id None this
  191         # finds no matches
  192         third_token = _sample_blank_token()
  193         third_token['audit_id'] = None
  194         self._assertTokenNotRevoked(third_token)
  195         self.assertEqual(
  196             0, len(revocation_backend.list_events(token=third_token)))
  197 
  198     def test_list_revoked_since(self):
  199         revocation_backend = sql.Revoke()
  200         token = _sample_blank_token()
  201         PROVIDERS.revoke_api.revoke_by_user(user_id=None)
  202         PROVIDERS.revoke_api.revoke_by_user(user_id=None)
  203         self.assertEqual(2, len(revocation_backend.list_events(token=token)))
  204         future = timeutils.utcnow() + datetime.timedelta(seconds=1000)
  205         token['issued_at'] = future
  206         self.assertEqual(0, len(revocation_backend.list_events(token=token)))
  207 
  208     def test_list_revoked_multiple_filters(self):
  209         revocation_backend = sql.Revoke()
  210 
  211         # create token that sets key/value filters in list_revoked
  212         first_token = _sample_blank_token()
  213         first_token['user_id'] = uuid.uuid4().hex
  214         first_token['project_id'] = uuid.uuid4().hex
  215         first_token['audit_id'] = provider.random_urlsafe_str()
  216         # revoke event and then verify that there is only one revocation
  217         # and verify the only revoked event is the token
  218         PROVIDERS.revoke_api.revoke(revoke_model.RevokeEvent(
  219             user_id=first_token['user_id'],
  220             project_id=first_token['project_id'],
  221             audit_id=first_token['audit_id']))
  222         self._assertTokenRevoked(first_token)
  223         self.assertEqual(
  224             1, len(revocation_backend.list_events(token=first_token)))
  225         # If a token has None values which the event contains it shouldn't
  226         # match and not be revoked
  227         second_token = _sample_blank_token()
  228         self._assertTokenNotRevoked(second_token)
  229         self.assertEqual(
  230             0, len(revocation_backend.list_events(token=second_token)))
  231         # If an event column and corresponding dict value don't match, Then
  232         # it should not add the event in the list. Demonstrate for project
  233         third_token = _sample_blank_token()
  234         third_token['project_id'] = uuid.uuid4().hex
  235         self._assertTokenNotRevoked(third_token)
  236         self.assertEqual(
  237             0, len(revocation_backend.list_events(token=third_token)))
  238         # A revoked event with user_id as null and token user_id non null
  239         # should still be return an event and be revoked if other non null
  240         # event fields match non null token fields
  241         fourth_token = _sample_blank_token()
  242         fourth_token['user_id'] = uuid.uuid4().hex
  243         fourth_token['project_id'] = uuid.uuid4().hex
  244         fourth_token['audit_id'] = provider.random_urlsafe_str()
  245         PROVIDERS.revoke_api.revoke(revoke_model.RevokeEvent(
  246             project_id=fourth_token['project_id'],
  247             audit_id=fourth_token['audit_id']))
  248         self._assertTokenRevoked(fourth_token)
  249         self.assertEqual(
  250             1, len(revocation_backend.list_events(token=fourth_token)))
  251 
  252     def _user_field_test(self, field_name):
  253         token = _sample_blank_token()
  254         token[field_name] = uuid.uuid4().hex
  255         PROVIDERS.revoke_api.revoke_by_user(user_id=token[field_name])
  256         self._assertTokenRevoked(token)
  257         token2 = _sample_blank_token()
  258         token2[field_name] = uuid.uuid4().hex
  259         self._assertTokenNotRevoked(token2)
  260 
  261     def test_revoke_by_user(self):
  262         self._user_field_test('user_id')
  263 
  264     def test_revoke_by_user_matches_trustee(self):
  265         self._user_field_test('trustee_id')
  266 
  267     def test_revoke_by_user_matches_trustor(self):
  268         self._user_field_test('trustor_id')
  269 
  270     def test_by_domain_user(self):
  271         revocation_backend = sql.Revoke()
  272         # If revoke a domain, then a token for a user in the domain is revoked
  273         user_id = uuid.uuid4().hex
  274         domain_id = uuid.uuid4().hex
  275 
  276         token_data = _sample_blank_token()
  277         token_data['user_id'] = user_id
  278         token_data['identity_domain_id'] = domain_id
  279 
  280         self._assertTokenNotRevoked(token_data)
  281         self.assertEqual(
  282             0, len(revocation_backend.list_events(token=token_data)))
  283 
  284         PROVIDERS.revoke_api.revoke(
  285             revoke_model.RevokeEvent(domain_id=domain_id)
  286         )
  287 
  288         self._assertTokenRevoked(token_data)
  289         self.assertEqual(
  290             1, len(revocation_backend.list_events(token=token_data)))
  291 
  292     def test_by_domain_project(self):
  293         revocation_backend = sql.Revoke()
  294 
  295         token_data = _sample_blank_token()
  296         token_data['user_id'] = uuid.uuid4().hex
  297         token_data['identity_domain_id'] = uuid.uuid4().hex
  298         token_data['project_id'] = uuid.uuid4().hex
  299         token_data['assignment_domain_id'] = uuid.uuid4().hex
  300 
  301         self._assertTokenNotRevoked(token_data)
  302         self.assertEqual(
  303             0, len(revocation_backend.list_events(token=token_data)))
  304 
  305         # If revoke a domain, then a token scoped to a project in the domain
  306         # is revoked.
  307         PROVIDERS.revoke_api.revoke(revoke_model.RevokeEvent(
  308             domain_id=token_data['assignment_domain_id']))
  309 
  310         self._assertTokenRevoked(token_data)
  311         self.assertEqual(
  312             1, len(revocation_backend.list_events(token=token_data)))
  313 
  314     def test_by_domain_domain(self):
  315         revocation_backend = sql.Revoke()
  316 
  317         token_data = _sample_blank_token()
  318         token_data['user_id'] = uuid.uuid4().hex
  319         token_data['identity_domain_id'] = uuid.uuid4().hex
  320         token_data['assignment_domain_id'] = uuid.uuid4().hex
  321 
  322         self._assertTokenNotRevoked(token_data)
  323         self.assertEqual(
  324             0, len(revocation_backend.list_events(token=token_data)))
  325 
  326         # If revoke a domain, then a token scoped to the domain is revoked.
  327         PROVIDERS.revoke_api.revoke(revoke_model.RevokeEvent(
  328             domain_id=token_data['assignment_domain_id']))
  329 
  330         self._assertTokenRevoked(token_data)
  331         self.assertEqual(
  332             1, len(revocation_backend.list_events(token=token_data)))
  333 
  334     def test_revoke_by_audit_id(self):
  335         token = _sample_blank_token()
  336         # Audit ID and Audit Chain ID are populated with the same value
  337         # if the token is an original token
  338         token['audit_id'] = uuid.uuid4().hex
  339         token['audit_chain_id'] = token['audit_id']
  340         PROVIDERS.revoke_api.revoke_by_audit_id(audit_id=token['audit_id'])
  341         self._assertTokenRevoked(token)
  342 
  343         token2 = _sample_blank_token()
  344         token2['audit_id'] = uuid.uuid4().hex
  345         token2['audit_chain_id'] = token2['audit_id']
  346         self._assertTokenNotRevoked(token2)
  347 
  348     def test_revoke_by_audit_chain_id(self):
  349         revocation_backend = sql.Revoke()
  350 
  351         # Create our first token with audit_id
  352         audit_id = provider.random_urlsafe_str()
  353         token = _sample_blank_token()
  354         # Audit ID and Audit Chain ID are populated with the same value
  355         # if the token is an original token
  356         token['audit_id'] = audit_id
  357         token['audit_chain_id'] = audit_id
  358         # Check that the token is not revoked
  359         self._assertTokenNotRevoked(token)
  360         self.assertEqual(0, len(revocation_backend.list_events(token=token)))
  361 
  362         # Revoked token by audit chain id using the audit_id
  363         PROVIDERS.revoke_api.revoke_by_audit_chain_id(audit_id)
  364         # Check that the token is now revoked
  365         self._assertTokenRevoked(token)
  366         self.assertEqual(1, len(revocation_backend.list_events(token=token)))
  367 
  368     @mock.patch.object(timeutils, 'utcnow')
  369     def test_expired_events_are_removed(self, mock_utcnow):
  370         def _sample_token_values():
  371             token = _sample_blank_token()
  372             token['expires_at'] = utils.isotime(_future_time(),
  373                                                 subsecond=True)
  374             return token
  375 
  376         now = datetime.datetime.utcnow()
  377         now_plus_2h = now + datetime.timedelta(hours=2)
  378         mock_utcnow.return_value = now
  379 
  380         # Build a token and validate it. This will seed the cache for the
  381         # future 'synchronize' call.
  382         token_values = _sample_token_values()
  383 
  384         audit_chain_id = uuid.uuid4().hex
  385         PROVIDERS.revoke_api.revoke_by_audit_chain_id(audit_chain_id)
  386         token_values['audit_chain_id'] = audit_chain_id
  387         self.assertRaises(exception.TokenNotFound,
  388                           PROVIDERS.revoke_api.check_token,
  389                           token_values)
  390 
  391         # Move our clock forward by 2h, build a new token and validate it.
  392         # 'synchronize' should now be exercised and remove old expired events
  393         mock_utcnow.return_value = now_plus_2h
  394         PROVIDERS.revoke_api.revoke_by_audit_chain_id(audit_chain_id)
  395         # two hours later, it should still be not found
  396         self.assertRaises(exception.TokenNotFound,
  397                           PROVIDERS.revoke_api.check_token,
  398                           token_values)
  399 
  400     def test_delete_group_without_role_does_not_revoke_users(self):
  401         revocation_backend = sql.Revoke()
  402         domain = unit.new_domain_ref()
  403         PROVIDERS.resource_api.create_domain(domain['id'], domain)
  404         # Create two groups. Group1 will be used to test deleting a group,
  405         # without role assignments and users in the group, doesn't create
  406         # revoked events. Group2 will show that deleting a group with role
  407         # assignment and users in the group does create revoked events
  408         group1 = unit.new_group_ref(domain_id=domain['id'])
  409         group1 = PROVIDERS.identity_api.create_group(group1)
  410         group2 = unit.new_group_ref(domain_id=domain['id'])
  411         group2 = PROVIDERS.identity_api.create_group(group2)
  412         role = unit.new_role_ref()
  413         PROVIDERS.role_api.create_role(role['id'], role)
  414         user1 = unit.new_user_ref(domain_id=domain['id'])
  415         user1 = PROVIDERS.identity_api.create_user(user1)
  416         user2 = unit.new_user_ref(domain_id=domain['id'])
  417         user2 = PROVIDERS.identity_api.create_user(user2)
  418 
  419         # Add two users to the group, verify they are added, delete group, and
  420         # check that the revocaiton events have not been created
  421         PROVIDERS.identity_api.add_user_to_group(
  422             user_id=user1['id'], group_id=group1['id']
  423         )
  424         PROVIDERS.identity_api.add_user_to_group(
  425             user_id=user2['id'], group_id=group1['id']
  426         )
  427         self.assertEqual(
  428             2, len(PROVIDERS.identity_api.list_users_in_group(group1['id'])))
  429         PROVIDERS.identity_api.delete_group(group1['id'])
  430         self.assertEqual(0, len(revocation_backend.list_events()))
  431 
  432         # Assign a role to the group, add two users to the group, verify that
  433         # the role has been assigned to the group, verify the users have been
  434         # added to the group, delete the group, check that the revocation
  435         # events have been created
  436         PROVIDERS.assignment_api.create_grant(
  437             group_id=group2['id'], domain_id=domain['id'], role_id=role['id']
  438         )
  439         grants = PROVIDERS.assignment_api.list_role_assignments(
  440             role_id=role['id']
  441         )
  442         self.assertThat(grants, matchers.HasLength(1))
  443         PROVIDERS.identity_api.add_user_to_group(
  444             user_id=user1['id'], group_id=group2['id']
  445         )
  446         PROVIDERS.identity_api.add_user_to_group(
  447             user_id=user2['id'], group_id=group2['id']
  448         )
  449         self.assertEqual(
  450             2, len(PROVIDERS.identity_api.list_users_in_group(group2['id'])))
  451         PROVIDERS.identity_api.delete_group(group2['id'])
  452         self.assertEqual(2, len(revocation_backend.list_events()))
  453 
  454 
  455 class FernetSqlRevokeTests(test_backend_sql.SqlTests, RevokeTests):
  456     def config_overrides(self):
  457         super(FernetSqlRevokeTests, self).config_overrides()
  458         self.config_fixture.config(
  459             group='token',
  460             provider='fernet',
  461             revoke_by_id=False)
  462         self.useFixture(
  463             ksfixtures.KeyRepository(
  464                 self.config_fixture,
  465                 'fernet_tokens',
  466                 CONF.fernet_tokens.max_active_keys
  467             )
  468         )