"Fossies" - the Fresh Open Source Software Archive

Member "keystone-16.0.2/keystone/tests/unit/identity/test_backend_sql.py" (7 Jun 2021, 43760 Bytes) of package /linux/misc/openstack/keystone-16.0.2.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "test_backend_sql.py": 16.0.1_vs_16.0.2.

    1 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 # not use this file except in compliance with the License. You may obtain
    3 # a copy of the License at
    4 #
    5 #      http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 # Unless required by applicable law or agreed to in writing, software
    8 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 # License for the specific language governing permissions and limitations
   11 # under the License.
   12 
   13 import datetime
   14 import uuid
   15 
   16 import freezegun
   17 import passlib.hash
   18 
   19 from keystone.common import password_hashing
   20 from keystone.common import provider_api
   21 from keystone.common import resource_options
   22 from keystone.common import sql
   23 import keystone.conf
   24 from keystone import exception
   25 from keystone.identity.backends import base
   26 from keystone.identity.backends import resource_options as iro
   27 from keystone.identity.backends import sql_model as model
   28 from keystone.tests.unit import test_backend_sql
   29 
   30 
   31 CONF = keystone.conf.CONF
   32 PROVIDERS = provider_api.ProviderAPIs
   33 
   34 
   35 class UserPasswordCreatedAtIntTests(test_backend_sql.SqlTests):
   36     def config_overrides(self):
   37         super(UserPasswordCreatedAtIntTests, self).config_overrides()
   38         self.config_fixture.config(group='security_compliance',
   39                                    password_expires_days=1)
   40 
   41     def test_user_password_created_expired_at_int_matches_created_at(self):
   42         with sql.session_for_read() as session:
   43             user_ref = PROVIDERS.identity_api._get_user(
   44                 session, self.user_foo['id']
   45             )
   46             self.assertIsNotNone(user_ref.password_ref._created_at)
   47             self.assertIsNotNone(user_ref.password_ref._expires_at)
   48             self.assertEqual(user_ref.password_ref._created_at,
   49                              user_ref.password_ref.created_at_int)
   50             self.assertEqual(user_ref.password_ref._expires_at,
   51                              user_ref.password_ref.expires_at_int)
   52             self.assertEqual(user_ref.password_ref.created_at,
   53                              user_ref.password_ref.created_at_int)
   54             self.assertEqual(user_ref.password_ref.expires_at,
   55                              user_ref.password_ref.expires_at_int)
   56 
   57 
   58 class UserPasswordHashingTestsNoCompat(test_backend_sql.SqlTests):
   59     def config_overrides(self):
   60         super(UserPasswordHashingTestsNoCompat, self).config_overrides()
   61         self.config_fixture.config(group='identity',
   62                                    password_hash_algorithm='scrypt')
   63 
   64     def test_configured_algorithm_used(self):
   65         with sql.session_for_read() as session:
   66             user_ref = PROVIDERS.identity_api._get_user(
   67                 session, self.user_foo['id']
   68             )
   69         self.assertEqual(
   70             passlib.hash.scrypt,
   71             password_hashing._get_hasher_from_ident(user_ref.password))
   72 
   73 
   74 class UserResourceOptionTests(test_backend_sql.SqlTests):
   75     def setUp(self):
   76         super(UserResourceOptionTests, self).setUp()
   77         # RESET STATE OF REGISTRY OPTIONS
   78         self.addCleanup(iro.register_user_options)
   79         self.addCleanup(iro.USER_OPTIONS_REGISTRY._registered_options.clear)
   80 
   81         self.option1 = resource_options.ResourceOption('opt1', 'option1')
   82         self.option2 = resource_options.ResourceOption('opt2', 'option2')
   83         self.cleanup_instance('option1', 'option2')
   84 
   85         iro.USER_OPTIONS_REGISTRY._registered_options.clear()
   86         iro.USER_OPTIONS_REGISTRY.register_option(self.option1)
   87         iro.USER_OPTIONS_REGISTRY.register_option(self.option2)
   88 
   89     def test_user_set_option_in_resource_option(self):
   90         user = self._create_user(self._get_user_dict())
   91         opt_value = uuid.uuid4().hex
   92         user['options'][self.option1.option_name] = opt_value
   93         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
   94         self.assertEqual(opt_value,
   95                          new_ref['options'][self.option1.option_name])
   96         raw_ref = self._get_user_ref(user['id'])
   97         self.assertIn(self.option1.option_id, raw_ref._resource_option_mapper)
   98         self.assertEqual(
   99             opt_value,
  100             raw_ref._resource_option_mapper[
  101                 self.option1.option_id].option_value)
  102         api_get_ref = PROVIDERS.identity_api.get_user(user['id'])
  103         # Ensure options are properly set in a .get_user call.
  104         self.assertEqual(opt_value,
  105                          api_get_ref['options'][self.option1.option_name])
  106 
  107     def test_user_add_update_delete_option_in_resource_option(self):
  108         user = self._create_user(self._get_user_dict())
  109 
  110         opt_value = uuid.uuid4().hex
  111         new_opt_value = uuid.uuid4().hex
  112 
  113         # Update user to add the new value option
  114         user['options'][self.option1.option_name] = opt_value
  115         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
  116         self.assertEqual(opt_value,
  117                          new_ref['options'][self.option1.option_name])
  118 
  119         # Update the option Value and confirm it is updated
  120         user['options'][self.option1.option_name] = new_opt_value
  121         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
  122         self.assertEqual(new_opt_value,
  123                          new_ref['options'][self.option1.option_name])
  124 
  125         # Set the option value to None, meaning delete the option
  126         user['options'][self.option1.option_name] = None
  127         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
  128         self.assertNotIn(self.option1.option_name, new_ref['options'])
  129 
  130     def test_user_add_delete_resource_option_existing_option_values(self):
  131         user = self._create_user(self._get_user_dict())
  132 
  133         opt_value = uuid.uuid4().hex
  134         opt2_value = uuid.uuid4().hex
  135 
  136         # Update user to add the new value option
  137         user['options'][self.option1.option_name] = opt_value
  138         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
  139         self.assertEqual(opt_value,
  140                          new_ref['options'][self.option1.option_name])
  141 
  142         # Update the option value for option 2 and confirm it is updated and
  143         # option1's value remains the same. Option 1 is not specified in the
  144         # updated user ref.
  145         del user['options'][self.option1.option_name]
  146         user['options'][self.option2.option_name] = opt2_value
  147         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
  148         self.assertEqual(opt_value,
  149                          new_ref['options'][self.option1.option_name])
  150         self.assertEqual(opt2_value,
  151                          new_ref['options'][self.option2.option_name])
  152         raw_ref = self._get_user_ref(user['id'])
  153         self.assertEqual(
  154             opt_value,
  155             raw_ref._resource_option_mapper[
  156                 self.option1.option_id].option_value)
  157         self.assertEqual(
  158             opt2_value,
  159             raw_ref._resource_option_mapper[
  160                 self.option2.option_id].option_value)
  161 
  162         # Set the option value to None, meaning delete the option, ensure
  163         # option 2 still remains and has the right value
  164         user['options'][self.option1.option_name] = None
  165         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
  166         self.assertNotIn(self.option1.option_name, new_ref['options'])
  167         self.assertEqual(opt2_value,
  168                          new_ref['options'][self.option2.option_name])
  169         raw_ref = self._get_user_ref(user['id'])
  170         self.assertNotIn(raw_ref._resource_option_mapper,
  171                          self.option1.option_id)
  172         self.assertEqual(
  173             opt2_value,
  174             raw_ref._resource_option_mapper[
  175                 self.option2.option_id].option_value)
  176 
  177     def test_unregistered_resource_option_deleted(self):
  178         user = self._create_user(self._get_user_dict())
  179 
  180         opt_value = uuid.uuid4().hex
  181         opt2_value = uuid.uuid4().hex
  182 
  183         # Update user to add the new value option
  184         user['options'][self.option1.option_name] = opt_value
  185         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
  186         self.assertEqual(opt_value,
  187                          new_ref['options'][self.option1.option_name])
  188 
  189         # Update the option value for option 2 and confirm it is updated and
  190         # option1's value remains the same. Option 1 is not specified in the
  191         # updated user ref.
  192         del user['options'][self.option1.option_name]
  193         user['options'][self.option2.option_name] = opt2_value
  194         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
  195         self.assertEqual(opt_value,
  196                          new_ref['options'][self.option1.option_name])
  197         self.assertEqual(opt2_value,
  198                          new_ref['options'][self.option2.option_name])
  199         raw_ref = self._get_user_ref(user['id'])
  200         self.assertEqual(
  201             opt_value,
  202             raw_ref._resource_option_mapper[
  203                 self.option1.option_id].option_value)
  204         self.assertEqual(
  205             opt2_value,
  206             raw_ref._resource_option_mapper[
  207                 self.option2.option_id].option_value)
  208 
  209         # clear registered options and only re-register option1, update user
  210         # and confirm option2 is gone from the ref and returned dict
  211         iro.USER_OPTIONS_REGISTRY._registered_options.clear()
  212         iro.USER_OPTIONS_REGISTRY.register_option(self.option1)
  213         user['name'] = uuid.uuid4().hex
  214         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
  215         self.assertNotIn(self.option2.option_name, new_ref['options'])
  216         self.assertEqual(opt_value,
  217                          new_ref['options'][self.option1.option_name])
  218         raw_ref = self._get_user_ref(user['id'])
  219         self.assertNotIn(raw_ref._resource_option_mapper,
  220                          self.option2.option_id)
  221         self.assertEqual(
  222             opt_value,
  223             raw_ref._resource_option_mapper[
  224                 self.option1.option_id].option_value)
  225 
  226     def _get_user_ref(self, user_id):
  227         with sql.session_for_read() as session:
  228             return session.query(model.User).get(user_id)
  229 
  230     def _create_user(self, user_dict):
  231         user_dict['id'] = uuid.uuid4().hex
  232         with sql.session_for_write() as session:
  233             user_ref = model.User.from_dict(user_dict)
  234             session.add(user_ref)
  235             return base.filter_user(user_ref.to_dict())
  236 
  237     def _get_user_dict(self):
  238         user = {
  239             'name': uuid.uuid4().hex,
  240             'domain_id': CONF.identity.default_domain_id,
  241             'enabled': True,
  242             'password': uuid.uuid4().hex
  243         }
  244         return user
  245 
  246 
  247 class DisableInactiveUserTests(test_backend_sql.SqlTests):
  248     def setUp(self):
  249         super(DisableInactiveUserTests, self).setUp()
  250         self.password = uuid.uuid4().hex
  251         self.user_dict = self._get_user_dict(self.password)
  252         self.max_inactive_days = 90
  253         self.config_fixture.config(
  254             group='security_compliance',
  255             disable_user_account_days_inactive=self.max_inactive_days)
  256 
  257     def test_authenticate_user_disabled_due_to_inactivity(self):
  258         # create user and set last_active_at beyond the max
  259         last_active_at = (
  260             datetime.datetime.utcnow() -
  261             datetime.timedelta(days=self.max_inactive_days + 1))
  262         user = self._create_user(self.user_dict, last_active_at.date())
  263         with self.make_request():
  264             self.assertRaises(exception.UserDisabled,
  265                               PROVIDERS.identity_api.authenticate,
  266                               user_id=user['id'],
  267                               password=self.password)
  268             # verify that the user is actually disabled
  269             user = PROVIDERS.identity_api.get_user(user['id'])
  270             self.assertFalse(user['enabled'])
  271             # set the user to enabled and authenticate
  272             user['enabled'] = True
  273             PROVIDERS.identity_api.update_user(user['id'], user)
  274             user = PROVIDERS.identity_api.authenticate(
  275                 user_id=user['id'], password=self.password
  276             )
  277             self.assertTrue(user['enabled'])
  278 
  279     def test_authenticate_user_not_disabled_due_to_inactivity(self):
  280         # create user and set last_active_at just below the max
  281         last_active_at = (
  282             datetime.datetime.utcnow() -
  283             datetime.timedelta(days=self.max_inactive_days - 1)).date()
  284         user = self._create_user(self.user_dict, last_active_at)
  285         with self.make_request():
  286             user = PROVIDERS.identity_api.authenticate(
  287                 user_id=user['id'], password=self.password
  288             )
  289         self.assertTrue(user['enabled'])
  290 
  291     def test_get_user_disabled_due_to_inactivity(self):
  292         user = PROVIDERS.identity_api.create_user(self.user_dict)
  293         # set last_active_at just beyond the max
  294         last_active_at = (
  295             datetime.datetime.utcnow() -
  296             datetime.timedelta(self.max_inactive_days + 1)).date()
  297         self._update_user_last_active_at(user['id'], last_active_at)
  298         # get user and verify that the user is actually disabled
  299         user = PROVIDERS.identity_api.get_user(user['id'])
  300         self.assertFalse(user['enabled'])
  301         # set enabled and test
  302         user['enabled'] = True
  303         PROVIDERS.identity_api.update_user(user['id'], user)
  304         user = PROVIDERS.identity_api.get_user(user['id'])
  305         self.assertTrue(user['enabled'])
  306 
  307     def test_get_user_not_disabled_due_to_inactivity(self):
  308         user = PROVIDERS.identity_api.create_user(self.user_dict)
  309         self.assertTrue(user['enabled'])
  310         # set last_active_at just below the max
  311         last_active_at = (
  312             datetime.datetime.utcnow() -
  313             datetime.timedelta(self.max_inactive_days - 1)).date()
  314         self._update_user_last_active_at(user['id'], last_active_at)
  315         # get user and verify that the user is still enabled
  316         user = PROVIDERS.identity_api.get_user(user['id'])
  317         self.assertTrue(user['enabled'])
  318 
  319     def test_enabled_after_create_update_user(self):
  320         self.config_fixture.config(group='security_compliance',
  321                                    disable_user_account_days_inactive=90)
  322         # create user without enabled; assert enabled
  323         del self.user_dict['enabled']
  324         user = PROVIDERS.identity_api.create_user(self.user_dict)
  325         user_ref = self._get_user_ref(user['id'])
  326         self.assertTrue(user_ref.enabled)
  327         now = datetime.datetime.utcnow().date()
  328         self.assertGreaterEqual(now, user_ref.last_active_at)
  329         # set enabled and test
  330         user['enabled'] = True
  331         PROVIDERS.identity_api.update_user(user['id'], user)
  332         user_ref = self._get_user_ref(user['id'])
  333         self.assertTrue(user_ref.enabled)
  334         # set disabled and test
  335         user['enabled'] = False
  336         PROVIDERS.identity_api.update_user(user['id'], user)
  337         user_ref = self._get_user_ref(user['id'])
  338         self.assertFalse(user_ref.enabled)
  339         # re-enable user and test
  340         user['enabled'] = True
  341         PROVIDERS.identity_api.update_user(user['id'], user)
  342         user_ref = self._get_user_ref(user['id'])
  343         self.assertTrue(user_ref.enabled)
  344 
  345     def _get_user_dict(self, password):
  346         user = {
  347             'name': uuid.uuid4().hex,
  348             'domain_id': CONF.identity.default_domain_id,
  349             'enabled': True,
  350             'password': password
  351         }
  352         return user
  353 
  354     def _get_user_ref(self, user_id):
  355         with sql.session_for_read() as session:
  356             return session.query(model.User).get(user_id)
  357 
  358     def _create_user(self, user_dict, last_active_at):
  359         user_dict['id'] = uuid.uuid4().hex
  360         with sql.session_for_write() as session:
  361             user_ref = model.User.from_dict(user_dict)
  362             user_ref.last_active_at = last_active_at
  363             session.add(user_ref)
  364             return base.filter_user(user_ref.to_dict())
  365 
  366     def _update_user_last_active_at(self, user_id, last_active_at):
  367         with sql.session_for_write() as session:
  368             user_ref = session.query(model.User).get(user_id)
  369             user_ref.last_active_at = last_active_at
  370             return user_ref
  371 
  372 
  373 class PasswordHistoryValidationTests(test_backend_sql.SqlTests):
  374     def setUp(self):
  375         super(PasswordHistoryValidationTests, self).setUp()
  376         self.max_cnt = 3
  377         self.config_fixture.config(group='security_compliance',
  378                                    unique_last_password_count=self.max_cnt)
  379 
  380     def test_validate_password_history_with_invalid_password(self):
  381         password = uuid.uuid4().hex
  382         user = self._create_user(password)
  383         # Attempt to change to the same password
  384         with self.make_request():
  385             self.assertRaises(exception.PasswordValidationError,
  386                               PROVIDERS.identity_api.change_password,
  387                               user_id=user['id'],
  388                               original_password=password,
  389                               new_password=password)
  390             # Attempt to change to a unique password
  391             new_password = uuid.uuid4().hex
  392             self.assertValidChangePassword(user['id'], password, new_password)
  393             # Attempt to change back to the initial password
  394             self.assertRaises(exception.PasswordValidationError,
  395                               PROVIDERS.identity_api.change_password,
  396                               user_id=user['id'],
  397                               original_password=new_password,
  398                               new_password=password)
  399 
  400     def test_validate_password_history_with_valid_password(self):
  401         passwords = [uuid.uuid4().hex, uuid.uuid4().hex, uuid.uuid4().hex,
  402                      uuid.uuid4().hex]
  403         user = self._create_user(passwords[0])
  404         self.assertValidChangePassword(user['id'], passwords[0], passwords[1])
  405         self.assertValidChangePassword(user['id'], passwords[1], passwords[2])
  406         self.assertValidChangePassword(user['id'], passwords[2], passwords[3])
  407         # Now you should be able to change the password to match the initial
  408         # password because the password history only contains password elements
  409         # 1, 2, 3
  410         self.assertValidChangePassword(user['id'], passwords[3], passwords[0])
  411 
  412     def test_validate_password_history_with_valid_password_only_once(self):
  413         self.config_fixture.config(group='security_compliance',
  414                                    unique_last_password_count=1)
  415         passwords = [uuid.uuid4().hex, uuid.uuid4().hex]
  416         user = self._create_user(passwords[0])
  417         self.assertValidChangePassword(user['id'], passwords[0], passwords[1])
  418         self.assertValidChangePassword(user['id'], passwords[1], passwords[0])
  419 
  420     def test_validate_password_history_but_start_with_password_none(self):
  421         passwords = [uuid.uuid4().hex, uuid.uuid4().hex]
  422         # Create user and confirm password is None
  423         user = self._create_user(None)
  424         user_ref = self._get_user_ref(user['id'])
  425         self.assertIsNone(user_ref.password)
  426         # Admin password reset
  427         user['password'] = passwords[0]
  428         PROVIDERS.identity_api.update_user(user['id'], user)
  429         # Self-service change password
  430         self.assertValidChangePassword(user['id'], passwords[0], passwords[1])
  431         # Attempt to update with a previous password
  432         with self.make_request():
  433             self.assertRaises(exception.PasswordValidationError,
  434                               PROVIDERS.identity_api.change_password,
  435                               user_id=user['id'],
  436                               original_password=passwords[1],
  437                               new_password=passwords[0])
  438 
  439     def test_disable_password_history_and_repeat_same_password(self):
  440         self.config_fixture.config(group='security_compliance',
  441                                    unique_last_password_count=0)
  442         password = uuid.uuid4().hex
  443         user = self._create_user(password)
  444         # Repeatedly change password with the same password
  445         self.assertValidChangePassword(user['id'], password, password)
  446         self.assertValidChangePassword(user['id'], password, password)
  447 
  448     def test_admin_password_reset_is_not_validated_by_password_history(self):
  449         passwords = [uuid.uuid4().hex, uuid.uuid4().hex]
  450         user = self._create_user(passwords[0])
  451         # Attempt to change password to a unique password
  452         user['password'] = passwords[1]
  453         with self.make_request():
  454             PROVIDERS.identity_api.update_user(user['id'], user)
  455             PROVIDERS.identity_api.authenticate(
  456                 user_id=user['id'], password=passwords[1]
  457             )
  458             # Attempt to change password with the same password
  459             user['password'] = passwords[1]
  460             PROVIDERS.identity_api.update_user(user['id'], user)
  461             PROVIDERS.identity_api.authenticate(
  462                 user_id=user['id'], password=passwords[1]
  463             )
  464             # Attempt to change password with the initial password
  465             user['password'] = passwords[0]
  466             PROVIDERS.identity_api.update_user(user['id'], user)
  467             PROVIDERS.identity_api.authenticate(
  468                 user_id=user['id'], password=passwords[0]
  469             )
  470 
  471     def test_truncate_passwords(self):
  472         user = self._create_user(uuid.uuid4().hex)
  473         self._add_passwords_to_history(user, n=4)
  474         user_ref = self._get_user_ref(user['id'])
  475         self.assertEqual(
  476             len(user_ref.local_user.passwords), (self.max_cnt + 1))
  477 
  478     def test_truncate_passwords_when_max_is_default(self):
  479         self.max_cnt = 1
  480         expected_length = self.max_cnt + 1
  481         self.config_fixture.config(group='security_compliance',
  482                                    unique_last_password_count=self.max_cnt)
  483         user = self._create_user(uuid.uuid4().hex)
  484         self._add_passwords_to_history(user, n=4)
  485         user_ref = self._get_user_ref(user['id'])
  486         self.assertEqual(len(user_ref.local_user.passwords), expected_length)
  487         # Start with multiple passwords and then change max_cnt to one
  488         self.max_cnt = 4
  489         self.config_fixture.config(group='security_compliance',
  490                                    unique_last_password_count=self.max_cnt)
  491         self._add_passwords_to_history(user, n=self.max_cnt)
  492         user_ref = self._get_user_ref(user['id'])
  493         self.assertEqual(
  494             len(user_ref.local_user.passwords), (self.max_cnt + 1))
  495         self.max_cnt = 1
  496         self.config_fixture.config(group='security_compliance',
  497                                    unique_last_password_count=self.max_cnt)
  498         self._add_passwords_to_history(user, n=1)
  499         user_ref = self._get_user_ref(user['id'])
  500         self.assertEqual(len(user_ref.local_user.passwords), expected_length)
  501 
  502     def test_truncate_passwords_when_max_is_default_and_no_password(self):
  503         expected_length = 1
  504         self.max_cnt = 1
  505         self.config_fixture.config(group='security_compliance',
  506                                    unique_last_password_count=self.max_cnt)
  507         user = {
  508             'name': uuid.uuid4().hex,
  509             'domain_id': 'default',
  510             'enabled': True,
  511         }
  512         user = PROVIDERS.identity_api.create_user(user)
  513         self._add_passwords_to_history(user, n=1)
  514         user_ref = self._get_user_ref(user['id'])
  515         self.assertEqual(len(user_ref.local_user.passwords), expected_length)
  516 
  517     def _create_user(self, password):
  518         user = {
  519             'name': uuid.uuid4().hex,
  520             'domain_id': 'default',
  521             'enabled': True,
  522             'password': password
  523         }
  524         return PROVIDERS.identity_api.create_user(user)
  525 
  526     def assertValidChangePassword(self, user_id, password, new_password):
  527         with self.make_request():
  528             PROVIDERS.identity_api.change_password(
  529                 user_id=user_id, original_password=password,
  530                 new_password=new_password
  531             )
  532             PROVIDERS.identity_api.authenticate(
  533                 user_id=user_id, password=new_password
  534             )
  535 
  536     def _add_passwords_to_history(self, user, n):
  537         for _ in range(n):
  538             user['password'] = uuid.uuid4().hex
  539             PROVIDERS.identity_api.update_user(user['id'], user)
  540 
  541     def _get_user_ref(self, user_id):
  542         with sql.session_for_read() as session:
  543             return PROVIDERS.identity_api._get_user(session, user_id)
  544 
  545 
  546 class LockingOutUserTests(test_backend_sql.SqlTests):
  547     def setUp(self):
  548         super(LockingOutUserTests, self).setUp()
  549         self.config_fixture.config(
  550             group='security_compliance',
  551             lockout_failure_attempts=6)
  552         self.config_fixture.config(
  553             group='security_compliance',
  554             lockout_duration=5)
  555         # create user
  556         self.password = uuid.uuid4().hex
  557         user_dict = {
  558             'name': uuid.uuid4().hex,
  559             'domain_id': CONF.identity.default_domain_id,
  560             'enabled': True,
  561             'password': self.password
  562         }
  563         self.user = PROVIDERS.identity_api.create_user(user_dict)
  564 
  565     def test_locking_out_user_after_max_failed_attempts(self):
  566         with self.make_request():
  567             # authenticate with wrong password
  568             self.assertRaises(AssertionError,
  569                               PROVIDERS.identity_api.authenticate,
  570                               user_id=self.user['id'],
  571                               password=uuid.uuid4().hex)
  572             # authenticate with correct password
  573             PROVIDERS.identity_api.authenticate(
  574                 user_id=self.user['id'],
  575                 password=self.password
  576             )
  577             # test locking out user after max failed attempts
  578             self._fail_auth_repeatedly(self.user['id'])
  579             self.assertRaises(exception.Unauthorized,
  580                               PROVIDERS.identity_api.authenticate,
  581                               user_id=self.user['id'],
  582                               password=uuid.uuid4().hex)
  583 
  584     def test_lock_out_for_ignored_user(self):
  585         # mark the user as exempt from failed password attempts
  586         # ignore user and reset password, password not expired
  587         self.user['options'][iro.IGNORE_LOCKOUT_ATTEMPT_OPT.option_name] = True
  588         PROVIDERS.identity_api.update_user(self.user['id'], self.user)
  589 
  590         # fail authentication repeatedly the max number of times
  591         self._fail_auth_repeatedly(self.user['id'])
  592         # authenticate with wrong password, account should not be locked
  593         with self.make_request():
  594             self.assertRaises(AssertionError,
  595                               PROVIDERS.identity_api.authenticate,
  596                               user_id=self.user['id'],
  597                               password=uuid.uuid4().hex)
  598             # authenticate with correct password, account should not be locked
  599             PROVIDERS.identity_api.authenticate(
  600                 user_id=self.user['id'],
  601                 password=self.password
  602             )
  603 
  604     def test_set_enabled_unlocks_user(self):
  605         with self.make_request():
  606             # lockout user
  607             self._fail_auth_repeatedly(self.user['id'])
  608             self.assertRaises(exception.Unauthorized,
  609                               PROVIDERS.identity_api.authenticate,
  610                               user_id=self.user['id'],
  611                               password=uuid.uuid4().hex)
  612             # set enabled, user should be unlocked
  613             self.user['enabled'] = True
  614             PROVIDERS.identity_api.update_user(self.user['id'], self.user)
  615             user_ret = PROVIDERS.identity_api.authenticate(
  616                 user_id=self.user['id'],
  617                 password=self.password
  618             )
  619             self.assertTrue(user_ret['enabled'])
  620 
  621     def test_lockout_duration(self):
  622         # freeze time
  623         with freezegun.freeze_time(datetime.datetime.utcnow()) as frozen_time:
  624             with self.make_request():
  625                 # lockout user
  626                 self._fail_auth_repeatedly(self.user['id'])
  627                 self.assertRaises(exception.Unauthorized,
  628                                   PROVIDERS.identity_api.authenticate,
  629                                   user_id=self.user['id'],
  630                                   password=uuid.uuid4().hex)
  631                 # freeze time past the duration, user should be unlocked and
  632                 # failed auth count should get reset
  633                 frozen_time.tick(delta=datetime.timedelta(
  634                     seconds=CONF.security_compliance.lockout_duration + 1))
  635                 PROVIDERS.identity_api.authenticate(
  636                     user_id=self.user['id'],
  637                     password=self.password
  638                 )
  639                 # test failed auth count was reset by authenticating with the
  640                 # wrong password, should raise an assertion error and not
  641                 # account locked
  642                 self.assertRaises(AssertionError,
  643                                   PROVIDERS.identity_api.authenticate,
  644                                   user_id=self.user['id'],
  645                                   password=uuid.uuid4().hex)
  646 
  647     def test_lockout_duration_failed_auth_cnt_resets(self):
  648         # freeze time
  649         with freezegun.freeze_time(datetime.datetime.utcnow()) as frozen_time:
  650             with self.make_request():
  651                 # lockout user
  652                 self._fail_auth_repeatedly(self.user['id'])
  653                 self.assertRaises(exception.Unauthorized,
  654                                   PROVIDERS.identity_api.authenticate,
  655                                   user_id=self.user['id'],
  656                                   password=uuid.uuid4().hex)
  657                 # freeze time past the duration, failed_auth_cnt should reset
  658                 frozen_time.tick(delta=datetime.timedelta(
  659                     seconds=CONF.security_compliance.lockout_duration + 1))
  660                 # repeat failed auth the max times
  661                 self._fail_auth_repeatedly(self.user['id'])
  662                 # test user account is locked
  663                 self.assertRaises(exception.Unauthorized,
  664                                   PROVIDERS.identity_api.authenticate,
  665                                   user_id=self.user['id'],
  666                                   password=uuid.uuid4().hex)
  667 
  668     def _fail_auth_repeatedly(self, user_id):
  669         wrong_password = uuid.uuid4().hex
  670         for _ in range(CONF.security_compliance.lockout_failure_attempts):
  671             with self.make_request():
  672                 self.assertRaises(AssertionError,
  673                                   PROVIDERS.identity_api.authenticate,
  674                                   user_id=user_id,
  675                                   password=wrong_password)
  676 
  677 
  678 class PasswordExpiresValidationTests(test_backend_sql.SqlTests):
  679     def setUp(self):
  680         super(PasswordExpiresValidationTests, self).setUp()
  681         self.password = uuid.uuid4().hex
  682         self.user_dict = self._get_test_user_dict(self.password)
  683         self.config_fixture.config(
  684             group='security_compliance',
  685             password_expires_days=90)
  686 
  687     def test_authenticate_with_expired_password(self):
  688         # set password created_at so that the password will expire
  689         password_created_at = (
  690             datetime.datetime.utcnow() -
  691             datetime.timedelta(
  692                 days=CONF.security_compliance.password_expires_days + 1)
  693         )
  694         user = self._create_user(self.user_dict, password_created_at)
  695         # test password is expired
  696         with self.make_request():
  697             self.assertRaises(exception.PasswordExpired,
  698                               PROVIDERS.identity_api.authenticate,
  699                               user_id=user['id'],
  700                               password=self.password)
  701 
  702     def test_authenticate_with_non_expired_password(self):
  703         # set password created_at so that the password will not expire
  704         password_created_at = (
  705             datetime.datetime.utcnow() -
  706             datetime.timedelta(
  707                 days=CONF.security_compliance.password_expires_days - 1)
  708         )
  709         user = self._create_user(self.user_dict, password_created_at)
  710         # test password is not expired
  711         with self.make_request():
  712             PROVIDERS.identity_api.authenticate(
  713                 user_id=user['id'], password=self.password
  714             )
  715 
  716     def test_authenticate_with_expired_password_for_ignore_user_option(self):
  717         # set user to have the 'ignore_password_expiry' option set to False
  718         self.user_dict.setdefault('options', {})[
  719             iro.IGNORE_PASSWORD_EXPIRY_OPT.option_name] = False
  720         # set password created_at so that the password will expire
  721         password_created_at = (
  722             datetime.datetime.utcnow() -
  723             datetime.timedelta(
  724                 days=CONF.security_compliance.password_expires_days + 1)
  725         )
  726         user = self._create_user(self.user_dict, password_created_at)
  727         with self.make_request():
  728             self.assertRaises(exception.PasswordExpired,
  729                               PROVIDERS.identity_api.authenticate,
  730                               user_id=user['id'],
  731                               password=self.password)
  732 
  733             # update user to explicitly have the expiry option to True
  734             user['options'][
  735                 iro.IGNORE_PASSWORD_EXPIRY_OPT.option_name] = True
  736             user = PROVIDERS.identity_api.update_user(
  737                 user['id'], user
  738             )
  739             # test password is not expired due to ignore option
  740             PROVIDERS.identity_api.authenticate(
  741                 user_id=user['id'], password=self.password
  742             )
  743 
  744     def _get_test_user_dict(self, password):
  745         test_user_dict = {
  746             'id': uuid.uuid4().hex,
  747             'name': uuid.uuid4().hex,
  748             'domain_id': CONF.identity.default_domain_id,
  749             'enabled': True,
  750             'password': password
  751         }
  752         return test_user_dict
  753 
  754     def _create_user(self, user_dict, password_created_at):
  755         # Bypass business logic and go straight for the identity driver
  756         # (SQL in this case)
  757         driver = PROVIDERS.identity_api.driver
  758         driver.create_user(user_dict['id'], user_dict)
  759         with sql.session_for_write() as session:
  760             user_ref = session.query(model.User).get(user_dict['id'])
  761             user_ref.password_ref.created_at = password_created_at
  762             user_ref.password_ref.expires_at = (
  763                 user_ref._get_password_expires_at(password_created_at))
  764             return base.filter_user(user_ref.to_dict())
  765 
  766 
  767 class MinimumPasswordAgeTests(test_backend_sql.SqlTests):
  768     def setUp(self):
  769         super(MinimumPasswordAgeTests, self).setUp()
  770         self.config_fixture.config(
  771             group='security_compliance',
  772             minimum_password_age=1)
  773         self.initial_password = uuid.uuid4().hex
  774         self.user = self._create_new_user(self.initial_password)
  775 
  776     def test_user_cannot_change_password_before_min_age(self):
  777         # user can change password after create
  778         new_password = uuid.uuid4().hex
  779         self.assertValidChangePassword(self.user['id'], self.initial_password,
  780                                        new_password)
  781         # user cannot change password before min age
  782         with self.make_request():
  783             self.assertRaises(exception.PasswordAgeValidationError,
  784                               PROVIDERS.identity_api.change_password,
  785                               user_id=self.user['id'],
  786                               original_password=new_password,
  787                               new_password=uuid.uuid4().hex)
  788 
  789     def test_user_can_change_password_after_min_age(self):
  790         # user can change password after create
  791         new_password = uuid.uuid4().hex
  792         self.assertValidChangePassword(self.user['id'], self.initial_password,
  793                                        new_password)
  794         # set password_created_at so that the min password age has past
  795         password_created_at = (
  796             datetime.datetime.utcnow() -
  797             datetime.timedelta(
  798                 days=CONF.security_compliance.minimum_password_age + 1))
  799         self._update_password_created_at(self.user['id'], password_created_at)
  800         # user can change their password after min password age has past
  801         self.assertValidChangePassword(self.user['id'], new_password,
  802                                        uuid.uuid4().hex)
  803 
  804     def test_user_can_change_password_after_admin_reset(self):
  805         # user can change password after create
  806         new_password = uuid.uuid4().hex
  807         self.assertValidChangePassword(self.user['id'], self.initial_password,
  808                                        new_password)
  809         # user cannot change password before min age
  810 
  811         with self.make_request():
  812             self.assertRaises(exception.PasswordAgeValidationError,
  813                               PROVIDERS.identity_api.change_password,
  814                               user_id=self.user['id'],
  815                               original_password=new_password,
  816                               new_password=uuid.uuid4().hex)
  817         # admin reset
  818         new_password = uuid.uuid4().hex
  819         self.user['password'] = new_password
  820         PROVIDERS.identity_api.update_user(self.user['id'], self.user)
  821         # user can change password after admin reset
  822         self.assertValidChangePassword(self.user['id'], new_password,
  823                                        uuid.uuid4().hex)
  824 
  825     def assertValidChangePassword(self, user_id, password, new_password):
  826         with self.make_request():
  827             PROVIDERS.identity_api.change_password(
  828                 user_id=user_id, original_password=password,
  829                 new_password=new_password
  830             )
  831             PROVIDERS.identity_api.authenticate(
  832                 user_id=user_id, password=new_password
  833             )
  834 
  835     def _create_new_user(self, password):
  836         user = {
  837             'name': uuid.uuid4().hex,
  838             'domain_id': CONF.identity.default_domain_id,
  839             'enabled': True,
  840             'password': password
  841         }
  842         return PROVIDERS.identity_api.create_user(user)
  843 
  844     def _update_password_created_at(self, user_id, password_create_at):
  845         # User instance has an attribute password_ref. This attribute is used
  846         # in authentication. It always points to the last created password. The
  847         # order of passwords is determined by `created_at` field.
  848         # By changing `created_at`, this method interferes with password_ref
  849         # behaviour, making it return not last value. That's why all passwords
  850         # except the latest, need to have `created_at` slightly less than
  851         # the latest password.
  852         with sql.session_for_write() as session:
  853             user_ref = session.query(model.User).get(user_id)
  854             latest_password = user_ref.password_ref
  855             slightly_less = datetime.timedelta(minutes=1)
  856             for password_ref in user_ref.local_user.passwords:
  857                 password_ref.created_at = password_create_at - slightly_less
  858             latest_password.created_at = password_create_at
  859 
  860 
  861 class ChangePasswordRequiredAfterFirstUse(test_backend_sql.SqlTests):
  862     def _create_user(self, password, change_password_upon_first_use):
  863         self.config_fixture.config(
  864             group='security_compliance',
  865             change_password_upon_first_use=change_password_upon_first_use)
  866         user_dict = {
  867             'name': uuid.uuid4().hex,
  868             'domain_id': CONF.identity.default_domain_id,
  869             'enabled': True,
  870             'password': password
  871         }
  872         return PROVIDERS.identity_api.create_user(user_dict)
  873 
  874     def assertPasswordIsExpired(self, user_id, password):
  875         with self.make_request():
  876             self.assertRaises(exception.PasswordExpired,
  877                               PROVIDERS.identity_api.authenticate,
  878                               user_id=user_id,
  879                               password=password)
  880 
  881     def assertPasswordIsNotExpired(self, user_id, password):
  882         with self.make_request():
  883             PROVIDERS.identity_api.authenticate(
  884                 user_id=user_id, password=password
  885             )
  886 
  887     def test_password_expired_after_create(self):
  888         # create user, password expired
  889         initial_password = uuid.uuid4().hex
  890         user = self._create_user(initial_password, True)
  891         self.assertPasswordIsExpired(user['id'], initial_password)
  892         # change password (self-service), password not expired
  893         new_password = uuid.uuid4().hex
  894         with self.make_request():
  895             PROVIDERS.identity_api.change_password(
  896                 user['id'], initial_password, new_password
  897             )
  898         self.assertPasswordIsNotExpired(user['id'], new_password)
  899 
  900     def test_password_expired_after_reset(self):
  901         # create user with feature disabled, password not expired
  902         initial_password = uuid.uuid4().hex
  903         user = self._create_user(initial_password, False)
  904         self.assertPasswordIsNotExpired(user['id'], initial_password)
  905         # enable change_password_upon_first_use
  906         self.config_fixture.config(
  907             group='security_compliance',
  908             change_password_upon_first_use=True)
  909         # admin reset, password expired
  910         admin_password = uuid.uuid4().hex
  911         user['password'] = admin_password
  912         PROVIDERS.identity_api.update_user(user['id'], user)
  913         self.assertPasswordIsExpired(user['id'], admin_password)
  914         # change password (self-service), password not expired
  915         new_password = uuid.uuid4().hex
  916         with self.make_request():
  917             PROVIDERS.identity_api.change_password(
  918                 user['id'], admin_password, new_password
  919             )
  920         self.assertPasswordIsNotExpired(user['id'], new_password)
  921 
  922     def test_password_not_expired_when_feature_disabled(self):
  923         # create user with feature disabled
  924         initial_password = uuid.uuid4().hex
  925         user = self._create_user(initial_password, False)
  926         self.assertPasswordIsNotExpired(user['id'], initial_password)
  927         # admin reset
  928         admin_password = uuid.uuid4().hex
  929         user['password'] = admin_password
  930         PROVIDERS.identity_api.update_user(user['id'], user)
  931         self.assertPasswordIsNotExpired(user['id'], admin_password)
  932 
  933     def test_password_not_expired_for_ignore_user(self):
  934         # create user with feature disabled, password not expired
  935         initial_password = uuid.uuid4().hex
  936         user = self._create_user(initial_password, False)
  937         self.assertPasswordIsNotExpired(user['id'], initial_password)
  938         # enable change_password_upon_first_use
  939         self.config_fixture.config(
  940             group='security_compliance',
  941             change_password_upon_first_use=True)
  942         # ignore user and reset password, password not expired
  943         user['options'][iro.IGNORE_CHANGE_PASSWORD_OPT.option_name] = True
  944         admin_password = uuid.uuid4().hex
  945         user['password'] = admin_password
  946         PROVIDERS.identity_api.update_user(user['id'], user)
  947         self.assertPasswordIsNotExpired(user['id'], admin_password)
  948         # set ignore user to false and reset password, password is expired
  949         user['options'][iro.IGNORE_CHANGE_PASSWORD_OPT.option_name] = False
  950         admin_password = uuid.uuid4().hex
  951         user['password'] = admin_password
  952         PROVIDERS.identity_api.update_user(user['id'], user)
  953         self.assertPasswordIsExpired(user['id'], admin_password)